Distributing socket data among Kafka cluster nodes












I want to read data from a socket and write it to a Kafka topic, so that my Flink program can read the data from the topic and process it. I can do that on one node, but I want a Kafka cluster with at least three different nodes (different IP addresses) and to distribute the socket data among those nodes. I don't know how to do this or how to change my code. My simple program follows:



public class WordCount {

    public static void main(String[] args) throws Exception {

        kafka_test objKafka = new kafka_test();
        // Check input parameters
        final ParameterTool params = ParameterTool.fromArgs(args);
        int myport = 9999;
        String hostname = "localhost";

        // set up the execution environment
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // make parameters available in the web interface
        env.getConfig().setGlobalJobParameters(params);

        DataStream<String> stream = env.socketTextStream(hostname, myport);

        stream.addSink(objKafka.createStringProducer("testFlink", "localhost:9092"));

        DataStream<String> text =
                env.addSource(objKafka.createStringConsumerForTopic("testFlink",
                        "localhost:9092", "test"));

        DataStream<Tuple2<String, Long>> counts = text
                .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Long>> out) {
                        // normalize and split the line
                        String[] words = value.toLowerCase().split("\\W+");

                        // emit the pairs
                        for (String word : words) {
                            if (!word.isEmpty()) {
                                out.collect(new Tuple2<String, Long>(word, 1L));
                            }
                        }
                    }
                })
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1);

        // emit result
        if (params.has("output")) {
            counts.writeAsText(params.get("output"));
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            counts.print();
        }

        // execute program
        env.execute("Streaming WordCount");
    }
}

public class kafka_test {

    public FlinkKafkaConsumer<String> createStringConsumerForTopic(
            String topic, String kafkaAddress, String kafkaGroup) {
        // Kafka properties
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", kafkaAddress);
        props.setProperty("group.id", kafkaGroup);
        FlinkKafkaConsumer<String> myconsumer = new FlinkKafkaConsumer<>(
                topic, new SimpleStringSchema(), props);
        myconsumer.setStartFromLatest();
        return myconsumer;
    }

    public FlinkKafkaProducer<String> createStringProducer(
            String topic, String kafkaAddress) {
        return new FlinkKafkaProducer<>(kafkaAddress, topic, new SimpleStringSchema());
    }
}


Would you please guide me on how to distribute a socket data stream among different Kafka nodes?



Any help would be appreciated.










      apache-kafka kafka-consumer-api flink-streaming kafka-producer-api






asked Jan 2 at 12:03 by M_Gh, edited Jan 7 at 7:08
























1 Answer
I think your code is correct. Kafka will take care of "distributing" the data; how the data is spread among the Kafka brokers depends on the topic configuration.



          Check the answer here to better understand Kafka topics and partitions.



Let's say you have 3 Kafka brokers. If you create your topic with 3 replicas and 3 partitions:



          > bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic my-topic


Your topic will then have 3 partitions, and each partition will be stored 3 times in the cluster. With 3 brokers, each broker will typically be the leader for one partition and hold replicas of the other two; you can verify the layout with `bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-topic`.



Then you just have to create your Kafka sink:



FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<String>(
        "broker1:9092,broker2:9092,broker3:9092",
        "my-topic",
        new SimpleStringSchema());

stream.addSink(myProducer);
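To see why the producer alone is enough, note how Kafka assigns records to partitions: keyed records are hashed to a partition, and unkeyed records are spread across partitions by the client. Below is a simplified, self-contained sketch of the keyed case; plain `String.hashCode()` stands in for Kafka's actual murmur2 hash, so the exact partition numbers differ from a real cluster and the class name `PartitionSketch` is purely illustrative:

```java
import java.util.HashMap;
import java.util.Map;

public class PartitionSketch {
    // Simplified stand-in for Kafka's default partitioner.
    // Real Kafka hashes the serialized key bytes with murmur2; plain hashCode()
    // is used here only to illustrate "hash modulo partition count".
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int partitions = 3; // matches the topic created above
        Map<Integer, Integer> perPartition = new HashMap<>();
        for (String word : new String[]{"alpha", "beta", "gamma", "delta", "epsilon", "zeta"}) {
            perPartition.merge(partitionFor(word, partitions), 1, Integer::sum);
        }
        // The same key always lands on the same partition; different keys
        // spread across the brokers that lead those partitions.
        System.out.println(perPartition);
    }
}
```

The takeaway: partition assignment happens inside the producer client, so no per-broker code is needed.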





answered Jan 2 at 15:23 by belo
• Dear @belo, thanks for your answer. But suppose I have three brokers with different IP addresses; how is the data distributed among the brokers? Would you please tell me if I have to run your code (above) on all brokers?

            – M_Gh
            Jan 3 at 5:00











Not in the broker. You have two clusters: a Kafka cluster consisting of multiple brokers, each with a different IP, and a Flink cluster consisting of multiple nodes/servers with different IPs. The code I pasted above is for your Flink job, which will run on the Flink cluster. This job will create a Kafka producer with the IPs of the Kafka brokers (broker1:9092,broker2:9092,broker3:9092) and will distribute the data among those Kafka brokers.

            – belo
            Jan 3 at 8:24













• Dear @belo, sorry to bother you. In fact, I pump my data from a CSV file to socket 9999 with the command "cat file.csv | nc -lk 9999". If I send data to port 9999 on every broker's IP, each broker may get the same data. How can I distribute all the data across the three brokers? Would you please tell me if the Flink master node does that for me? Thanks again.

            – M_Gh
            Jan 3 at 9:21











• You want to distribute data among Flink nodes, not Kafka brokers. I'm a bit confused, because nc -lk means you will be listening for data, not sending it. Anyway, to achieve what you need, I guess you'd have to put some kind of load balancer between netcat and the Flink nodes to take care of distributing data among the Flink instances, or write a shell script that sends different lines of the file to different Flink nodes. For production use I'd suggest something more robust, maybe Apache NiFi.

            – belo
            Jan 3 at 9:55
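The shell-script route mentioned in the comment above could be sketched as follows. The round-robin split actually runs; flink1, flink2, flink3 are hypothetical Flink node hostnames, so the netcat commands are shown only as comments:

```shell
# Sample data standing in for file.csv
printf 'line1\nline2\nline3\nline4\nline5\nline6\n' > file.csv

# Split file.csv round-robin into chunk.0, chunk.1, chunk.2,
# so each Flink node's socket source receives a different third of the lines.
awk -v n=3 '{ print > ("chunk." (NR - 1) % n) }' file.csv

# Each chunk then feeds one (hypothetical) Flink node's socket source:
#   nc flink1 9999 < chunk.0
#   nc flink2 9999 < chunk.1
#   nc flink3 9999 < chunk.2
```

This only balances input to the Flink socket sources; distribution across Kafka brokers still happens in the Kafka producer, as described in the answer.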











You don't have to do anything to distribute data among Kafka brokers. The Kafka producer, which is part of your Flink job, will take care of it. How the data is distributed depends on the number of partitions of the topic you write to.

            – belo
            Jan 3 at 11:29











          Your Answer






          StackExchange.ifUsing("editor", function () {
          StackExchange.using("externalEditor", function () {
          StackExchange.using("snippets", function () {
          StackExchange.snippets.init();
          });
          });
          }, "code-snippets");

          StackExchange.ready(function() {
          var channelOptions = {
          tags: "".split(" "),
          id: "1"
          };
          initTagRenderer("".split(" "), "".split(" "), channelOptions);

          StackExchange.using("externalEditor", function() {
          // Have to fire editor after snippets, if snippets enabled
          if (StackExchange.settings.snippets.snippetsEnabled) {
          StackExchange.using("snippets", function() {
          createEditor();
          });
          }
          else {
          createEditor();
          }
          });

          function createEditor() {
          StackExchange.prepareEditor({
          heartbeatType: 'answer',
          autoActivateHeartbeat: false,
          convertImagesToLinks: true,
          noModals: true,
          showLowRepImageUploadWarning: true,
          reputationToPostImages: 10,
          bindNavPrevention: true,
          postfix: "",
          imageUploader: {
          brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
          contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
          allowUrls: true
          },
          onDemand: true,
          discardSelector: ".discard-answer"
          ,immediatelyShowMarkdownHelp:true
          });


          }
          });














          draft saved

          draft discarded


















          StackExchange.ready(
          function () {
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f54006023%2fdistributing-data-socket-among-kafka-cluster-nodes%23new-answer', 'question_page');
          }
          );

          Post as a guest















          Required, but never shown

























          1 Answer
          1






          active

          oldest

          votes








          1 Answer
          1






          active

          oldest

          votes









          active

          oldest

          votes






          active

          oldest

          votes









          1














          I think your code is correct. Kafka will take care of the "distribution" of the data. How data will be distributed among Kafka brokers will depend on the topic configuration.



          Check the answer here to better understand Kafka topics and partitions.



          Lets say you have 3 Kafka brokers. Then if you create your topic with 3 replicas and 3 partitions



          > bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic my-topic


          This will cause that your topic will have 3 partitions and each partition will be stored 3 times in your cluster. With 3 brokers you will get stored 1 partition and 2 replicas on each broker.



          Then you just have to create your Kafka Sink



          FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<String>(
          "broker1:9092,broker2:9092,broker3:9092",
          "my-topic",
          new SimpleStringSchema());

          stream.addSink(myProducer);





          share|improve this answer
























          • Dear @belbo thanks for your answer. But suppose I have three brokers with different IP address. So, how data distributed among brokers? Would you please tell me if I have to use your code (which is above) in all brokers?

            – M_Gh
            Jan 3 at 5:00






          • 1





            Not in the broker. You have two clusters. Kafka cluster - this will consist of multiple brokers, each with different IP. Then Flink cluster - consisting of multiple nodes/servers with different IPs. Code I pasted above is for your Flink job wich will run on Flink cluster. This job will create Kafka producer with IPs of Kafka brokers (broker1:9092,broker2:9092,broker3:9092) and will distribute the data among these Kafka brokers.

            – belo
            Jan 3 at 8:24













          • Dear @belbo, Sorry to bother you. In fact, I pump my data from a CSV file to the socket 9999 with this command "cat file.csv | nc -lk 9999". If I send data to the port 9999 of all brokers IP, it is possible that each broker get the same data. how can I distribute all data to three broker? Would you please tell me if Flink master node does that for me? thanks a gain.

            – M_Gh
            Jan 3 at 9:21











          • You want to distribute data among Flink nodes, not Kafka brokers. I'm bit confused, because nc -lk means you will listening for data, not sending it. Anyway to achieve what you need I guess, you'd need to put some kind of load balancer between netcat and Flink nodes which will take care of distributing data among Flink instances or write a shell script to take care of sending different lines of file to different Flink nodes. But for the production use I'd suggest to use something more robust - maybe Apache NiFi or something.

            – belo
            Jan 3 at 9:55






          • 1





            You don't have to do anything to distribute data among Kafka brokers. Kafka producer, which is part of your Flink job will take care of it. And how data will be distributed depends on the number of partitions of the topic you will be writing to.

            – belo
            Jan 3 at 11:29
















          1














          I think your code is correct. Kafka will take care of the "distribution" of the data. How data will be distributed among Kafka brokers will depend on the topic configuration.



          Check the answer here to better understand Kafka topics and partitions.



          Lets say you have 3 Kafka brokers. Then if you create your topic with 3 replicas and 3 partitions



          > bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic my-topic


          This will cause that your topic will have 3 partitions and each partition will be stored 3 times in your cluster. With 3 brokers you will get stored 1 partition and 2 replicas on each broker.



          Then you just have to create your Kafka Sink



          FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<String>(
          "broker1:9092,broker2:9092,broker3:9092",
          "my-topic",
          new SimpleStringSchema());

          stream.addSink(myProducer);





          share|improve this answer
























          • Dear @belbo thanks for your answer. But suppose I have three brokers with different IP address. So, how data distributed among brokers? Would you please tell me if I have to use your code (which is above) in all brokers?

            – M_Gh
            Jan 3 at 5:00






          • 1





            Not in the broker. You have two clusters. Kafka cluster - this will consist of multiple brokers, each with different IP. Then Flink cluster - consisting of multiple nodes/servers with different IPs. Code I pasted above is for your Flink job wich will run on Flink cluster. This job will create Kafka producer with IPs of Kafka brokers (broker1:9092,broker2:9092,broker3:9092) and will distribute the data among these Kafka brokers.

            – belo
            Jan 3 at 8:24













          • Dear @belbo, Sorry to bother you. In fact, I pump my data from a CSV file to the socket 9999 with this command "cat file.csv | nc -lk 9999". If I send data to the port 9999 of all brokers IP, it is possible that each broker get the same data. how can I distribute all data to three broker? Would you please tell me if Flink master node does that for me? thanks a gain.

            – M_Gh
            Jan 3 at 9:21











          • You want to distribute data among Flink nodes, not Kafka brokers. I'm bit confused, because nc -lk means you will listening for data, not sending it. Anyway to achieve what you need I guess, you'd need to put some kind of load balancer between netcat and Flink nodes which will take care of distributing data among Flink instances or write a shell script to take care of sending different lines of file to different Flink nodes. But for the production use I'd suggest to use something more robust - maybe Apache NiFi or something.

            – belo
            Jan 3 at 9:55






          • 1





            You don't have to do anything to distribute data among Kafka brokers. Kafka producer, which is part of your Flink job will take care of it. And how data will be distributed depends on the number of partitions of the topic you will be writing to.

            – belo
            Jan 3 at 11:29














          1












          1








          1







          I think your code is correct. Kafka will take care of the "distribution" of the data. How data will be distributed among Kafka brokers will depend on the topic configuration.



          Check the answer here to better understand Kafka topics and partitions.



          Lets say you have 3 Kafka brokers. Then if you create your topic with 3 replicas and 3 partitions



          > bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic my-topic


          This will cause that your topic will have 3 partitions and each partition will be stored 3 times in your cluster. With 3 brokers you will get stored 1 partition and 2 replicas on each broker.



          Then you just have to create your Kafka Sink



          FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<String>(
          "broker1:9092,broker2:9092,broker3:9092",
          "my-topic",
          new SimpleStringSchema());

          stream.addSink(myProducer);





          share|improve this answer













          I think your code is correct. Kafka will take care of the "distribution" of the data. How data will be distributed among Kafka brokers will depend on the topic configuration.



          Check the answer here to better understand Kafka topics and partitions.



          Lets say you have 3 Kafka brokers. Then if you create your topic with 3 replicas and 3 partitions



          > bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic my-topic


          This will cause that your topic will have 3 partitions and each partition will be stored 3 times in your cluster. With 3 brokers you will get stored 1 partition and 2 replicas on each broker.



          Then you just have to create your Kafka Sink



          FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<String>(
          "broker1:9092,broker2:9092,broker3:9092",
          "my-topic",
          new SimpleStringSchema());

          stream.addSink(myProducer);






          share|improve this answer












          share|improve this answer



          share|improve this answer










          answered Jan 2 at 15:23









          belobelo

          587




          587













          • Dear @belbo thanks for your answer. But suppose I have three brokers with different IP address. So, how data distributed among brokers? Would you please tell me if I have to use your code (which is above) in all brokers?

            – M_Gh
            Jan 3 at 5:00






          • 1





            Not in the broker. You have two clusters. Kafka cluster - this will consist of multiple brokers, each with different IP. Then Flink cluster - consisting of multiple nodes/servers with different IPs. Code I pasted above is for your Flink job wich will run on Flink cluster. This job will create Kafka producer with IPs of Kafka brokers (broker1:9092,broker2:9092,broker3:9092) and will distribute the data among these Kafka brokers.

            – belo
            Jan 3 at 8:24













          • Dear @belbo, Sorry to bother you. In fact, I pump my data from a CSV file to the socket 9999 with this command "cat file.csv | nc -lk 9999". If I send data to the port 9999 of all brokers IP, it is possible that each broker get the same data. how can I distribute all data to three broker? Would you please tell me if Flink master node does that for me? thanks a gain.

            – M_Gh
            Jan 3 at 9:21











          • You want to distribute data among Flink nodes, not Kafka brokers. I'm bit confused, because nc -lk means you will listening for data, not sending it. Anyway to achieve what you need I guess, you'd need to put some kind of load balancer between netcat and Flink nodes which will take care of distributing data among Flink instances or write a shell script to take care of sending different lines of file to different Flink nodes. But for the production use I'd suggest to use something more robust - maybe Apache NiFi or something.

            – belo
            Jan 3 at 9:55






          • 1





            You don't have to do anything to distribute data among Kafka brokers. Kafka producer, which is part of your Flink job will take care of it. And how data will be distributed depends on the number of partitions of the topic you will be writing to.

            – belo
            Jan 3 at 11:29



















          • Dear @belbo thanks for your answer. But suppose I have three brokers with different IP address. So, how data distributed among brokers? Would you please tell me if I have to use your code (which is above) in all brokers?

            – M_Gh
            Jan 3 at 5:00






          • 1





            Not in the broker. You have two clusters. Kafka cluster - this will consist of multiple brokers, each with different IP. Then Flink cluster - consisting of multiple nodes/servers with different IPs. Code I pasted above is for your Flink job wich will run on Flink cluster. This job will create Kafka producer with IPs of Kafka brokers (broker1:9092,broker2:9092,broker3:9092) and will distribute the data among these Kafka brokers.

            – belo
            Jan 3 at 8:24













          • Dear @belbo, Sorry to bother you. In fact, I pump my data from a CSV file to the socket 9999 with this command "cat file.csv | nc -lk 9999". If I send data to the port 9999 of all brokers IP, it is possible that each broker get the same data. how can I distribute all data to three broker? Would you please tell me if Flink master node does that for me? thanks a gain.

            – M_Gh
            Jan 3 at 9:21











          • You want to distribute data among Flink nodes, not Kafka brokers. I'm bit confused, because nc -lk means you will listening for data, not sending it. Anyway to achieve what you need I guess, you'd need to put some kind of load balancer between netcat and Flink nodes which will take care of distributing data among Flink instances or write a shell script to take care of sending different lines of file to different Flink nodes. But for the production use I'd suggest to use something more robust - maybe Apache NiFi or something.

            – belo
            Jan 3 at 9:55






          • 1





            You don't have to do anything to distribute data among Kafka brokers. Kafka producer, which is part of your Flink job will take care of it. And how data will be distributed depends on the number of partitions of the topic you will be writing to.

            – belo
            Jan 3 at 11:29

















          Dear @belbo thanks for your answer. But suppose I have three brokers with different IP address. So, how data distributed among brokers? Would you please tell me if I have to use your code (which is above) in all brokers?

          – M_Gh
          Jan 3 at 5:00
