Saved entity in a Spring Kafka Listener is not rollbacked on a transactional test












1















I have a @KafkaListener which saves a new entity using Spring Data JDBC Repository. The saved entity is not rollbacked on a test annotated with @Transactional



@Service
public class KafkaConsumer {

private final EntityRepository entityRepository;

private CountDownLatch countDownLatch;

public KafkaConsumer(EntityRepository entityRepository) {
this.entityRepository = entityRepository;
}

@KafkaListener(topics = "topic", groupId = "group-id")
public void consume(String id) {
Entity entity = new Entity();
entity.setId(id);
entity.setNew(true);

entityRepository.save(entity);

if (countDownLatch != null) countDownLatch.countDown();
}

public void setCountDownLatch(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
}

@RunWith(SpringRunner.class)
@SpringBootTest
@Transactional
public class SpringKafkaDataJdbcMysqlTransactionalTestApplicationTests {

@ClassRule
public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true, 1, "topic");

@Autowired
private KafkaTemplate kafkaTemplate;

@Autowired
private KafkaConsumer kafkaConsumer;

@Autowired
private EntityRepository entityRepository;

@BeforeClass
public static void setup() {
System.setProperty("spring.kafka.bootstrap-servers", embeddedKafkaRule.getEmbeddedKafka().getBrokersAsString());
}

@Test
public void test() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);

kafkaConsumer.setCountDownLatch(countDownLatch);

String id = UUID.randomUUID().toString();

kafkaTemplate.send("topic", id);

assertThat(countDownLatch.await(5, TimeUnit.SECONDS)).isTrue();

assertThat(entityRepository.findById(id)).isNotEmpty();
}

}


I am running this using a MySQL database and I am expecting the saved entity to be rollbacked. Looks like the transaction started by the repository is not joining the transaction started by the transaction in the test because it is a transaction in a separate thread.



How to join the transaction and rollback the entity? Also, is this a good way to test this behavior?



I replicated this here: https://github.com/yraydhitya/spring-kafka-data-jdbc-mysql-transactional-test










share|improve this question


















  • 1





    Are you using an INNODB table? (required for transactions),

    – Gary Russell
    Jan 1 at 16:44






  • 1





    That’s because @KafkaListener method is not a part of transaction around test() method. It is performed in a different listener container thread

    – Artem Bilan
    Jan 1 at 19:21






  • 1





    Yes, I am using an InnoDB table. Is there a way to make the transaction in the listener container thread joins the test transaction? I ended up with truncating the table and it made my other tests unstable.

    – yraydhitya
    Jan 2 at 12:05






  • 1





    No, there is no. That's because transactions are thread-bound.

    – Artem Bilan
    Jan 2 at 15:03
















1















I have a @KafkaListener which saves a new entity using Spring Data JDBC Repository. The saved entity is not rollbacked on a test annotated with @Transactional



@Service
public class KafkaConsumer {

private final EntityRepository entityRepository;

private CountDownLatch countDownLatch;

public KafkaConsumer(EntityRepository entityRepository) {
this.entityRepository = entityRepository;
}

@KafkaListener(topics = "topic", groupId = "group-id")
public void consume(String id) {
Entity entity = new Entity();
entity.setId(id);
entity.setNew(true);

entityRepository.save(entity);

if (countDownLatch != null) countDownLatch.countDown();
}

public void setCountDownLatch(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
}

@RunWith(SpringRunner.class)
@SpringBootTest
@Transactional
public class SpringKafkaDataJdbcMysqlTransactionalTestApplicationTests {

@ClassRule
public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true, 1, "topic");

@Autowired
private KafkaTemplate kafkaTemplate;

@Autowired
private KafkaConsumer kafkaConsumer;

@Autowired
private EntityRepository entityRepository;

@BeforeClass
public static void setup() {
System.setProperty("spring.kafka.bootstrap-servers", embeddedKafkaRule.getEmbeddedKafka().getBrokersAsString());
}

@Test
public void test() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);

kafkaConsumer.setCountDownLatch(countDownLatch);

String id = UUID.randomUUID().toString();

kafkaTemplate.send("topic", id);

assertThat(countDownLatch.await(5, TimeUnit.SECONDS)).isTrue();

assertThat(entityRepository.findById(id)).isNotEmpty();
}

}


I am running this using a MySQL database and I am expecting the saved entity to be rollbacked. Looks like the transaction started by the repository is not joining the transaction started by the transaction in the test because it is a transaction in a separate thread.



How to join the transaction and rollback the entity? Also, is this a good way to test this behavior?



I replicated this here: https://github.com/yraydhitya/spring-kafka-data-jdbc-mysql-transactional-test










share|improve this question


















  • 1





    Are you using an INNODB table? (required for transactions),

    – Gary Russell
    Jan 1 at 16:44






  • 1





    That’s because @KafkaListener method is not a part of transaction around test() method. It is performed in a different listener container thread

    – Artem Bilan
    Jan 1 at 19:21






  • 1





    Yes, I am using an InnoDB table. Is there a way to make the transaction in the listener container thread joins the test transaction? I ended up with truncating the table and it made my other tests unstable.

    – yraydhitya
    Jan 2 at 12:05






  • 1





    No, there is no. That's because transactions are thread-bound.

    – Artem Bilan
    Jan 2 at 15:03














1












1








1








I have a @KafkaListener which saves a new entity using Spring Data JDBC Repository. The saved entity is not rollbacked on a test annotated with @Transactional



@Service
public class KafkaConsumer {

private final EntityRepository entityRepository;

private CountDownLatch countDownLatch;

public KafkaConsumer(EntityRepository entityRepository) {
this.entityRepository = entityRepository;
}

@KafkaListener(topics = "topic", groupId = "group-id")
public void consume(String id) {
Entity entity = new Entity();
entity.setId(id);
entity.setNew(true);

entityRepository.save(entity);

if (countDownLatch != null) countDownLatch.countDown();
}

public void setCountDownLatch(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
}

@RunWith(SpringRunner.class)
@SpringBootTest
@Transactional
public class SpringKafkaDataJdbcMysqlTransactionalTestApplicationTests {

@ClassRule
public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true, 1, "topic");

@Autowired
private KafkaTemplate kafkaTemplate;

@Autowired
private KafkaConsumer kafkaConsumer;

@Autowired
private EntityRepository entityRepository;

@BeforeClass
public static void setup() {
System.setProperty("spring.kafka.bootstrap-servers", embeddedKafkaRule.getEmbeddedKafka().getBrokersAsString());
}

@Test
public void test() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);

kafkaConsumer.setCountDownLatch(countDownLatch);

String id = UUID.randomUUID().toString();

kafkaTemplate.send("topic", id);

assertThat(countDownLatch.await(5, TimeUnit.SECONDS)).isTrue();

assertThat(entityRepository.findById(id)).isNotEmpty();
}

}


I am running this using a MySQL database and I am expecting the saved entity to be rollbacked. Looks like the transaction started by the repository is not joining the transaction started by the transaction in the test because it is a transaction in a separate thread.



How to join the transaction and rollback the entity? Also, is this a good way to test this behavior?



I replicated this here: https://github.com/yraydhitya/spring-kafka-data-jdbc-mysql-transactional-test










share|improve this question














I have a @KafkaListener which saves a new entity using Spring Data JDBC Repository. The saved entity is not rollbacked on a test annotated with @Transactional



@Service
public class KafkaConsumer {

private final EntityRepository entityRepository;

private CountDownLatch countDownLatch;

public KafkaConsumer(EntityRepository entityRepository) {
this.entityRepository = entityRepository;
}

@KafkaListener(topics = "topic", groupId = "group-id")
public void consume(String id) {
Entity entity = new Entity();
entity.setId(id);
entity.setNew(true);

entityRepository.save(entity);

if (countDownLatch != null) countDownLatch.countDown();
}

public void setCountDownLatch(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
}

@RunWith(SpringRunner.class)
@SpringBootTest
@Transactional
public class SpringKafkaDataJdbcMysqlTransactionalTestApplicationTests {

@ClassRule
public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true, 1, "topic");

@Autowired
private KafkaTemplate kafkaTemplate;

@Autowired
private KafkaConsumer kafkaConsumer;

@Autowired
private EntityRepository entityRepository;

@BeforeClass
public static void setup() {
System.setProperty("spring.kafka.bootstrap-servers", embeddedKafkaRule.getEmbeddedKafka().getBrokersAsString());
}

@Test
public void test() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);

kafkaConsumer.setCountDownLatch(countDownLatch);

String id = UUID.randomUUID().toString();

kafkaTemplate.send("topic", id);

assertThat(countDownLatch.await(5, TimeUnit.SECONDS)).isTrue();

assertThat(entityRepository.findById(id)).isNotEmpty();
}

}


I am running this using a MySQL database and I am expecting the saved entity to be rollbacked. Looks like the transaction started by the repository is not joining the transaction started by the transaction in the test because it is a transaction in a separate thread.



How to join the transaction and rollback the entity? Also, is this a good way to test this behavior?



I replicated this here: https://github.com/yraydhitya/spring-kafka-data-jdbc-mysql-transactional-test







spring spring-data spring-jdbc transactional spring-kafka






share|improve this question













share|improve this question











share|improve this question




share|improve this question










asked Jan 1 at 12:31









yraydhityayraydhitya

62




62








  • 1





    Are you using an INNODB table? (required for transactions),

    – Gary Russell
    Jan 1 at 16:44






  • 1





    That’s because @KafkaListener method is not a part of transaction around test() method. It is performed in a different listener container thread

    – Artem Bilan
    Jan 1 at 19:21






  • 1





    Yes, I am using an InnoDB table. Is there a way to make the transaction in the listener container thread joins the test transaction? I ended up with truncating the table and it made my other tests unstable.

    – yraydhitya
    Jan 2 at 12:05






  • 1





    No, there is no. That's because transactions are thread-bound.

    – Artem Bilan
    Jan 2 at 15:03














  • 1





    Are you using an INNODB table? (required for transactions),

    – Gary Russell
    Jan 1 at 16:44






  • 1





    That’s because @KafkaListener method is not a part of transaction around test() method. It is performed in a different listener container thread

    – Artem Bilan
    Jan 1 at 19:21






  • 1





    Yes, I am using an InnoDB table. Is there a way to make the transaction in the listener container thread joins the test transaction? I ended up with truncating the table and it made my other tests unstable.

    – yraydhitya
    Jan 2 at 12:05






  • 1





    No, there is no. That's because transactions are thread-bound.

    – Artem Bilan
    Jan 2 at 15:03








1




1





Are you using an INNODB table? (required for transactions),

– Gary Russell
Jan 1 at 16:44





Are you using an INNODB table? (required for transactions),

– Gary Russell
Jan 1 at 16:44




1




1





That’s because @KafkaListener method is not a part of transaction around test() method. It is performed in a different listener container thread

– Artem Bilan
Jan 1 at 19:21





That’s because @KafkaListener method is not a part of transaction around test() method. It is performed in a different listener container thread

– Artem Bilan
Jan 1 at 19:21




1




1





Yes, I am using an InnoDB table. Is there a way to make the transaction in the listener container thread joins the test transaction? I ended up with truncating the table and it made my other tests unstable.

– yraydhitya
Jan 2 at 12:05





Yes, I am using an InnoDB table. Is there a way to make the transaction in the listener container thread joins the test transaction? I ended up with truncating the table and it made my other tests unstable.

– yraydhitya
Jan 2 at 12:05




1




1





No, there is no. That's because transactions are thread-bound.

– Artem Bilan
Jan 2 at 15:03





No, there is no. That's because transactions are thread-bound.

– Artem Bilan
Jan 2 at 15:03












0






active

oldest

votes











Your Answer






StackExchange.ifUsing("editor", function () {
StackExchange.using("externalEditor", function () {
StackExchange.using("snippets", function () {
StackExchange.snippets.init();
});
});
}, "code-snippets");

StackExchange.ready(function() {
var channelOptions = {
tags: "".split(" "),
id: "1"
};
initTagRenderer("".split(" "), "".split(" "), channelOptions);

StackExchange.using("externalEditor", function() {
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled) {
StackExchange.using("snippets", function() {
createEditor();
});
}
else {
createEditor();
}
});

function createEditor() {
StackExchange.prepareEditor({
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader: {
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
},
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
});


}
});














draft saved

draft discarded


















StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53995445%2fsaved-entity-in-a-spring-kafka-listener-is-not-rollbacked-on-a-transactional-tes%23new-answer', 'question_page');
}
);

Post as a guest















Required, but never shown

























0






active

oldest

votes








0






active

oldest

votes









active

oldest

votes






active

oldest

votes
















draft saved

draft discarded




















































Thanks for contributing an answer to Stack Overflow!


  • Please be sure to answer the question. Provide details and share your research!

But avoid



  • Asking for help, clarification, or responding to other answers.

  • Making statements based on opinion; back them up with references or personal experience.


To learn more, see our tips on writing great answers.




draft saved


draft discarded














StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53995445%2fsaved-entity-in-a-spring-kafka-listener-is-not-rollbacked-on-a-transactional-tes%23new-answer', 'question_page');
}
);

Post as a guest















Required, but never shown





















































Required, but never shown














Required, but never shown












Required, but never shown







Required, but never shown

































Required, but never shown














Required, but never shown












Required, but never shown







Required, but never shown







Popular posts from this blog

MongoDB - Not Authorized To Execute Command

How to fix TextFormField cause rebuild widget in Flutter

in spring boot 2.1 many test slices are not allowed anymore due to multiple @BootstrapWith