spark UDF result can do 'show', but can't do 'filter'
My Spark UDF works when I call `show()`, but it raises an error when I `filter` on the UDF result.
udf function
def chkInterPunctuation(sent) :
for char in sent[1:-2] :
if char in [""", "'", ".", "!", "?"] :
return True
return False
cip = udf(chkInterPunctuation, BooleanType())
show()
works
df_punct = dfs.withColumn("in_length", length("input")).
withColumn("out_length", length("output")).withColumn("cip", cip(col("input")))
df_punct.show()
but it gives me error when I do filter
df_punct.where(col("cip") == True).show()
This is the error raised by the filter:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-171-e206ffd07f75> in <module>()
----> 1 df_punct.where(col("cip") == True).collect()
/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/dataframe.pyc in collect(self)
308 """
309 with SCCallSiteSync(self._sc) as css:
--> 310 port = self._jdf.collectToPython()
311 return list(_load_from_socket(port, BatchedSerializer(PickleSerializer())))
312
/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
931 answer = self.gateway_client.send_command(command)
932 return_value = get_return_value(
--> 933 answer, self.gateway_client, self.target_id, self.name)
934
935 for temp_arg in temp_args:
/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/utils.pyc in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
310 raise Py4JJavaError(
311 "An error occurred while calling {0}{1}{2}.n".
--> 312 format(target_id, ".", name), value)
313 else:
314 raise Py4JError(
Py4JJavaError: An error occurred while calling o3378.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 40 in stage 238.0 failed 1 times, most recent failure: Lost task 40.0 in stage 238.0 (TID 8862, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 172, in main
process()
File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 167, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 106, in <lambda>
func = lambda _, it: map(mapper, it)
File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 92, in <lambda>
mapper = lambda a: udf(*a)
File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 70, in <lambda>
return lambda *a: f(*a)
File "<ipython-input-153-fce920dc0de2>", line 5, in chkInterPunctuation
TypeError: 'NoneType' object has no attribute '__getitem__'
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:124)
at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:68)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:766)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:766)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:103)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1911)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:893)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.RDD.collect(RDD.scala:892)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:453)
at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply$mcI$sp(Dataset.scala:2513)
at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2513)
at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2513)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2532)
at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:2512)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:211)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 172, in main
process()
File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 167, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 106, in <lambda>
func = lambda _, it: map(mapper, it)
File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 92, in <lambda>
mapper = lambda a: udf(*a)
File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 70, in <lambda>
return lambda *a: f(*a)
File "<ipython-input-153-fce920dc0de2>", line 5, in chkInterPunctuation
TypeError: 'NoneType' object has no attribute '__getitem__'
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:124)
at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:68)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:766)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:766)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:103)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
My googling suggests that a Py4J error usually occurs when the UDF does not return a proper value or raises an error itself. But my UDF always returns True or False. In addition, the Spark query returns the right values when I call `show()`. This doesn't make sense to me. What could be the possible cause?
Thank you in advance.
python apache-spark pyspark apache-spark-sql
add a comment |
My Spark UDF works when I call `show()`, but it raises an error when I `filter` on the UDF result.
udf function
def chkInterPunctuation(sent) :
for char in sent[1:-2] :
if char in [""", "'", ".", "!", "?"] :
return True
return False
cip = udf(chkInterPunctuation, BooleanType())
show()
works
df_punct = dfs.withColumn("in_length", length("input")).
withColumn("out_length", length("output")).withColumn("cip", cip(col("input")))
df_punct.show()
but it gives me error when I do filter
df_punct.where(col("cip") == True).show()
This is the error raised by the filter:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-171-e206ffd07f75> in <module>()
----> 1 df_punct.where(col("cip") == True).collect()
/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/dataframe.pyc in collect(self)
308 """
309 with SCCallSiteSync(self._sc) as css:
--> 310 port = self._jdf.collectToPython()
311 return list(_load_from_socket(port, BatchedSerializer(PickleSerializer())))
312
/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
931 answer = self.gateway_client.send_command(command)
932 return_value = get_return_value(
--> 933 answer, self.gateway_client, self.target_id, self.name)
934
935 for temp_arg in temp_args:
/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/utils.pyc in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
310 raise Py4JJavaError(
311 "An error occurred while calling {0}{1}{2}.n".
--> 312 format(target_id, ".", name), value)
313 else:
314 raise Py4JError(
Py4JJavaError: An error occurred while calling o3378.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 40 in stage 238.0 failed 1 times, most recent failure: Lost task 40.0 in stage 238.0 (TID 8862, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 172, in main
process()
File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 167, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 106, in <lambda>
func = lambda _, it: map(mapper, it)
File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 92, in <lambda>
mapper = lambda a: udf(*a)
File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 70, in <lambda>
return lambda *a: f(*a)
File "<ipython-input-153-fce920dc0de2>", line 5, in chkInterPunctuation
TypeError: 'NoneType' object has no attribute '__getitem__'
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:124)
at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:68)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:766)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:766)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:103)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1911)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:893)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.RDD.collect(RDD.scala:892)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:453)
at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply$mcI$sp(Dataset.scala:2513)
at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2513)
at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2513)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2532)
at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:2512)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:211)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 172, in main
process()
File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 167, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 106, in <lambda>
func = lambda _, it: map(mapper, it)
File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 92, in <lambda>
mapper = lambda a: udf(*a)
File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 70, in <lambda>
return lambda *a: f(*a)
File "<ipython-input-153-fce920dc0de2>", line 5, in chkInterPunctuation
TypeError: 'NoneType' object has no attribute '__getitem__'
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:124)
at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:68)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:766)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:766)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:103)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
My googling suggests that a Py4J error usually occurs when the UDF does not return a proper value or raises an error itself. But my UDF always returns True or False. In addition, the Spark query returns the right values when I call `show()`. This doesn't make sense to me. What could be the possible cause?
Thank you in advance.
python apache-spark pyspark apache-spark-sql
Could the error be due to some UDF input being too short? Or empty?
– Shaido
Nov 21 '18 at 8:03
Can you update your question with some input data?
– martinarroyo
Nov 21 '18 at 8:29
`TypeError: 'NoneType' object has no attribute '__getitem__'` - it seems to me you just need to cast the `cip` column to BooleanType
– sramalingam24
Nov 21 '18 at 8:35
Before you do the filtering of course
– sramalingam24
Nov 21 '18 at 8:36
2
Without knowing for sure, I'm guessing that `show()` works the same way in PySpark as it does in regular Spark (Scala). In normal Spark, a `show` - unlike a `collect` - will not force evaluation of the entire dataframe but only a small part of the data. Just enough to display a tiny fraction of the result set. I suspect that there might be a problem with the input data in some row that is not processed by your `show` command but will indeed be processed by your `filter(...).collect()` command. Maybe try to do a `df_punct.collect()` (no filter) instead of `df_punct.show()` to verify this?
– Glennie Helles Sindholt
Nov 21 '18 at 9:20
add a comment |
My Spark UDF works when I call `show()`, but it raises an error when I `filter` on the UDF result.
udf function
def chkInterPunctuation(sent) :
for char in sent[1:-2] :
if char in [""", "'", ".", "!", "?"] :
return True
return False
cip = udf(chkInterPunctuation, BooleanType())
show()
works
df_punct = dfs.withColumn("in_length", length("input")).
withColumn("out_length", length("output")).withColumn("cip", cip(col("input")))
df_punct.show()
but it gives me error when I do filter
df_punct.where(col("cip") == True).show()
This is the error raised by the filter:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-171-e206ffd07f75> in <module>()
----> 1 df_punct.where(col("cip") == True).collect()
/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/dataframe.pyc in collect(self)
308 """
309 with SCCallSiteSync(self._sc) as css:
--> 310 port = self._jdf.collectToPython()
311 return list(_load_from_socket(port, BatchedSerializer(PickleSerializer())))
312
/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
931 answer = self.gateway_client.send_command(command)
932 return_value = get_return_value(
--> 933 answer, self.gateway_client, self.target_id, self.name)
934
935 for temp_arg in temp_args:
/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/utils.pyc in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
310 raise Py4JJavaError(
311 "An error occurred while calling {0}{1}{2}.n".
--> 312 format(target_id, ".", name), value)
313 else:
314 raise Py4JError(
Py4JJavaError: An error occurred while calling o3378.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 40 in stage 238.0 failed 1 times, most recent failure: Lost task 40.0 in stage 238.0 (TID 8862, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 172, in main
process()
File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 167, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 106, in <lambda>
func = lambda _, it: map(mapper, it)
File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 92, in <lambda>
mapper = lambda a: udf(*a)
File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 70, in <lambda>
return lambda *a: f(*a)
File "<ipython-input-153-fce920dc0de2>", line 5, in chkInterPunctuation
TypeError: 'NoneType' object has no attribute '__getitem__'
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:124)
at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:68)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:766)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:766)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:103)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1911)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:893)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.RDD.collect(RDD.scala:892)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:453)
at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply$mcI$sp(Dataset.scala:2513)
at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2513)
at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2513)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2532)
at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:2512)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:211)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 172, in main
process()
File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 167, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 106, in <lambda>
func = lambda _, it: map(mapper, it)
File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 92, in <lambda>
mapper = lambda a: udf(*a)
File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 70, in <lambda>
return lambda *a: f(*a)
File "<ipython-input-153-fce920dc0de2>", line 5, in chkInterPunctuation
TypeError: 'NoneType' object has no attribute '__getitem__'
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:124)
at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:68)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:766)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:766)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:103)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
My googling suggests that py4j
errors usually occur when a UDF
function doesn't return a proper value or has an error, but my UDF
function
always returns true or false. In addition, the Spark query returns the right value when I do show. It doesn't make sense to me. What could be the possible cause?
Thank you in advance.
python apache-spark pyspark apache-spark-sql
spark UDF
works when I do show()
, but it gives me error when I do filter
on UDF
result.
udf function
def chkInterPunctuation(sent) :
for char in sent[1:-2] :
if char in [""", "'", ".", "!", "?"] :
return True
return False
cip = udf(chkInterPunctuation, BooleanType())
show()
works
df_punct = dfs.withColumn("in_length", length("input")).
withColumn("out_length", length("output")).withColumn("cip", cip(col("input")))
df_punct.show()
but it gives me error when I do filter
df_punct.where(col("cip") == True).show()
these are filter
error
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-171-e206ffd07f75> in <module>()
----> 1 df_punct.where(col("cip") == True).collect()
/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/dataframe.pyc in collect(self)
308 """
309 with SCCallSiteSync(self._sc) as css:
--> 310 port = self._jdf.collectToPython()
311 return list(_load_from_socket(port, BatchedSerializer(PickleSerializer())))
312
/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
931 answer = self.gateway_client.send_command(command)
932 return_value = get_return_value(
--> 933 answer, self.gateway_client, self.target_id, self.name)
934
935 for temp_arg in temp_args:
/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/utils.pyc in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
310 raise Py4JJavaError(
311 "An error occurred while calling {0}{1}{2}.n".
--> 312 format(target_id, ".", name), value)
313 else:
314 raise Py4JError(
Py4JJavaError: An error occurred while calling o3378.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 40 in stage 238.0 failed 1 times, most recent failure: Lost task 40.0 in stage 238.0 (TID 8862, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 172, in main
process()
File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 167, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 106, in <lambda>
func = lambda _, it: map(mapper, it)
File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 92, in <lambda>
mapper = lambda a: udf(*a)
File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 70, in <lambda>
return lambda *a: f(*a)
File "<ipython-input-153-fce920dc0de2>", line 5, in chkInterPunctuation
TypeError: 'NoneType' object has no attribute '__getitem__'
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:124)
at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:68)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:766)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:766)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:103)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1911)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:893)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.RDD.collect(RDD.scala:892)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:453)
at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply$mcI$sp(Dataset.scala:2513)
at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2513)
at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2513)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2532)
at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:2512)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:211)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 172, in main
process()
File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 167, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 106, in <lambda>
func = lambda _, it: map(mapper, it)
File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 92, in <lambda>
mapper = lambda a: udf(*a)
File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 70, in <lambda>
return lambda *a: f(*a)
File "<ipython-input-153-fce920dc0de2>", line 5, in chkInterPunctuation
TypeError: 'NoneType' object has no attribute '__getitem__'
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:124)
at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:68)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:766)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:766)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:103)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
My googling suggests that py4j
errors usually occur when a UDF
function doesn't return a proper value or has an error, but my UDF
function
always returns true or false. In addition, the Spark query returns the right value when I do show. It doesn't make sense to me. What could be the possible cause?
Thank you in advance.
python apache-spark pyspark apache-spark-sql
python apache-spark pyspark apache-spark-sql
edited Nov 21 '18 at 7:29


Ali AzG
6651616
6651616
asked Nov 21 '18 at 7:08
jinhwanjinhwan
3551319
3551319
Could the error be due to some udf input being too short? Or empty?
– Shaido
Nov 21 '18 at 8:03
Can you update your question with some input data?
– martinarroyo
Nov 21 '18 at 8:29
TypeError: 'NoneType' object has no attribute '__getitem__', seems to me you just need to cast the cip column to BooleanType
– sramalingam24
Nov 21 '18 at 8:35
Before you do the filtering of course
– sramalingam24
Nov 21 '18 at 8:36
2
Without knowing for sure, I'm guessing that show()
works the same way in pyspark as it does in regular spark (scala). In normal spark, a show
- unlike a collect
will not force evaluation of the entire dataframe but only a small part of the data. Just enough to display a tiny fraction of the result set. I suspect that there might be a problem with input data in some row that is not processed by your show
command but will indeed be processed by your filter(...).collect()
command. Maybe try to do a df_punct.collect()
(no filter) instead of df_punct.show()
to verify this?
– Glennie Helles Sindholt
Nov 21 '18 at 9:20
add a comment |
Could the error be due to some udf input being too short? Or empty?
– Shaido
Nov 21 '18 at 8:03
Can you update your question with some input data?
– martinarroyo
Nov 21 '18 at 8:29
TypeError: 'NoneType' object has no attribute '__getitem__', seems to me you just need to cast the cip column to BooleanType
– sramalingam24
Nov 21 '18 at 8:35
Before you do the filtering of course
– sramalingam24
Nov 21 '18 at 8:36
2
Without knowing for sure, I'm guessing that show()
works the same way in pyspark as it does in regular spark (scala). In normal spark, a show
- unlike a collect
will not force evaluation of the entire dataframe but only a small part of the data. Just enough to display a tiny fraction of the result set. I suspect that there might be a problem with input data in some row that is not processed by your show
command but will indeed be processed by your filter(...).collect()
command. Maybe try to do a df_punct.collect()
(no filter) instead of df_punct.show()
to verify this?
– Glennie Helles Sindholt
Nov 21 '18 at 9:20
Could the error be due to some udf input being too short? Or empty?
– Shaido
Nov 21 '18 at 8:03
Could the error be due to some udf input being too short? Or empty?
– Shaido
Nov 21 '18 at 8:03
Can you update your question with some input data?
– martinarroyo
Nov 21 '18 at 8:29
Can you update your question with some input data?
– martinarroyo
Nov 21 '18 at 8:29
TypeError: 'NoneType' object has no attribute '__getitem__', seems to me you just need to cast the cip column to BooleanType
– sramalingam24
Nov 21 '18 at 8:35
TypeError: 'NoneType' object has no attribute '__getitem__', seems to me you just need to cast the cip column to BooleanType
– sramalingam24
Nov 21 '18 at 8:35
Before you do the filtering of course
– sramalingam24
Nov 21 '18 at 8:36
Before you do the filtering of course
– sramalingam24
Nov 21 '18 at 8:36
2
2
Without knowing for sure, I'm guessing that
show()
works the same way in pyspark as it does in regular spark (scala). In normal spark, a show
- unlike a collect
will not force evaluation of the entire dataframe but only a small part of the data. Just enough to display a tiny fraction of the result set. I suspect that there might be a problem with input data in some row that is not processed by your show
command but will indeed be processed by your filter(...).collect()
command. Maybe try to do a df_punct.collect()
(no filter) instead of df_punct.show()
to verify this? – Glennie Helles Sindholt
Nov 21 '18 at 9:20
Without knowing for sure, I'm guessing that
show()
works the same way in pyspark as it does in regular spark (scala). In normal spark, a show
- unlike a collect
will not force evaluation of the entire dataframe but only a small part of the data. Just enough to display a tiny fraction of the result set. I suspect that there might be a problem with input data in some row that is not processed by your show
command but will indeed be processed by your filter(...).collect()
command. Maybe try to do a df_punct.collect()
(no filter) instead of df_punct.show()
to verify this? – Glennie Helles Sindholt
Nov 21 '18 at 9:20
add a comment |
4 Answers
4
active
oldest
votes
That happens because your function doesn't account for NULL
values in the input. Try:
def chkInterPunctuation(sent) :
if not sent: return # In None return
for char in sent[1:-2] :
if char in [""", "'", ".", "!", "?"] :
return True
return False
add a comment |
The issue was resolved by updating to Spark 2.4.0 (I was using 2.0.0).
add a comment |
Here is a working example:
df = spark.sql(
"""
select 'gjgjgjgjgjg' as word
union
select 'lalalal?ala' as word
union
select 'ryryry.ryry' as word
""")
df.createOrReplaceTempView("words")
from pyspark.sql.types import BooleanType
from pyspark.sql.functions import col
def chkInterPunctuation(sent) :
for char in sent[1:-2] :
if char in [""", "'", ".", "!", "?"] :
return True
return False
cip = udf(chkInterPunctuation, BooleanType())
udf_df = df.withColumn("cip", cip(col("word")))
udf_df.where("cip = true").show()
Regardless, you can do this without a udf:
spark.sql("""
SELECT *
FROM words
WHERE word LIKE '%.%'
OR word LIKE '%?%'
""").show()
add a comment |
I think it's because you use show()
after applying where()
.
I suggest first applying the filter, saving the result in a new variable, and then calling show()
.
df_punct_filtered = df_punct.where(col("cip") == True)
df_punct_filtered.show()
UPDATE:
or you can use filter()
function instead of where()
:
df_punct_filtered = df_punct.filter(df_punct.cip == True)
df_punct_filtered.show()
I have tried your suggestion, but it gives me the same error. In my opinion, Spark does lazy evaluation, so declaring a new variable doesn't change the meaning.
– jinhwan
Nov 21 '18 at 7:32
@jinhwan I've updated my answer. check that out.
– Ali AzG
Nov 21 '18 at 7:34
Sorry, but filter is just an alias for where. I have tried it, but it doesn't work.
– jinhwan
Nov 21 '18 at 7:43
add a comment |
Your Answer
StackExchange.ifUsing("editor", function () {
StackExchange.using("externalEditor", function () {
StackExchange.using("snippets", function () {
StackExchange.snippets.init();
});
});
}, "code-snippets");
StackExchange.ready(function() {
var channelOptions = {
tags: "".split(" "),
id: "1"
};
initTagRenderer("".split(" "), "".split(" "), channelOptions);
StackExchange.using("externalEditor", function() {
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled) {
StackExchange.using("snippets", function() {
createEditor();
});
}
else {
createEditor();
}
});
function createEditor() {
StackExchange.prepareEditor({
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader: {
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
},
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
});
}
});
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53406889%2fspark-udf-result-can-do-show-but-cant-do-filter%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
4 Answers
4
active
oldest
votes
4 Answers
4
active
oldest
votes
active
oldest
votes
active
oldest
votes
That happens because you don't correct for NULL
presence. Try:
def chkInterPunctuation(sent) :
if not sent: return # In None return
for char in sent[1:-2] :
if char in [""", "'", ".", "!", "?"] :
return True
return False
add a comment |
That happens because you don't correct for NULL
presence. Try:
def chkInterPunctuation(sent) :
if not sent: return # In None return
for char in sent[1:-2] :
if char in [""", "'", ".", "!", "?"] :
return True
return False
add a comment |
That happens because you don't correct for NULL
presence. Try:
def chkInterPunctuation(sent) :
if not sent: return # In None return
for char in sent[1:-2] :
if char in [""", "'", ".", "!", "?"] :
return True
return False
That happens because you don't correct for NULL
presence. Try:
def chkInterPunctuation(sent):
    """Report whether ``sent`` contains punctuation in its interior.

    Only the slice ``sent[1:-2]`` is inspected, so the first character and
    the last two characters are ignored.

    Returns ``None`` when ``sent`` is ``None`` or empty (avoiding the
    slicing error on a missing value); otherwise ``True`` if any inspected
    character is one of ``"``, ``'``, ``.``, ``!`` or ``?``, else ``False``.
    """
    if not sent:
        # Missing (None) or empty input: nothing to inspect.
        return None
    # The double quote is written as '"'; the scraped original had lost the
    # backslash of "\"" and no longer parsed.
    return any(char in ('"', "'", ".", "!", "?") for char in sent[1:-2])
answered Nov 21 '18 at 10:51
user10685334user10685334
211
211
add a comment |
add a comment |
The issue was resolved by upgrading to Spark 2.4.0. (I had been using 2.0.0.)
add a comment |
The issue was resolved by upgrading to Spark 2.4.0. (I had been using 2.0.0.)
add a comment |
The issue was resolved by upgrading to Spark 2.4.0. (I had been using 2.0.0.)
The issue was resolved by upgrading to Spark 2.4.0. (I had been using 2.0.0.)
answered Nov 21 '18 at 9:17
jinhwanjinhwan
3551319
3551319
add a comment |
add a comment |
Here is a working example:
df = spark.sql(
"""
select 'gjgjgjgjgjg' as word
union
select 'lalalal?ala' as word
union
select 'ryryry.ryry' as word
""")
df.createOrReplaceTempView("words")
from pyspark.sql.types import BooleanType
from pyspark.sql.functions import col
def chkInterPunctuation(sent):
    """Report whether ``sent`` contains punctuation in its interior.

    Only the slice ``sent[1:-2]`` is inspected, so the first character and
    the last two characters are ignored.

    Returns ``None`` when ``sent`` is ``None`` (instead of raising on the
    slice); otherwise ``True`` if any inspected character is one of
    ``"``, ``'``, ``.``, ``!`` or ``?``, else ``False``.
    """
    if sent is None:
        # A missing value cannot be sliced; propagate it instead.
        return None
    # The double quote is written as '"'; the scraped original had lost the
    # backslash of "\"" and no longer parsed.
    return any(char in ('"', "'", ".", "!", "?") for char in sent[1:-2])
cip = udf(chkInterPunctuation, BooleanType())
udf_df = df.withColumn("cip", cip(col("word")))
udf_df.where("cip = true").show()
Regardless, you can do this without a udf:
spark.sql("""
SELECT *
FROM words
WHERE word LIKE '%.%'
OR word LIKE '%?%'
""").show()
add a comment |
Here is a working example:
df = spark.sql(
"""
select 'gjgjgjgjgjg' as word
union
select 'lalalal?ala' as word
union
select 'ryryry.ryry' as word
""")
df.createOrReplaceTempView("words")
from pyspark.sql.types import BooleanType
from pyspark.sql.functions import col
def chkInterPunctuation(sent):
    """Report whether ``sent`` contains punctuation in its interior.

    Only the slice ``sent[1:-2]`` is inspected, so the first character and
    the last two characters are ignored.

    Returns ``None`` when ``sent`` is ``None`` (instead of raising on the
    slice); otherwise ``True`` if any inspected character is one of
    ``"``, ``'``, ``.``, ``!`` or ``?``, else ``False``.
    """
    if sent is None:
        # A missing value cannot be sliced; propagate it instead.
        return None
    # The double quote is written as '"'; the scraped original had lost the
    # backslash of "\"" and no longer parsed.
    return any(char in ('"', "'", ".", "!", "?") for char in sent[1:-2])
cip = udf(chkInterPunctuation, BooleanType())
udf_df = df.withColumn("cip", cip(col("word")))
udf_df.where("cip = true").show()
Regardless, you can do this without a udf:
spark.sql("""
SELECT *
FROM words
WHERE word LIKE '%.%'
OR word LIKE '%?%'
""").show()
add a comment |
Here is a working example:
df = spark.sql(
"""
select 'gjgjgjgjgjg' as word
union
select 'lalalal?ala' as word
union
select 'ryryry.ryry' as word
""")
df.createOrReplaceTempView("words")
from pyspark.sql.types import BooleanType
from pyspark.sql.functions import col
def chkInterPunctuation(sent):
    """Report whether ``sent`` contains punctuation in its interior.

    Only the slice ``sent[1:-2]`` is inspected, so the first character and
    the last two characters are ignored.

    Returns ``None`` when ``sent`` is ``None`` (instead of raising on the
    slice); otherwise ``True`` if any inspected character is one of
    ``"``, ``'``, ``.``, ``!`` or ``?``, else ``False``.
    """
    if sent is None:
        # A missing value cannot be sliced; propagate it instead.
        return None
    # The double quote is written as '"'; the scraped original had lost the
    # backslash of "\"" and no longer parsed.
    return any(char in ('"', "'", ".", "!", "?") for char in sent[1:-2])
cip = udf(chkInterPunctuation, BooleanType())
udf_df = df.withColumn("cip", cip(col("word")))
udf_df.where("cip = true").show()
Regardless, you can do this without a udf:
spark.sql("""
SELECT *
FROM words
WHERE word LIKE '%.%'
OR word LIKE '%?%'
""").show()
Here is a working example:
df = spark.sql(
"""
select 'gjgjgjgjgjg' as word
union
select 'lalalal?ala' as word
union
select 'ryryry.ryry' as word
""")
df.createOrReplaceTempView("words")
from pyspark.sql.types import BooleanType
from pyspark.sql.functions import col
def chkInterPunctuation(sent):
    """Report whether ``sent`` contains punctuation in its interior.

    Only the slice ``sent[1:-2]`` is inspected, so the first character and
    the last two characters are ignored.

    Returns ``None`` when ``sent`` is ``None`` (instead of raising on the
    slice); otherwise ``True`` if any inspected character is one of
    ``"``, ``'``, ``.``, ``!`` or ``?``, else ``False``.
    """
    if sent is None:
        # A missing value cannot be sliced; propagate it instead.
        return None
    # The double quote is written as '"'; the scraped original had lost the
    # backslash of "\"" and no longer parsed.
    return any(char in ('"', "'", ".", "!", "?") for char in sent[1:-2])
cip = udf(chkInterPunctuation, BooleanType())
udf_df = df.withColumn("cip", cip(col("word")))
udf_df.where("cip = true").show()
Regardless, you can do this without a udf:
spark.sql("""
SELECT *
FROM words
WHERE word LIKE '%.%'
OR word LIKE '%?%'
""").show()
answered Nov 21 '18 at 12:54
VitaliyVitaliy
4,44932644
4,44932644
add a comment |
add a comment |
I think it's because you use show()
after applying where()
.
I suggest first apply filter and save it in a new variable and then do show()
.
df_punct_filtered = df_punct.where(col("cip") == True)
df_punct_filtered.show()
UPDATE:
or you can use filter()
function instead of where()
:
df_punct_filtered = df_punct.filter(df_punct.cip == True)
df_punct_filtered.show()
I have tried your suggestion, but it gives me the same error. In my opinion, since Spark evaluates lazily, assigning the result to a new variable doesn't change anything.
– jinhwan
Nov 21 '18 at 7:32
@jinhwan I've updated my answer. check that out.
– Ali AzG
Nov 21 '18 at 7:34
Sorry, but filter is just an alias for where. I have tried it, but it doesn't work.
– jinhwan
Nov 21 '18 at 7:43
add a comment |
I think it's because you use show()
after applying where()
.
I suggest first apply filter and save it in a new variable and then do show()
.
df_punct_filtered = df_punct.where(col("cip") == True)
df_punct_filtered.show()
UPDATE:
or you can use filter()
function instead of where()
:
df_punct_filtered = df_punct.filter(df_punct.cip == True)
df_punct_filtered.show()
I have tried your suggestion, but it gives me the same error. In my opinion, since Spark evaluates lazily, assigning the result to a new variable doesn't change anything.
– jinhwan
Nov 21 '18 at 7:32
@jinhwan I've updated my answer. check that out.
– Ali AzG
Nov 21 '18 at 7:34
Sorry, but filter is just an alias for where. I have tried it, but it doesn't work.
– jinhwan
Nov 21 '18 at 7:43
add a comment |
I think it's because you use show()
after applying where()
.
I suggest first apply filter and save it in a new variable and then do show()
.
df_punct_filtered = df_punct.where(col("cip") == True)
df_punct_filtered.show()
UPDATE:
or you can use filter()
function instead of where()
:
df_punct_filtered = df_punct.filter(df_punct.cip == True)
df_punct_filtered.show()
I think it's because you use show()
after applying where()
.
I suggest first apply filter and save it in a new variable and then do show()
.
df_punct_filtered = df_punct.where(col("cip") == True)
df_punct_filtered.show()
UPDATE:
or you can use filter()
function instead of where()
:
df_punct_filtered = df_punct.filter(df_punct.cip == True)
df_punct_filtered.show()
edited Nov 21 '18 at 7:34
answered Nov 21 '18 at 7:17


Ali AzGAli AzG
6651616
6651616
I have tried your suggestion, but it gives me the same error. In my opinion, since Spark evaluates lazily, assigning the result to a new variable doesn't change anything.
– jinhwan
Nov 21 '18 at 7:32
@jinhwan I've updated my answer. check that out.
– Ali AzG
Nov 21 '18 at 7:34
Sorry, but filter is just an alias for where. I have tried it, but it doesn't work.
– jinhwan
Nov 21 '18 at 7:43
add a comment |
I have tried your suggestion, but it gives me the same error. In my opinion, since Spark evaluates lazily, assigning the result to a new variable doesn't change anything.
– jinhwan
Nov 21 '18 at 7:32
@jinhwan I've updated my answer. check that out.
– Ali AzG
Nov 21 '18 at 7:34
Sorry, but filter is just an alias for where. I have tried it, but it doesn't work.
– jinhwan
Nov 21 '18 at 7:43
I have tried your suggestion, but it gives me the same error. In my opinion, since Spark evaluates lazily, assigning the result to a new variable doesn't change anything.
– jinhwan
Nov 21 '18 at 7:32
I have tried your suggestion, but it gives me the same error. In my opinion, since Spark evaluates lazily, assigning the result to a new variable doesn't change anything.
– jinhwan
Nov 21 '18 at 7:32
@jinhwan I've updated my answer. check that out.
– Ali AzG
Nov 21 '18 at 7:34
@jinhwan I've updated my answer. check that out.
– Ali AzG
Nov 21 '18 at 7:34
Sorry, but filter is just an alias for where. I have tried it, but it doesn't work.
– jinhwan
Nov 21 '18 at 7:43
Sorry, but filter is just an alias for where. I have tried it, but it doesn't work.
– jinhwan
Nov 21 '18 at 7:43
add a comment |
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53406889%2fspark-udf-result-can-do-show-but-cant-do-filter%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Could the error be due to some udf input being too short? Or empty?
– Shaido
Nov 21 '18 at 8:03
Can you update your question with some input data?
– martinarroyo
Nov 21 '18 at 8:29
TypeError: 'NoneType' object has no attribute '__getitem__' - it seems to me you just need to cast the cip column to BooleanType
– sramalingam24
Nov 21 '18 at 8:35
Before you do the filtering of course
– sramalingam24
Nov 21 '18 at 8:36
2
Without knowing for sure, I'm guessing that show() works the same way in PySpark as it does in regular Spark (Scala). In normal Spark, a show() - unlike a collect() - will not force evaluation of the entire dataframe but only a small part of the data: just enough to display a tiny fraction of the result set. I suspect that there might be a problem with the input data in some row that is not processed by your show() command but will indeed be processed by your filter(...).collect() command. Maybe try to do a df_punct.collect() (no filter) instead of df_punct.show() to verify this?
– Glennie Helles Sindholt
Nov 21 '18 at 9:20