How to get the globally declared MapState value in RichCoMapFunction [ Apache Flink ]?
I'm implementing a Flink DataStream job for some real-time data calculation. I receive data from two types of source, and I need to do some transformation based on a key. When I use RichCoMapFunction, the MapState is not visible globally. My program is as follows:
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.RichCoMapFunction

class Transformer extends RichCoMapFunction[(String, Map[String, String]), (String, Map[String, String]), Map[String, String]] {
  private var sourceMap1: MapState[String, Map[String, String]] = _
  private var sourceMap2: MapState[String, Map[String, String]] = _

  override def map1(in1: (String, Map[String, String])): Map[String, String] = {
    sourceMap1.put(in1._2("key"), in1._2)
    println(sourceMap1.keys()) // Works: shows the updated values
    println(sourceMap2.keys()) // Always returns an empty value
    in1._2
  }

  override def map2(in2: (String, Map[String, String])): Map[String, String] = {
    sourceMap2.put(in2._2("key"), in2._2)
    println(sourceMap1.keys()) // Always returns an empty value
    println(sourceMap2.keys()) // Works: shows the updated values
    in2._2
  }

  // Register both MapState handles once per parallel instance
  override def open(parameters: Configuration): Unit = {
    val desc1 = new MapStateDescriptor[String, Map[String, String]]("sourceMap1", classOf[String], classOf[Map[String, String]])
    sourceMap1 = getRuntimeContext.getMapState(desc1)
    val desc2 = new MapStateDescriptor[String, Map[String, String]]("sourceMap2", classOf[String], classOf[Map[String, String]])
    sourceMap2 = getRuntimeContext.getMapState(desc2)
  }
}
I need to access sourceMap2 in the map1 function, since it is declared as a field of the class. But when I print the keys of sourceMap2 in map1, it always returns an empty value, whereas printing sourceMap1 in map1 shows all the added keys.
scala apache-flink flink-streaming
asked Jan 2 at 12:40 by Thirunavukkarasu
2 Answers
When using keyed state, Flink will store a separate state value for each key value. This means that if you have a stateful mapper m with state s and you process records (x1, y1) and (x2, y2), where x is the key, Flink will store s(x1) = (x1, v1) and s(x2) = (x2, v2) in its state backend. When processing (x2, y2), you only have access to s(x2), and it is not possible to access s(x1).
I assume that this is the reason why you see a presumably empty MapState. The incoming records for map1 and map2 will have different keys and, therefore, you access sourceMap2 in map1 for a key (not the map key but the keyBy key) for which no key-value pairs have been stored. The same applies to map2, where you access sourceMap1 under a key for which no key-value pairs have been stored yet.
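The scoping described above can be illustrated without Flink at all. The following is only a toy model, not Flink code: KeyedMapState and setCurrentKey are hypothetical names invented for this sketch, standing in for what the runtime does before each map1/map2 call.

```scala
import scala.collection.mutable

// Toy stand-in for Flink keyed MapState (hypothetical, not the Flink API):
// every keyBy key gets its own private map.
final class KeyedMapState[K, V] {
  private val perKey = mutable.Map.empty[String, mutable.Map[K, V]]
  private var currentKey: String = ""

  // In Flink the runtime switches the current key before each record is processed.
  def setCurrentKey(key: String): Unit = currentKey = key

  // Only the map belonging to the current key is ever visible.
  private def scoped: mutable.Map[K, V] =
    perKey.getOrElseUpdate(currentKey, mutable.Map.empty[K, V])

  def put(k: K, v: V): Unit = scoped.put(k, v)
  def keys: Set[K] = scoped.keySet.toSet
}

object KeyedStateDemo {
  def main(args: Array[String]): Unit = {
    val sourceMap1 = new KeyedMapState[String, String]
    val sourceMap2 = new KeyedMapState[String, String]

    // map1 is called for a record whose keyBy key is "a"
    sourceMap1.setCurrentKey("a"); sourceMap2.setCurrentKey("a")
    sourceMap1.put("k1", "v1")

    // map2 is called for a record whose keyBy key is "b"
    sourceMap1.setCurrentKey("b"); sourceMap2.setCurrentKey("b")
    sourceMap2.put("k2", "v2")
    println(sourceMap1.keys) // Set(): nothing was stored in sourceMap1 under "b"

    // map2 is called for a record whose keyBy key is "a"
    sourceMap1.setCurrentKey("a"); sourceMap2.setCurrentKey("a")
    println(sourceMap1.keys) // Set(k1): same keyBy key, so map1's entry is visible
  }
}
```

In other words, map1 and map2 do see each other's state, but only for records that share the same keyBy key.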
answered Jan 2 at 15:52 by Till Rohrmann
Your Transformer class is being applied to two connected, keyed streams. sourceMap1 and sourceMap2 are keyed state, meaning that you have a separate, nested hash map for every key of the two connected streams. One pair of these maps is in scope each time map1 or map2 is called, i.e., the pair corresponding to the key of the item being mapped.
If instead you want to have global state, shared across all the keys, have a look at the broadcast state pattern.
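A rough sketch of what the broadcast state pattern could look like here, assuming Flink 1.5 or later and that the second stream is small enough to be replicated to every parallel instance. BroadcastSketch, BroadcastTransformer and the merge in processElement are illustrative choices, not part of the original program; stream1 and stream2 in the trailing comment are assumed to exist.

```scala
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.util.Collector

object BroadcastSketch {
  type Rec = (String, Map[String, String])

  // One descriptor, shared by the broadcast() call and the function below.
  val sourceMap2Desc = new MapStateDescriptor[String, Map[String, String]](
    "sourceMap2", classOf[String], classOf[Map[String, String]])
}

import BroadcastSketch._

class BroadcastTransformer
    extends KeyedBroadcastProcessFunction[String, Rec, Rec, Map[String, String]] {

  // Keyed side (stream 1): broadcast state is readable here, but read-only.
  override def processElement(
      in1: Rec,
      ctx: KeyedBroadcastProcessFunction[String, Rec, Rec, Map[String, String]]#ReadOnlyContext,
      out: Collector[Map[String, String]]): Unit = {
    val shared = ctx.getBroadcastState(sourceMap2Desc)
    // get returns null when nothing has been broadcast for this map key yet
    val fromStream2 = Option(shared.get(in1._2("key"))).getOrElse(Map.empty[String, String])
    out.collect(in1._2 ++ fromStream2) // illustrative merge, an assumption
  }

  // Broadcast side (stream 2): the only place the shared state may be written.
  override def processBroadcastElement(
      in2: Rec,
      ctx: KeyedBroadcastProcessFunction[String, Rec, Rec, Map[String, String]]#Context,
      out: Collector[Map[String, String]]): Unit = {
    ctx.getBroadcastState(sourceMap2Desc).put(in2._2("key"), in2._2)
  }
}

// Wiring, assuming stream1 and stream2 are DataStream[Rec]:
//   val broadcast2 = stream2.broadcast(BroadcastSketch.sourceMap2Desc)
//   stream1.keyBy(_._1).connect(broadcast2).process(new BroadcastTransformer)
```

Note that this is asymmetric: only the contents of the broadcast stream become visible across all keys, and broadcast state is kept in memory on every parallel instance. If both streams simply need to see each other's entries for matching records, keying both inputs by the same key and keeping the keyed MapState may be enough.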
answered Jan 2 at 16:36 by David Anderson