KafkaTridentSpoutOpaque repeatedly consumes the last message
I use Storm + Kafka + Protobuf to build my stream-processing system.
The problem is that KafkaTridentSpoutOpaque repeatedly consumes the last message. I want every message in Kafka to be consumed only once.
Here are some details:
Java Dependency
storm-kafka-client 1.2.2
storm-core 1.2.2
kafka_2.10 0.10.2.0
Component
kafka_2.12-2.0.0
apache-storm-1.2.2
Build KafkaTridentSpoutOpaque instance code
    protected static KafkaSpoutConfig<String, byte[]> newKafkaSpoutConfig(String bootstrapServers, String topic) {
        KafkaSpoutConfig.Builder<String, byte[]> builder = new KafkaSpoutConfig.Builder<>(bootstrapServers, topic);
        return builder.setProp(ConsumerConfig.GROUP_ID_CONFIG, "stormKafkaSpoutGroup")
                .setProp(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
                .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
                .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
                .setRecordTranslator(new JustValueFunc(), new Fields("str"))
                .setFirstPollOffsetStrategy(UNCOMMITTED_EARLIEST)
                .setProcessingGuarantee(AT_MOST_ONCE)
                .build();
    }

    private static KafkaTridentSpoutOpaque<String, byte[]> newKafkaTridentSpoutOpaque(KafkaSpoutConfig<String, byte[]> spoutConfig) {
        return new KafkaTridentSpoutOpaque<>(spoutConfig);
    }

    // Translates each Kafka record into a single-field tuple holding the parsed protobuf message.
    private static class JustValueFunc implements Func<ConsumerRecord<String, byte[]>, List<Object>>, Serializable {
        @Override
        public List<Object> apply(ConsumerRecord<String, byte[]> record) {
            Values res = null;
            try {
                res = new Values(PbMiddlewareTransfer.Record.parseFrom(record.value()));
            } catch (InvalidProtocolBufferException e) {
                // Parse failures are only printed; res stays null in that case.
                e.printStackTrace();
            }
            return res;
        }
    }
Here is my topology code
    public static void main(String[] args) throws Exception {
        StormTopology topology = getTridentTopology();
        Config conf = new Config();
        conf.setNumWorkers(20);
        conf.setMaxSpoutPending(5000);
        StormSubmitter.submitTopology("storm-kafka-client-spout-test", conf, topology);
    }

    public static StormTopology getTridentTopology() {
        final TridentTopology tridentTopology = new TridentTopology();
        KafkaSpoutConfig<String, byte[]> spoutConfig = newKafkaSpoutConfig("192.168.0.202:9092", "test-2");
        ITridentDataSource spout = newKafkaTridentSpoutOpaque(spoutConfig);
        final Stream spoutStream = tridentTopology.newStream("spout", spout).parallelismHint(1);
        spoutStream.each(spoutStream.getOutputFields(), new Debug("##### fastest driver"));
        return tridentTopology.build();
    }
Output Log
./6702/worker.log:2018-11-19 20:19:12.418 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:19:12 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:19:25.908 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:19:25 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:20:01.997 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:20:01 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:20:30.591 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:20:30 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:20:42.960 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:20:42 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:20:44.477 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:20:44 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:20:47.501 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:20:47 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:20:48.516 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:20:48 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:20:54.072 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:20:54 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:22:01.171 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:22:01 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:22:27.380 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:22:27 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:23:03.992 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:23:03 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:23:14.893 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:23:14 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:23:20.955 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:23:20 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:23:25.495 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:23:25 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:23:47.978 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:23:47 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:23:56.440 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:23:56 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:24:33.534 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:24:33 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:27:35.588 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:27:35 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:28:23.784 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:28:23 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:28:48.155 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:28:48 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:29:12.218 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:29:12 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:31:15.597 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:31:15 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:31:30.720 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:31:30 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:33:07.871 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:33:07 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:33:27.889 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:33:27 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:34:34.126 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:34:34 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:35:36.615 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:35:36 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:39:31.282 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:39:31 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:40:15.364 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:40:15 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:41:15.565 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:41:15 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:41:16.570 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:41:16 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:41:54.130 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:41:54 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:43:30.303 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:43:30 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:44:26.049 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:44:26 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:52:43.618 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:52:43 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:54:01.904 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:54:01 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:55:13.448 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:55:13 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:59:15.220 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:59:15 CST 2018> DEBUG(##### fastest driver): [metricID: 1
I produced only one message to Kafka, so there should be a single line of output, but in fact there are many. The whole pattern repeats about every 45 minutes.
Any help is appreciated.
Thanks.
java apache-kafka apache-storm trident
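To put numbers on how quickly the message reappears, the timestamps in the worker log above can be parsed and subtracted. This is only a minimal sketch using java.time; the class and method names (LogGapSketch, gapMillis) are hypothetical and not part of Storm or Kafka, and the timestamps are copied from the first, second, and last lines of the log excerpt.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper for reading the worker log; not part of Storm or Kafka.
class LogGapSketch {
    // Matches the leading timestamp of each worker.log line, e.g. "2018-11-19 20:19:12.418".
    private static final DateTimeFormatter WORKER_LOG_TIME =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    // Milliseconds elapsed between two worker.log timestamps.
    static long gapMillis(String earlier, String later) {
        LocalDateTime start = LocalDateTime.parse(earlier, WORKER_LOG_TIME);
        LocalDateTime end = LocalDateTime.parse(later, WORKER_LOG_TIME);
        return Duration.between(start, end).toMillis();
    }

    public static void main(String[] args) {
        // First and second log lines: time until the first repeated emit.
        long firstRepeat = gapMillis("2018-11-19 20:19:12.418", "2018-11-19 20:19:25.908");
        // First and last log lines: time covered by the whole excerpt.
        long excerptSpan = gapMillis("2018-11-19 20:19:12.418", "2018-11-19 20:59:15.220");
        System.out.println("first repeat after (ms): " + firstRepeat);
        System.out.println("excerpt spans (ms): " + excerptSpan);
    }
}
```

For this excerpt the first repeat arrives after roughly 13 seconds, and the 39 lines cover roughly 40 minutes, so the repeats are irregular rather than evenly spaced.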
What is UNCOMMITTED_EARLIEST? Are you sure consumed offsets are being committed?
– cricket_007
Nov 20 '18 at 5:53
The description of UNCOMMITTED_EARLIEST is: "The kafka spout polls records from the last committed offset, if any. If no offset has been committed it behaves as EARLIEST." And I'm sure the offset is committed, because when I restart the topology it only consumes new messages.
– ChenBo
Nov 20 '18 at 6:05
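The UNCOMMITTED_EARLIEST rule quoted in the comment above can be modelled in a few lines. This is a sketch of the rule as described there, not the storm-kafka-client implementation; FirstPollOffsetSketch and startOffset are hypothetical names, and the offsets are made-up illustration values.

```java
import java.util.OptionalLong;

// Hypothetical model of the quoted rule; not code from storm-kafka-client.
class FirstPollOffsetSketch {
    // Start from the committed offset if one exists, otherwise from the earliest offset.
    static long startOffset(OptionalLong committedOffset, long earliestOffset) {
        return committedOffset.orElse(earliestOffset);
    }

    public static void main(String[] args) {
        // No commit yet: behaves like EARLIEST.
        System.out.println(startOffset(OptionalLong.empty(), 0L));
        // A commit exists: resume from it and ignore the earliest offset.
        System.out.println(startOffset(OptionalLong.of(42L), 0L));
    }
}
```

Under this model, a restart that only picks up new messages is consistent with a committed offset being present at startup; it does not by itself explain repeats while the topology keeps running.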
./6702/worker.log:2018-11-19 20:41:15.565 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:41:15 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:41:16.570 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:41:16 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:41:54.130 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:41:54 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:43:30.303 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:43:30 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:44:26.049 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:44:26 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:52:43.618 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:52:43 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:54:01.904 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:54:01 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:55:13.448 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:55:13 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:59:15.220 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:59:15 CST 2018> DEBUG(##### fastest driver): [metricID: 1
I produced just one message to Kafka, so there should be only a single output, but in fact there are many, and the output repeats about every 45 minutes.
Any help is appreciated.
Thanks.
java apache-kafka apache-storm trident
edited Nov 20 '18 at 5:53
cricket_007
asked Nov 20 '18 at 3:56
ChenBo
What is UNCOMMITTED_EARLIEST? Are you sure consumed offsets are being committed?
– cricket_007
Nov 20 '18 at 5:53
"The kafka spout polls records from the last committed offset, if any. If no offset has been committed it behaves as EARLIEST" is the description of UNCOMMITTED_EARLIEST. I'm sure the offsets are committed, because when I restart the topology it consumes only the new messages.
– ChenBo
Nov 20 '18 at 6:05
1 Answer
Setting the max spout pending value very high causes this. Try a low value, say 1.
setMaxSpoutPending sets the maximum number of unfinished (pending) tuples that a spout task may have emitted at any one time. There is some advice available on how to set this option.
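For illustration, here is a minimal sketch of what the change amounts to. It assumes that Storm's Config is a plain string-keyed map and that setMaxSpoutPending and setNumWorkers store their values under the keys topology.max.spout.pending and topology.workers. The class name MaxSpoutPendingSketch and the helper buildConf are hypothetical and only there so the snippet runs without Storm on the classpath. In the real topology you would probably just change conf.setMaxSpoutPending(5000) to conf.setMaxSpoutPending(1).

```java
import java.util.HashMap;
import java.util.Map;

public class MaxSpoutPendingSketch {
    // Assumed key written by Storm's Config.setMaxSpoutPending(int).
    public static final String TOPOLOGY_MAX_SPOUT_PENDING = "topology.max.spout.pending";
    // Assumed key written by Storm's Config.setNumWorkers(int).
    public static final String TOPOLOGY_WORKERS = "topology.workers";

    // Hypothetical helper: builds the same settings as the question's main(),
    // but with a caller-chosen cap on pending (unfinished) spout emissions.
    public static Map<String, Object> buildConf(int maxSpoutPending) {
        Map<String, Object> conf = new HashMap<>();
        conf.put(TOPOLOGY_WORKERS, 20);                       // conf.setNumWorkers(20)
        conf.put(TOPOLOGY_MAX_SPOUT_PENDING, maxSpoutPending); // conf.setMaxSpoutPending(...)
        return conf;
    }

    public static void main(String[] args) {
        // A low value such as 1 instead of the original 5000.
        Map<String, Object> conf = buildConf(1);
        System.out.println(TOPOLOGY_MAX_SPOUT_PENDING + " = " + conf.get(TOPOLOGY_MAX_SPOUT_PENDING));
    }
}
```

A value of 1 is the most conservative setting and may limit throughput, so it is probably worth raising it step by step once the duplicates are gone.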
answered Nov 21 '18 at 6:24
ChenBo