Understanding Kafka Streams groupBy and window

I am not able to understand the concept of groupBy/groupByKey and windowing in Kafka Streams. My goal is to aggregate stream data over some time period (e.g. 5 seconds). My streaming data looks something like:



{"value":0,"time":1533875665509}
{"value":10,"time":1533875667511}
{"value":8,"time":1533875669512}


The time is in epoch milliseconds. The timestamp is inside the message, not in the key, and I want to average the value over a 5-second window.



Here is the code I am trying, but I cannot get it to work:



builder.<String, String>stream("my_topic")
    .map((key, val) -> {
        TimeVal tv = TimeVal.fromJson(val);
        return new KeyValue<Long, Double>(tv.time, tv.value);
    })
    .groupByKey(Serialized.with(Serdes.Long(), Serdes.Double()))
    .windowedBy(TimeWindows.of(5000))
    .count()
    .toStream()
    .foreach((key, val) -> System.out.println(key + " " + val));


This code does not print anything even though the topic is producing messages every two seconds. When I press Ctrl+C, it prints something like:



[1533877059029@1533877055000/1533877060000] 1
[1533877061031@1533877060000/1533877065000] 1
[1533877063034@1533877060000/1533877065000] 1
[1533877065035@1533877065000/1533877070000] 1
[1533877067039@1533877065000/1533877070000] 1


This output does not make sense to me.



Related code:



public class MessageTimeExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
        String str = (String) record.value();
        TimeVal tv = TimeVal.fromJson(str);
        return tv.time;
    }
}

public class TimeVal {
    final public long time;
    final public double value;

    public TimeVal(long tm, double val) {
        this.time = tm;
        this.value = val;
    }

    public static TimeVal fromJson(String val) {
        Gson gson = new GsonBuilder().create();
        TimeVal tv = gson.fromJson(val, TimeVal.class);
        return tv;
    }
}


Questions:



Why do you need to pass a serializer/deserializer to groupBy? Some of the overloads also take a ValueStore; what is that? Once grouped, what does the data look like in the grouped stream?



How is the windowed stream related to the grouped stream?



I was expecting the above to print in a streaming way, that is, buffer for 5 seconds, then count, then print. Instead it only prints when I press Ctrl+C at the command prompt, i.e. it prints and then exits.

Tags: java, apache-kafka, apache-kafka-streams, stream-processing
Asked Aug 10 '18 at 5:16 by x64; edited Jan 14 at 23:16 by simPod.

2 Answers

Answer (score 3) by Matthias J. Sax, answered Aug 10 '18 at 6:00

It seems you don't have keys in your input data (correct me if this is wrong), and it further seems that you want to do a global aggregation?



In general, grouping is for splitting a stream into sub-streams. Those sub-streams are built by key (i.e., one logical sub-stream per key). In your code snippet you set the timestamp as the key and thus generate a sub-stream per timestamp. I assume this is not intended.



If you want a global aggregation, you will need to map all records to a single sub-stream, i.e., assign the same key to all records in groupBy(). Note that global aggregations don't scale, as the aggregation must be computed by a single thread, so this will only work for small workloads.
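
For illustration, a minimal sketch of such a global 5-second average, reusing the TimeVal class and the Kafka Streams 2.0-era API (Serialized, TimeWindows.of(long)) from the question. The constant "all" key, the double[] accumulator, and DoubleArraySerde are assumptions made for this example; DoubleArraySerde in particular is a placeholder serde you would have to implement yourself:

builder.<String, String>stream("my_topic")
    // Force every record onto the same key so they all land in one sub-stream.
    .map((key, json) -> {
        TimeVal tv = TimeVal.fromJson(json);
        return new KeyValue<String, Double>("all", tv.value);
    })
    .groupByKey(Serialized.with(Serdes.String(), Serdes.Double()))
    // 5-second tumbling windows, driven by the timestamp returned by the TimestampExtractor.
    .windowedBy(TimeWindows.of(5000))
    // Keep [count, sum] per window so the average can be derived afterwards.
    .aggregate(
        () -> new double[]{0.0, 0.0},
        (k, v, agg) -> new double[]{agg[0] + 1, agg[1] + v},
        Materialized.with(Serdes.String(), new DoubleArraySerde())) // placeholder serde for double[]
    .toStream()
    .mapValues(agg -> agg[1] / agg[0]) // average = sum / count
    .foreach((windowedKey, avg) -> System.out.println(windowedKey + " avg=" + avg));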



Windowing is applied to each generated sub-stream to build the windows, and the aggregation is computed per window. The windows are built based on the timestamp returned by the TimestampExtractor. It seems you already have an implementation that extracts the timestamp from the value for this purpose.
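
Assuming the MessageTimeExtractor from the question is meant to drive the windows, it would be registered in the streams configuration roughly like this (application id and broker address are placeholders):

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "timeval-aggregator");   // placeholder application id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    // placeholder broker address
// Use the event time embedded in the JSON value instead of the record's broker/producer timestamp.
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MessageTimeExtractor.class);

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();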




          This code does not print anything even though the topic is generating messages every two seconds. When I press Ctrl+C then it prints something like




By default, Kafka Streams uses some internal caching, and the cache is flushed on commit -- this happens every 30 seconds by default, or when you stop your application. You would need to disable caching to see results earlier (cf. https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html).
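
For example, added to the same Properties object as above (values are illustrative):

// Disable record caching so windowed updates are forwarded downstream immediately,
// and optionally shorten the commit interval from its 30-second default.
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);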




          Why do you need to pass serializer/deserializer to group by.




Because the data needs to be redistributed, and this happens via a topic in Kafka. Note that Kafka Streams is built for a distributed setup, with multiple instances of the same application running in parallel to scale out horizontally.



Btw: you might also be interested in this blog post about the execution model of Kafka Streams: https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/
• I do have a key in my input data. It is the "time" in the example messages. Regarding the buffering, if I do not do groupBy, windowedBy, and count, then it keeps printing, so I do not believe there is a caching issue. Also, as I understand from this answer, there is no good way to achieve what I want, but I believe I have a very simple use case.

            – x64
            Aug 10 '18 at 6:06













• "time" seems not to be a good fit for a key... Note that the key is used for data partitioning and parallel processing. Also, caching applies to KTables only -- if you don't aggregate, there is no caching ;) "there is no good way to achieve what I want" -- I would not say that, but I am not 100% sure what you want and what your requirements are.

            – Matthias J. Sax
            Aug 10 '18 at 14:54











• Here is my use case. I have sensor data coming from a device into a topic, with one topic per sensor. The sensor data contains "timestamp" and "value". Now I want to average the sensor data every N seconds and put it into another topic, which will be sinked to a database.

            – x64
            Aug 10 '18 at 19:58











• It might be a better design to have one topic for all sensors and put the sensor ID as the key? Then you can read this topic, use the sensor ID as the key, apply an N-second window, compute the average, and write the result to a topic.

            – Matthias J. Sax
            Aug 10 '18 at 21:25











• There is a problem with having the same topic for all sensors. In the application, there is a client (consumer) which may be interested in only one sensor at a time. It will unnecessarily get flooded with all sensors' data (at least, it will get flooded with all messages in a partition), which is not desirable.

            – x64
            Aug 11 '18 at 21:18
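
For reference, a rough sketch of the single-topic design suggested in the comments above. The topic names, the 5-second window, the assumption that records are keyed by sensor ID, and the sum-based aggregation (a real average would also track a count, as in the earlier sketch) are all illustrative:

// Assumed: "sensor_readings" holds all sensors' data, keyed by sensor ID (String),
// with the JSON payload from the question as the value.
builder.<String, String>stream("sensor_readings")
    .mapValues(json -> TimeVal.fromJson(json).value)
    .groupByKey(Serialized.with(Serdes.String(), Serdes.Double()))
    .windowedBy(TimeWindows.of(5000))
    // Sum per sensor per window.
    .reduce(Double::sum)
    .toStream()
    // Drop the window part of the key and write one record per sensor per window.
    .map((windowedKey, sum) -> new KeyValue<>(windowedKey.key(), sum))
    .to("sensor_aggregates", Produced.with(Serdes.String(), Serdes.Double()));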

Answer (score 0) by Ivan Klass, answered Nov 21 '18 at 7:14, edited Nov 21 '18 at 17:27

It seems like you misunderstand the nature of the windowing DSL.



It works on the internal message timestamps handled by the Kafka platform, not on arbitrary properties in your specific message type that encode time information. Also, this window does not group into intervals - it is a sliding window, meaning any aggregation you get is for the last 5 seconds before the current message.



Also, you need the same key for all elements that should be combined into the same group, for example null. In your example the key is a timestamp, which is essentially unique per entry, so there will be only a single element in each group.