Send a JSON payload to an Apache Kafka topic in Spring
I need to send (POST) a JSON payload to an Apache Kafka topic, but I am receiving the following error:
"message": "Can't convert value of class com.xyz.User to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer"
Spring also shows a class cast exception:
java.lang.ClassCastException: com.xyz.User cannot be cast to java.lang.String
Following are my model, Kafka config and controller.
public class User {

    private String firstname;
    private String email;

    public User() {}

    public User(String firstname, String email) {
        super();
        this.firstname = firstname;
        this.email = email;
    }

    public String getFirstname() {
        return firstname;
    }

    public void setFirstname(String firstname) {
        this.firstname = firstname;
    }

    public String getEmail() {
        return email;
    }

    public void setEmail(String email) {
        this.email = email;
    }

    @Override
    public String toString() {
        return "UserModel [firstname=" + firstname + ", email=" + email + "]";
    }
}
Kafka Configuration
package config;

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import com.xyz.User;

@Configuration
public class KafkaConfiguration {

    @Bean
    public ProducerFactory<String, User> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, User> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
Controller
@RestController
@RequestMapping("/kafka")
public class UserController {

    @Autowired
    private KafkaTemplate<String, User> kafkaTemplate;

    private static String TOPIC = "kafka-producer";

    @PostMapping("/publish")
    public void getUserId(@RequestBody User user) {
        kafkaTemplate.send(TOPIC, user);
    }
}

JSON payload sent from Postman:

{
    "firstname" : "xyz",
    "email" : "xyz@gmail.com.com"
}
java json apache-kafka kafka-producer-api spring-kafka
Is the producer configured in XML? Can you post that as well?
– aurelius
Nov 20 '18 at 8:23
@aurelius, I am using Spring Boot; I don't have any XML configuration for the producer.
– Prashant Raghav
Nov 20 '18 at 8:27
Then you need the KafkaTemplate, which is used to send the messages.
– aurelius
Nov 20 '18 at 8:29
Yes, I am trying to send via KafkaTemplate: kafkaTemplate.send(TOPIC, user);
– Prashant Raghav
Nov 20 '18 at 8:30
Can you post that code as well? I think you did not change the type of the message that you send from String to User.
– aurelius
Nov 20 '18 at 8:30
3 Answers
I was able to reproduce your error with a local setup:
Caused by: org.apache.kafka.common.errors.SerializationException: Can't convert value of class com.example.demo.User to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
Caused by: java.lang.ClassCastException: com.example.demo.User cannot be cast to java.lang.String
The log shows that value.serializer is still the default StringSerializer rather than the expected JsonSerializer, which means your producer configuration is not taking effect. In short, your KafkaConfiguration class is not being picked up.
Your KafkaConfiguration class is in a config package while your User class is in com.xyz. The solution is to make sure Spring picks up your configuration; most likely that config package is not being scanned for configuration/bean definitions. If you move KafkaConfiguration into the root package of your application, your original code should work fine.
If your KafkaTemplate still appears to be injected, it is not the one you defined: the bean being injected comes from Spring Kafka's auto-configuration, which defaults to the StringSerializer.
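One minimal sketch of how that could look, assuming the application's main class sits in com.xyz and the configuration really lives in a top-level config package (the class name KafkaDemoApplication is made up for illustration):

    package com.xyz;

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;

    // Scan both the application package and the separate config package so the
    // KafkaConfiguration beans (with JsonSerializer) replace Boot's auto-configured defaults.
    @SpringBootApplication(scanBasePackages = {"com.xyz", "config"})
    public class KafkaDemoApplication {

        public static void main(String[] args) {
            SpringApplication.run(KafkaDemoApplication.class, args);
        }
    }

Alternatively, if you stay with Boot's auto-configured KafkaTemplate, setting spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer in application.properties should have a similar effect.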
Convert your User object to a String in the producer before sending it through the KafkaTemplate, and convert the String back into a User object on the consumer's end.
Producer:

    @PostMapping("/publish")
    public void getUserId(@RequestBody User user) throws JsonProcessingException {
        // writeValueAsString throws a checked JsonProcessingException, hence the throws clause
        kafkaTemplate.send(TOPIC, new ObjectMapper().writeValueAsString(user));
    }
Consumer:
User user = new ObjectMapper().readValue(kafkaMessage, User.class);
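For completeness, a minimal sketch of what that consumer side could look like with a String-based listener; the group id is illustrative and not from the original post:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Service;

    @Service
    public class UserConsumer {

        // One shared mapper instead of creating a new one per message; ObjectMapper is thread-safe for reads.
        private static final ObjectMapper MAPPER = new ObjectMapper();

        @KafkaListener(topics = "kafka-producer", groupId = "user-group")
        public void consume(String kafkaMessage) throws Exception {
            // Turn the JSON string back into a User object.
            User user = MAPPER.readValue(kafkaMessage, User.class);
            System.out.println("Received: " + user);
        }
    }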
Not working: The method send(String, User) in the type KafkaTemplate<String,User> is not applicable for the arguments (String, String)
– Prashant Raghav
Nov 20 '18 at 8:54
Change your autowired statement to @Autowired private KafkaTemplate<String, String> kafkaTemplate;
– Pranay Kumbhalkar
Nov 20 '18 at 8:57
I recommend always using KafkaTemplate<String, String>, because you can send any type of object with it and convert back on the consumer side.
– Pranay Kumbhalkar
Nov 20 '18 at 8:59
Can you please elaborate on why KafkaTemplate<String, String> is preferred, and what options we have to send JSON data to the Kafka bus?
– Prashant Raghav
Nov 20 '18 at 9:09
You should decide whether a given topic is going to hold binary or String data, and depending on that, how it will be further encoded. For example, you could have a topic named Schema that contains JSON-encoded objects stored as Strings. If you use JSON and a loosely-typed language like JavaScript, it could be tempting to store different objects with different schemas on the same topic. With JavaScript, you can just call JSON.parse(...), take a peek at the resulting object, and figure out what you want to do with it.
– Pranay Kumbhalkar
Nov 20 '18 at 9:40
The following worked for me.

@RestController
@RequestMapping("/kafka")
public class UserController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private static String TOPIC = "kafka-producer";

    @PostMapping("/publish")
    public void getUserId(@RequestBody User user) {
        ObjectWriter ow = new ObjectMapper().writer().withDefaultPrettyPrinter();
        String json = null;
        try {
            json = ow.writeValueAsString(user);
        } catch (JsonProcessingException e) {
            // Serialization failed; log the stack trace (json stays null)
            e.printStackTrace();
        }
        kafkaTemplate.send(TOPIC, json);
    }
}
For performance reasons, move the ObjectWriter declaration outside of the method, and don't pretty print the message
– cricket_007
Nov 20 '18 at 15:06
However, Spring should already know how to use Jackson to convert your objects codenotfound.com/…
– cricket_007
Nov 20 '18 at 15:10
Yes, you don't need to manually serialize. JsonSerializer internally uses ObjectMapper, so it will work with the User object as well. Refer to my answer to correct your config and for more details.
– bittu
Nov 20 '18 at 19:27
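Putting cricket_007's suggestions into code, a hedged sketch of the leaner version (a shared ObjectMapper as a field, compact JSON instead of pretty printing); this is not code from the original thread:

    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestBody;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;

    @RestController
    @RequestMapping("/kafka")
    public class UserController {

        private static final String TOPIC = "kafka-producer";

        // Created once and reused across requests; cheaper than a new mapper per call.
        private final ObjectMapper objectMapper = new ObjectMapper();

        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;

        @PostMapping("/publish")
        public void getUserId(@RequestBody User user) throws JsonProcessingException {
            // Compact JSON (no pretty printer) keeps the Kafka messages small.
            kafkaTemplate.send(TOPIC, objectMapper.writeValueAsString(user));
        }
    }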