Flink Data Stream CSV Writer not writing data to CSV file





I am new to Apache Flink and trying to learn data streams. I am reading student data, which has three columns (Name, Subject, and Marks), from a CSV file. I have applied a filter on marks, selecting only those records where marks >= 40.
I am trying to write this data to a CSV file, but the program runs successfully and the CSV file stays empty. No data gets written to the CSV file.



I tried different syntaxes for writing the CSV file, but none of them worked for me. I am running this locally through Eclipse. Writing to a text file works fine.



// "format" and "params" are defined earlier in the program (not shown in the snippet)
DataStream<String> text = env.readFile(format, params.get("input"),
        FileProcessingMode.PROCESS_CONTINUOUSLY, 100);

DataStream<String> filtered = text.filter(new FilterFunction<String>() {
    public boolean filter(String value) {
        String[] tokens = value.split(",");
        return Integer.parseInt(tokens[2]) >= 40;
    }
});
filtered.writeAsText("testFilter", WriteMode.OVERWRITE);

DataStream<Tuple2<String, Integer>> tokenized = filtered
        .map(new MapFunction<String, Tuple2<String, Integer>>() {
            public Tuple2<String, Integer> map(String value) throws Exception {
                return new Tuple2<>("Test", Integer.valueOf(1));
            }
        });

tokenized.print();
tokenized.writeAsCsv("file:///home/Test/Desktop/output.csv",
        WriteMode.OVERWRITE, "/n", ",");

try {
    env.execute();
} catch (Exception e1) {
    e1.printStackTrace();
}




Below is my input CSV format:



Name1,Subj1,30
Name1,Subj2,40
Name1,Subj3,40
Name1,Subj4,40




tokenized.print() prints all the correct records.










      csv apache-flink






edited Jan 3 at 7:36 by Stedy
asked Jan 3 at 7:16 by Omkar Sabane


2 Answers






          I did a little experimenting, and found that this job works just fine:



import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WriteCSV {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        env.fromElements(new Tuple2<>("abc", 1), new Tuple2<>("def", 2))
                .writeAsCsv("file:///tmp/test.csv", FileSystem.WriteMode.OVERWRITE, "\n", ",");

        env.execute();
    }
}


          If I don't set the parallelism to 1, then the results are different. In that case, test.csv is a directory containing four files, each written by one of the four parallel subtasks.



          I'm not sure what's wrong in your case, but maybe you can work backwards from this example (assuming it works for you).
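
If forcing the whole job down to parallelism 1 is too heavy-handed, you can restrict just the sink; a minimal sketch, assuming the tokenized stream from the question (writeAsCsv returns a DataStreamSink, whose parallelism can be set independently of the rest of the job):

tokenized.writeAsCsv("file:///home/Test/Desktop/output.csv",
        FileSystem.WriteMode.OVERWRITE, "\n", ",")
        .setParallelism(1); // only the sink runs as a single subtask, so output.csv is one file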






answered Jan 3 at 8:13 by David Anderson
You should remove the tokenized.print(); call before tokenized.writeAsCsv();.

Otherwise the print(); will consume the data.
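
A minimal sketch of the suggested ordering, assuming the question's tokenized stream and environment:

// print() removed; the CSV writer is the only sink on tokenized
tokenized.writeAsCsv("file:///home/Test/Desktop/output.csv",
        WriteMode.OVERWRITE, "\n", ",");
env.execute();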






edited Feb 20 at 18:08 by double-beep
answered Feb 20 at 17:43 by Aakarsh Gupta