OutOfMemoryError while doing a join operation in Flink
We are converting one of our Pig pipelines to Flink using Apache Beam. The Pig pipeline reads two data sets (R1 and R2) from HDFS, enriches them, joins them, and writes the result back to HDFS. The data set R1 is skewed: it has a few keys with a very large number of records. When we converted the pipeline to Apache Beam and ran it with the Flink runner on a production YARN cluster, we got the following error:
2018-11-21 16:52:25,307 ERROR org.apache.flink.runtime.operators.BatchTask - Error in task code: GroupReduce (GroupReduce at CoGBK/GBK) (25/100)
java.lang.RuntimeException: Emitting the record caused an I/O exception: Failed to serialize element. Serialized size (> 1136656562 bytes) exceeds JVM heap space
at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:69)
at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
at org.apache.beam.runners.flink.translation.functions.SortingFlinkCombineRunner.combine(SortingFlinkCombineRunner.java:140)
at org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction.reduce(FlinkReduceFunction.java:85)
at org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator$TupleUnwrappingNonCombinableGroupReducer.reduce(PlanUnwrappingReduceGroupOperator.java:111)
at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:131)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Failed to serialize element. Serialized size (> 1136656562 bytes) exceeds JVM heap space
at org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:323)
at org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:149)
at org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper.write(DataOutputViewWrapper.java:48)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
at java.io.ObjectOutputStream.writeNonProxyDesc(ObjectOutputStream.java:1286)
at java.io.ObjectOutputStream.writeClassDesc(ObjectOutputStream.java:1231)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1427)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1577)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:351)
at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:170)
at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:50)
at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:71)
at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:58)
at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:32)
at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:98)
at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:71)
at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:529)
at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:520)
at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:480)
at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.serialize(CoderTypeSerializer.java:83)
at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:131)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
... 9 more
Caused by: java.lang.OutOfMemoryError: Java heap space
at org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:305)
at org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:149)
at org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper.write(DataOutputViewWrapper.java:48)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
at java.io.ObjectOutputStream.writeNonProxyDesc(ObjectOutputStream.java:1286)
at java.io.ObjectOutputStream.writeClassDesc(ObjectOutputStream.java:1231)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1427)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1577)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:351)
at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:170)
at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:50)
at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:71)
at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:58)
at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:32)
at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:98)
at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:71)
at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:529)
at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:520)
at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:480)
at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.serialize(CoderTypeSerializer.java:83)
at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:131)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
From the exception view in the Flink JobManager dashboard, we can see that this happens at a join operation.
When I say the R1 data set is skewed, I mean that some keys occur as many as 8,000,000 times, while most keys occur only once.
In data set R2, each key occurs at most once.
Also, if we exclude the keys with a high number of occurrences, the pipeline runs fine, which confirms that the problem is caused by these few keys only.
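For context, the join in our Beam pipeline is expressed through a CoGroupByKey, which is the "CoGBK/GBK" step shown in the stack trace above. The following is only a minimal sketch of that shape, not our actual code: the HDFS reads and the enrichment steps are replaced with Create.of placeholders, and the record types are simplified to strings.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;

public class SkewedJoinSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Placeholders for the enriched R1 and R2 data sets, keyed by the join key.
    PCollection<KV<String, String>> r1 =
        p.apply("R1", Create.of(KV.of("hotKey", "r1-record")));
    PCollection<KV<String, String>> r2 =
        p.apply("R2", Create.of(KV.of("hotKey", "r2-record")));

    final TupleTag<String> r1Tag = new TupleTag<>();
    final TupleTag<String> r2Tag = new TupleTag<>();

    // This CoGroupByKey is the "CoGBK/GBK" step in the stack trace: all values
    // for one key are grouped into a single CoGbkResult on one worker.
    PCollection<KV<String, CoGbkResult>> grouped =
        KeyedPCollectionTuple.of(r1Tag, r1)
            .and(r2Tag, r2)
            .apply("CoGBK", CoGroupByKey.create());

    grouped.apply("Join", ParDo.of(new DoFn<KV<String, CoGbkResult>, String>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        CoGbkResult result = c.element().getValue();
        for (String left : result.getAll(r1Tag)) {    // up to ~8,000,000 values for a hot key
          for (String right : result.getAll(r2Tag)) { // at most one value from R2
            c.output(c.element().getKey() + "," + left + "," + right);
          }
        }
      }
    }));

    p.run().waitUntilFinish();
  }
}

Because CoGroupByKey collects every value of a key into a single CoGbkResult, a key with roughly 8,000,000 R1 records turns into one element of more than a gigabyte, which is consistent with the "Serialized size (> 1136656562 bytes)" in the error above.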
Hadoop version: 2.7.1
Beam version: 2.8.0
Flink Runner version: 2.8.0
Let me know what additional information I should post here to help resolve this.
apache-flink apache-beam
Which DataSet function do you use to do your join? If you use DataSet#join, you should be able to provide a JoinHint. I think BROADCAST_HASH_SECOND is what you need, though I'm not 100% sure it will help, because in general it is useful when the second data set is much smaller than the first; in your case the problem is the distribution of keys.
– Arthur
Nov 22 '18 at 17:56
Since I am writing the pipeline with Apache Beam, I don't have the freedom to specify a JoinHint as suggested. In any case, I cannot use BROADCAST_HASH_SECOND, because the second data set is much larger than the memory I can allocate to a task manager. In my case, the record produced after the join is too big to fit in a task manager's memory (the task manager is given 8 GB). This makes me wonder whether Flink even has a way to handle this; as far as I know, Pig and Spark do.
– akshay
Nov 22 '18 at 18:06
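For reference, the JoinHint that Arthur mentions belongs to Flink's native DataSet API rather than Beam. A minimal sketch of how it would be specified there looks like this (placeholder inputs instead of the HDFS reads; as noted in the comments, this option is not directly exposed when the pipeline is written with Beam):

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class JoinHintSketch {
  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Placeholder inputs; the real job would read these from HDFS.
    DataSet<Tuple2<String, String>> r1 = env.fromElements(Tuple2.of("hotKey", "r1-record"));
    DataSet<Tuple2<String, String>> r2 = env.fromElements(Tuple2.of("hotKey", "r2-record"));

    // The hint asks the optimizer to broadcast the second input and build a
    // hash table from it, so the skewed first input is only streamed past it.
    DataSet<String> joined = r1
        .join(r2, JoinHint.BROADCAST_HASH_SECOND)
        .where(0)
        .equalTo(0)
        .with(new JoinFunction<Tuple2<String, String>, Tuple2<String, String>, String>() {
          @Override
          public String join(Tuple2<String, String> left, Tuple2<String, String> right) {
            return left.f0 + "," + left.f1 + "," + right.f1;
          }
        });

    joined.print();
  }
}

BROADCAST_HASH_SECOND replicates the second input to every task and builds a hash table from it, which is why, as Arthur notes, it is generally only useful when that input is small relative to the other side.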