spark UDF result can do 'show', but can't do 'filter'



























My Spark UDF works when I call show(), but it gives me an error when I filter on the UDF result.

The UDF:

from pyspark.sql.functions import udf, col, length
from pyspark.sql.types import BooleanType

def chkInterPunctuation(sent):
    # True if any inner character (sent[1:-2]) is a quote or sentence punctuation
    for char in sent[1:-2]:
        if char in ['"', "'", ".", "!", "?"]:
            return True
    return False

cip = udf(chkInterPunctuation, BooleanType())


show() works:

df_punct = dfs.withColumn("in_length", length("input")) \
    .withColumn("out_length", length("output")) \
    .withColumn("cip", cip(col("input")))
df_punct.show()

(screenshot of the show() output omitted)



but filtering on the UDF result gives me an error:

df_punct.where(col("cip") == True).show()

This is the filter error:



---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-171-e206ffd07f75> in <module>()
----> 1 df_punct.where(col("cip") == True).collect()

/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/dataframe.pyc in collect(self)
308 """
309 with SCCallSiteSync(self._sc) as css:
--> 310 port = self._jdf.collectToPython()
311 return list(_load_from_socket(port, BatchedSerializer(PickleSerializer())))
312

/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
931 answer = self.gateway_client.send_command(command)
932 return_value = get_return_value(
--> 933 answer, self.gateway_client, self.target_id, self.name)
934
935 for temp_arg in temp_args:

/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/utils.pyc in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()

/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
310 raise Py4JJavaError(
311 "An error occurred while calling {0}{1}{2}.n".
--> 312 format(target_id, ".", name), value)
313 else:
314 raise Py4JError(

Py4JJavaError: An error occurred while calling o3378.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 40 in stage 238.0 failed 1 times, most recent failure: Lost task 40.0 in stage 238.0 (TID 8862, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 172, in main
process()
File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 167, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 106, in <lambda>
func = lambda _, it: map(mapper, it)
File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 92, in <lambda>
mapper = lambda a: udf(*a)
File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 70, in <lambda>
return lambda *a: f(*a)
File "<ipython-input-153-fce920dc0de2>", line 5, in chkInterPunctuation
TypeError: 'NoneType' object has no attribute '__getitem__'

at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:124)
at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:68)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:766)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:766)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:103)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1911)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:893)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.RDD.collect(RDD.scala:892)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:453)
at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply$mcI$sp(Dataset.scala:2513)
at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2513)
at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2513)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2532)
at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:2512)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:211)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 172, in main
process()
File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 167, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 106, in <lambda>
func = lambda _, it: map(mapper, it)
File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 92, in <lambda>
mapper = lambda a: udf(*a)
File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 70, in <lambda>
return lambda *a: f(*a)
File "<ipython-input-153-fce920dc0de2>", line 5, in chkInterPunctuation
TypeError: 'NoneType' object has no attribute '__getitem__'

at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:124)
at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:68)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:766)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:766)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:103)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more


My googling suggests that Py4J errors usually occur when the UDF doesn't return a proper value or raises an error, but my UDF always returns True or False. In addition, the query returns the right values when I call show(). It doesn't make sense to me. What could be the cause?
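One quick check that would narrow this down (a minimal diagnostic sketch, assuming the UDF input column is named "input" as above) is to count null inputs, since the traceback shows the UDF slicing a None value:

from pyspark.sql.functions import col

# If this count is non-zero, some rows pass None to chkInterPunctuation, which
# explains why full evaluation fails while show(), which touches only a few rows, succeeds.
dfs.filter(col("input").isNull()).count()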



Thank you in advance.

python apache-spark pyspark apache-spark-sql

asked Nov 21 '18 at 7:08 by jinhwan
edited Nov 21 '18 at 7:29 by Ali AzG
  • Could the error be due to some UDF input being too short? Or empty?

    – Shaido
    Nov 21 '18 at 8:03











  • Can you update your question with some input data?

    – martinarroyo
    Nov 21 '18 at 8:29











  • TypeError: 'NoneType' object has no attribute '__getitem__'. Seems to me you just need to cast the cip column to BooleanType.

    – sramalingam24
    Nov 21 '18 at 8:35











  • Before you do the filtering, of course.

    – sramalingam24
    Nov 21 '18 at 8:36






  • Without knowing for sure, I'm guessing that show() works the same way in pyspark as it does in regular spark (scala). In normal spark, a show - unlike a collect - will not force evaluation of the entire dataframe but only a small part of the data, just enough to display a tiny fraction of the result set. I suspect that there might be a problem with input data in some row that is not processed by your show command but will indeed be processed by your filter(...).collect() command. Maybe try df_punct.collect() (no filter) instead of df_punct.show() to verify this?

    – Glennie Helles Sindholt
    Nov 21 '18 at 9:20
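A minimal way to act on that suggestion (a sketch, not from the original thread, assuming df_punct is the DataFrame built above):

# Force evaluation of every row with no filter at all; if this raises the same
# Py4JJavaError, the problem is in the input data rather than in the filter.
rows = df_punct.collect()
print(len(rows))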
















4 Answers
























2














That happens because you don't handle NULL input. Try:

def chkInterPunctuation(sent):
    if not sent:
        return None  # NULL (or empty) input -> NULL result
    for char in sent[1:-2]:
        if char in ['"', "'", ".", "!", "?"]:
            return True
    return False





answered Nov 21 '18 at 10:51 by user10685334
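
With that guard in place, the cip column is simply NULL for NULL inputs and the original filter works. A minimal sketch (not part of the answer above), reusing dfs, the input column and the corrected chkInterPunctuation:

from pyspark.sql.functions import col, udf
from pyspark.sql.types import BooleanType

# Re-register the UDF after adding the NULL guard.
cip = udf(chkInterPunctuation, BooleanType())

df_punct = dfs.withColumn("cip", cip(col("input")))

# Rows with a NULL cip (i.e. NULL input) are dropped by the comparison itself,
# because NULL == True evaluates to NULL and such rows do not pass the filter.
df_punct.where(col("cip") == True).show()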































0

The issue was resolved by updating to Spark 2.4.0 (I was using 2.0.0).






answered Nov 21 '18 at 9:17 by jinhwan
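
If you want to confirm which version a session is actually running before and after such an upgrade, a trivial check (assuming the usual spark and sc shell variables) is:

# Both report the Spark version of the running session, e.g. '2.4.0'.
print(spark.version)
print(sc.version)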































0

Here is a working example:

df = spark.sql(
    """
    select 'gjgjgjgjgjg' as word
    union
    select 'lalalal?ala' as word
    union
    select 'ryryry.ryry' as word
    """)

df.createOrReplaceTempView("words")

from pyspark.sql.types import BooleanType
from pyspark.sql.functions import col, udf

def chkInterPunctuation(sent):
    for char in sent[1:-2]:
        if char in ['"', "'", ".", "!", "?"]:
            return True
    return False

cip = udf(chkInterPunctuation, BooleanType())

udf_df = df.withColumn("cip", cip(col("word")))

udf_df.where("cip = true").show()

Regardless, you can do this without a udf:

spark.sql("""
    SELECT *
    FROM words
    WHERE word LIKE '%.%'
       OR word LIKE '%?%'
""").show()





answered Nov 21 '18 at 12:54 by Vitaliy
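
For completeness, the same no-UDF filter can also be written with the DataFrame API; a short sketch added for illustration (not part of the original answer), using the df defined above:

from pyspark.sql.functions import col

# Keep rows whose word contains '.' or '?', mirroring the LIKE query above.
df.where(col("word").contains(".") | col("word").contains("?")).show()

# Or match any of the punctuation marks from the UDF with one regex class.
df.where(col("word").rlike("[\"'.!?]")).show()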































-1

I think it's because you use show() after applying where(). I suggest applying the filter first, saving the result in a new variable, and then calling show():

df_punct_filtered = df_punct.where(col("cip") == True)

df_punct_filtered.show()

UPDATE: or you can use the filter() function instead of where():

df_punct_filtered = df_punct.filter(df_punct.cip == True)

df_punct_filtered.show()





answered Nov 21 '18 at 7:17 by Ali AzG, edited Nov 21 '18 at 7:34


























  • I have tried your suggestion, but it gives me the same error. Spark does lazy evaluation, so declaring a new variable doesn't change the meaning.

    – jinhwan
    Nov 21 '18 at 7:32

  • @jinhwan I've updated my answer, check it out.

    – Ali AzG
    Nov 21 '18 at 7:34

  • Sorry, but filter is just an alias for where. I have tried it, but it doesn't work.

    – jinhwan
    Nov 21 '18 at 7:43
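
Since filter and where really are aliases, renaming the call cannot change the outcome; what does help is keeping NULL inputs away from the UDF in the first place. A sketch based on the NULL hypothesis discussed above (not part of the original thread), reusing dfs and cip from the question:

from pyspark.sql.functions import col

# Drop NULL inputs before the UDF column is ever computed for them.
df_clean = dfs.where(col("input").isNotNull())
df_punct = df_clean.withColumn("cip", cip(col("input")))
df_punct.where(col("cip") == True).show()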










