Saved entity in a Spring Kafka listener is not rolled back in a transactional test
I have a @KafkaListener which saves a new entity using a Spring Data JDBC repository. The saved entity is not rolled back in a test annotated with @Transactional.
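import java.util.concurrent.CountDownLatch;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {
    private final EntityRepository entityRepository;
    // Set by the test thread and read by the listener container thread, hence volatile.
    private volatile CountDownLatch countDownLatch;

    public KafkaConsumer(EntityRepository entityRepository) {
        this.entityRepository = entityRepository;
    }

    @KafkaListener(topics = "topic", groupId = "group-id")
    public void consume(String id) {
        Entity entity = new Entity();
        entity.setId(id);
        entity.setNew(true);
        entityRepository.save(entity);
        // Signal the waiting test (if any) that the record has been processed.
        CountDownLatch latch = countDownLatch;
        if (latch != null) latch.countDown();
    }

    public void setCountDownLatch(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }
}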
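
import static org.assertj.core.api.Assertions.assertThat;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.transaction.annotation.Transactional;

@RunWith(SpringRunner.class)
@SpringBootTest
@Transactional
public class SpringKafkaDataJdbcMysqlTransactionalTestApplicationTests {
    @ClassRule
    public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true, 1, "topic");
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    @Autowired
    private KafkaConsumer kafkaConsumer;
    @Autowired
    private EntityRepository entityRepository;

    @BeforeClass
    public static void setup() {
        // Point Spring Kafka at the embedded broker before the context starts.
        System.setProperty("spring.kafka.bootstrap-servers", embeddedKafkaRule.getEmbeddedKafka().getBrokersAsString());
    }

    @Test
    public void test() throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        kafkaConsumer.setCountDownLatch(countDownLatch);
        String id = UUID.randomUUID().toString();
        kafkaTemplate.send("topic", id);
        // Wait until the listener thread has consumed the record and saved the entity.
        assertThat(countDownLatch.await(5, TimeUnit.SECONDS)).isTrue();
        assertThat(entityRepository.findById(id)).isNotEmpty();
    }
}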
I am running this against a MySQL database and I expect the saved entity to be rolled back. It looks like the transaction started by the repository does not join the transaction started by the test, because the listener runs in a separate thread.
How can I join the test transaction and roll back the entity? Also, is this a good way to test this behavior?
I replicated this here: https://github.com/yraydhitya/spring-kafka-data-jdbc-mysql-transactional-test
spring spring-data spring-jdbc transactional spring-kafka
Are you using an InnoDB table? (Required for transactions.)
– Gary Russell, Jan 1 at 16:44
That's because the @KafkaListener method is not part of the transaction around the test() method. It is performed in a different listener container thread.
– Artem Bilan, Jan 1 at 19:21
Yes, I am using an InnoDB table. Is there a way to make the transaction in the listener container thread join the test transaction? I ended up truncating the table, and it made my other tests unstable.
– yraydhitya, Jan 2 at 12:05
No, there isn't. That's because transactions are thread-bound.
– Artem Bilan, Jan 2 at 15:03
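To illustrate the "thread-bound" point from the comments: Spring keeps the current transaction's resources in thread-local storage, so code running on another thread does not see them. The sketch below uses only plain Java and is not Spring code; the class name, the CURRENT_TX field and the "test-transaction" value are hypothetical stand-ins for the test transaction and the listener container thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ThreadBoundTransactionSketch {

    // Hypothetical stand-in for the per-thread storage that holds the current transaction.
    private static final ThreadLocal<String> CURRENT_TX = new ThreadLocal<>();

    // What the thread that "started the transaction" sees.
    static String transactionSeenByCaller() {
        CURRENT_TX.set("test-transaction");
        try {
            return CURRENT_TX.get();
        } finally {
            CURRENT_TX.remove();
        }
    }

    // What a different thread (playing the listener container thread) sees
    // while the calling thread still has its "transaction" bound.
    static String transactionSeenByWorker() throws Exception {
        CURRENT_TX.set("test-transaction");
        ExecutorService listenerThread = Executors.newSingleThreadExecutor();
        try {
            Future<String> seen = listenerThread.submit(() -> {
                String tx = CURRENT_TX.get();
                return tx == null ? "none" : tx;
            });
            return seen.get();
        } finally {
            listenerThread.shutdown();
            CURRENT_TX.remove();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("caller sees: " + transactionSeenByCaller()); // caller sees: test-transaction
        System.out.println("worker sees: " + transactionSeenByWorker()); // worker sees: none
    }
}
```

The worker thread finds nothing to join, which matches what happens in the test: the listener thread has no transaction of its own to join, so the save is committed independently of the test's rollback.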
asked Jan 1 at 12:31


yraydhitya