OutOfMemoryError while doing a join operation in Flink

We are converting one of our Pig pipelines to Flink using Apache Beam. The Pig pipeline reads two different data sets (R1 & R2) from HDFS, enriches them, joins them and writes the result back to HDFS. The data set R1 is skewed: it has a few keys with a very large number of records. When we converted the Pig pipeline to Apache Beam and ran it with Flink on a production YARN cluster, we got the following error:



2018-11-21 16:52:25,307 ERROR org.apache.flink.runtime.operators.BatchTask                  - Error in task code:  GroupReduce (GroupReduce at CoGBK/GBK) (25/100)
java.lang.RuntimeException: Emitting the record caused an I/O exception: Failed to serialize element. Serialized size (> 1136656562 bytes) exceeds JVM heap space
at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:69)
at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
at org.apache.beam.runners.flink.translation.functions.SortingFlinkCombineRunner.combine(SortingFlinkCombineRunner.java:140)
at org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction.reduce(FlinkReduceFunction.java:85)
at org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator$TupleUnwrappingNonCombinableGroupReducer.reduce(PlanUnwrappingReduceGroupOperator.java:111)
at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:131)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Failed to serialize element. Serialized size (> 1136656562 bytes) exceeds JVM heap space
at org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:323)
at org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:149)
at org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper.write(DataOutputViewWrapper.java:48)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
at java.io.ObjectOutputStream.writeNonProxyDesc(ObjectOutputStream.java:1286)
at java.io.ObjectOutputStream.writeClassDesc(ObjectOutputStream.java:1231)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1427)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1577)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:351)
at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:170)
at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:50)
at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:71)
at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:58)
at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:32)
at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:98)
at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:71)
at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:529)
at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:520)
at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:480)
at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.serialize(CoderTypeSerializer.java:83)
at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:131)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
... 9 more
Caused by: java.lang.OutOfMemoryError: Java heap space
at org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:305)
at org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:149)
at org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper.write(DataOutputViewWrapper.java:48)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
at java.io.ObjectOutputStream.writeNonProxyDesc(ObjectOutputStream.java:1286)
at java.io.ObjectOutputStream.writeClassDesc(ObjectOutputStream.java:1231)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1427)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1577)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:351)
at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:170)
at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:50)
at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:71)
at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:58)
at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:32)
at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:98)
at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:71)
at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:529)
at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:520)
at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:480)
at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.serialize(CoderTypeSerializer.java:83)
at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:131)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)


From the exception view in the Flink JobManager dashboard, we can see that this happens at the join operation.

When I say the R1 dataset is skewed, I mean that some keys occur as many as 8,000,000 times, while most keys occur just once.
In dataset R2, each key occurs at most once.

Also, if we exclude the keys with a high number of occurrences, the pipeline runs absolutely fine, which shows that the problem is caused by these few keys only.
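
For context, the join is expressed in Beam with a CoGroupByKey, which matches the CoGBK/GBK step and the KvCoder/IterableLikeCoder/UnionCoder chain in the stack trace above. The following is only a minimal sketch; the method name, the String key and value types, and the keying/enrichment steps are placeholders, not our exact production code:

import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;

// Placeholder value types (String) stand in for our real record classes.
static PCollection<KV<String, CoGbkResult>> joinR1WithR2(
        PCollection<KV<String, String>> keyedR1,   // enriched R1, keyed by the join key
        PCollection<KV<String, String>> keyedR2) { // enriched R2, keyed by the join key

    final TupleTag<String> r1Tag = new TupleTag<String>() {};
    final TupleTag<String> r2Tag = new TupleTag<String>() {};

    // CoGroupByKey gathers ALL values for a given key into a single CoGbkResult,
    // so a key with ~8,000,000 R1 records becomes one very large element that
    // has to be serialized as a whole.
    return KeyedPCollectionTuple.of(r1Tag, keyedR1)
            .and(r2Tag, keyedR2)
            .apply(CoGroupByKey.create());
}

My understanding is that all records for one hot key end up in a single element, which would line up with the "Serialized size (> 1136656562 bytes) exceeds JVM heap space" message.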



Hadoop version: 2.7.1
Beam version: 2.8.0
Flink Runner version: 2.8.0



Let me know what more information I should fetch and post here to help you resolve this.










apache-flink apache-beam

asked Nov 22 '18 at 5:50
akshay

  • Which DataSet function do you use to do your join? If you use DataSet#join you should be able to provide a JoinHint; I think BROADCAST_HASH_SECOND is what you need (see the DataSet API sketch below these comments). Though I'm not 100% sure it will help you, because it is generally useful when the second data set is much smaller than the first. Your case is different: the problem is the distribution of keys.

    – Arthur
    Nov 22 '18 at 17:56











  • Since I am using Apache Beam to write the pipeline, it does not give me the freedom to specify a JoinHint as suggested. In any case, I cannot use BROADCAST_HASH_SECOND because the second dataset is much bigger than the memory I can allocate to a task manager. In my case, the record produced by the join is too big to fit in the memory of a task manager (which is given 8 GB). This makes me wonder whether Flink even has a way to handle this; as far as I know, Pig and Spark do.

    – akshay
    Nov 22 '18 at 18:06
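
For reference, here is roughly what Arthur's DataSet#join / JoinHint suggestion looks like in Flink's native DataSet API. This is only a hedged sketch for illustration (the tuple types and field positions are placeholders), and as noted in the comments it is not directly expressible through Beam:

import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;

// (key, payload) tuples stand in for the real record types.
static DataSet<Tuple2<Tuple2<String, String>, Tuple2<String, String>>> joinWithHint(
        DataSet<Tuple2<String, String>> r1,
        DataSet<Tuple2<String, String>> r2) {

    // BROADCAST_HASH_SECOND ships the entire second input to every task and
    // builds a hash table from it, so it only helps when that input is small
    // enough to fit in a task manager's memory.
    return r1.join(r2, JoinHint.BROADCAST_HASH_SECOND)
            .where(0)    // join key position in r1
            .equalTo(0); // join key position in r2
}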















