How to get the globally declared MapState value in RichCoMapFunction [ Apache Flink ]?

I'm implementing a Flink DataStream job for real-time calculations. It receives data from two types of sources, and I need to apply some transformations based on a key. When I use RichCoMapFunction, the MapState does not appear to be visible globally. My program is as follows:

    import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.co.RichCoMapFunction

    class Transformer extends RichCoMapFunction[(String, Map[String, String]), (String, Map[String, String]), Map[String, String]] {

      private var sourceMap1: MapState[String, Map[String, String]] = _
      private var sourceMap2: MapState[String, Map[String, String]] = _

      // Called for each record of the first connected stream.
      override def map1(in1: (String, Map[String, String])): Map[String, String] = {
        sourceMap1.put(in1._2("key"), in1._2)
        println(sourceMap1.keys()) // works: shows the updated keys
        println(sourceMap2.keys()) // always empty
        in1._2
      }

      // Called for each record of the second connected stream.
      override def map2(in2: (String, Map[String, String])): Map[String, String] = {
        sourceMap2.put(in2._2("key"), in2._2)
        println(sourceMap1.keys()) // always empty
        println(sourceMap2.keys()) // works: shows the updated keys
        in2._2
      }

      // Registers both MapState handles once per parallel instance.
      override def open(parameters: Configuration): Unit = {
        val desc1 = new MapStateDescriptor[String, Map[String, String]]("sourceMap1", classOf[String], classOf[Map[String, String]])
        sourceMap1 = getRuntimeContext.getMapState(desc1)
        val desc2 = new MapStateDescriptor[String, Map[String, String]]("sourceMap2", classOf[String], classOf[Map[String, String]])
        sourceMap2 = getRuntimeContext.getMapState(desc2)
      }
    }


I need to access sourceMap2 in the map1 function, since it is declared as a class-level field. But when I print the keys of sourceMap2 inside map1, the result is always empty, whereas printing sourceMap1 inside map1 shows all the added keys.

Tags: scala, apache-flink, flink-streaming

asked Jan 2 at 12:40 by Thirunavukkarasu

2 Answers

With keyed state, Flink stores a separate state value for each key. This means that if you have a stateful mapper m with state s and you process the records (x1, y1) and (x2, y2), where x is the key, Flink will store s(x1) = (x1, v1) and s(x2) = (x2, v2) in its state backend, with v1 and v2 being the state values held for each key.

When processing (x2, y2), you only have access to s(x2); it is not possible to access s(x1).

I assume this is why the MapState appears to be empty. The incoming records for map1 and map2 have different keys, so in map1 you access sourceMap2 under a key (the keyBy key, not the map key) for which no key-value pairs have been stored. The same applies to map2, where you access sourceMap1 under a key for which no key-value pairs have been stored yet.
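
To make the scoping concrete, here is a toy model in plain Scala with no Flink dependency. It is only a sketch of the idea and not how Flink's state backend is actually implemented; `KeyedMapState`, `setCurrentKey` and `KeyedStateDemo` are names invented for this illustration.

```scala
import scala.collection.mutable

// Toy stand-in for a keyed MapState: one inner map per keyBy key.
// Hypothetical class for illustration only, not a Flink API.
class KeyedMapState[K, UK, UV] {
  private val perKey = mutable.Map.empty[K, mutable.Map[UK, UV]]
  private var currentKey: Option[K] = None

  // Flink does the equivalent of this implicitly before each map1/map2 call.
  def setCurrentKey(key: K): Unit = currentKey = Some(key)

  // The inner map that belongs to the current key, created on first use.
  private def scoped: mutable.Map[UK, UV] =
    perKey.getOrElseUpdate(
      currentKey.getOrElse(sys.error("no current key set")),
      mutable.Map.empty[UK, UV])

  def put(userKey: UK, value: UV): Unit = scoped.update(userKey, value)

  def keys: Set[UK] = scoped.keySet.toSet
}

object KeyedStateDemo {
  // Returns (keys seen while the current key is x1, keys seen while it is x2).
  def run(): (Set[String], Set[String]) = {
    val sourceMap2 = new KeyedMapState[String, String, String]

    // A record with keyBy key "x2" arrives in map2 and is stored.
    sourceMap2.setCurrentKey("x2")
    sourceMap2.put("k", "v")

    // A record with keyBy key "x1" arrives in map1: the s(x1) scope is empty.
    sourceMap2.setCurrentKey("x1")
    val seenUnderX1 = sourceMap2.keys

    // Back under key "x2", the stored entry is visible again.
    sourceMap2.setCurrentKey("x2")
    val seenUnderX2 = sourceMap2.keys

    (seenUnderX1, seenUnderX2)
  }
}
```

In this model, `KeyedStateDemo.run()` yields an empty set for the `x1` scope and `Set("k")` for the `x2` scope, which mirrors the "always empty" output in the question whenever the two streams carry different keys.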

answered Jan 2 at 15:52 by Till Rohrmann

Your Transformer class is applied to two connected, keyed streams. sourceMap1 and sourceMap2 are keyed state, meaning that there is a separate, nested hash map for every key of the two connected streams. Each time map1 or map2 is called, exactly one pair of these maps is in scope: the pair corresponding to the key of the item being mapped.

If you instead want global state that is shared across all keys, have a look at the broadcast state pattern.
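
As a rough sketch of what that could look like for the code in the question, the second stream can be broadcast and the `RichCoMapFunction` swapped for a `KeyedBroadcastProcessFunction`. This assumes a Flink version that ships the Scala DataStream API with broadcast state; `BroadcastSketch`, `BroadcastTransformer`, `wire` and the type aliases are names made up for this example, and the keyBy on the first tuple field is a guess at how the streams are keyed.

```scala
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object BroadcastSketch {
  type Attrs = Map[String, String]
  type Record = (String, Attrs)

  // The same descriptor is used for broadcast() and inside the function.
  val source2Descriptor = new MapStateDescriptor[String, Attrs](
    "sourceMap2", classOf[String], classOf[Attrs])

  class BroadcastTransformer
      extends KeyedBroadcastProcessFunction[String, Record, Record, Attrs] {

    // Keyed side (first stream): read-only access to the broadcast state.
    override def processElement(
        in1: Record,
        ctx: KeyedBroadcastProcessFunction[String, Record, Record, Attrs]#ReadOnlyContext,
        out: Collector[Attrs]): Unit = {
      val shared = ctx.getBroadcastState(source2Descriptor)
      // get() returns null when nothing is stored under that map key.
      val fromSource2 = Option(shared.get(in1._2("key")))
      println(fromSource2) // visible regardless of the current keyBy key
      out.collect(in1._2)
    }

    // Broadcast side (second stream): every parallel instance sees every record.
    override def processBroadcastElement(
        in2: Record,
        ctx: KeyedBroadcastProcessFunction[String, Record, Record, Attrs]#Context,
        out: Collector[Attrs]): Unit = {
      ctx.getBroadcastState(source2Descriptor).put(in2._2("key"), in2._2)
    }
  }

  // Assumed wiring: key the first stream, broadcast the second one.
  def wire(stream1: DataStream[Record], stream2: DataStream[Record]): DataStream[Attrs] =
    stream1
      .keyBy(_._1)
      .connect(stream2.broadcast(source2Descriptor))
      .process(new BroadcastTransformer)
}
```

Note that this only makes the second stream's data global: the keyed side can read the broadcast state but cannot write to it, so it does not give map2 a view of everything seen on the first stream. Broadcast state is also replicated to every parallel instance, so it is best suited to data that is small enough to hold on each of them.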

answered Jan 2 at 16:36 by David Anderson