Chaining Kafka and Database Transactions with Spring Boot: An In-Depth Look

Raphael De Lio
8 min readOct 28, 2023

--

Twitter | LinkedIn | YouTube | Instagram

In this article, we explore the complexities of handling transactions across Kafka and relational databases in Spring Boot applications.

The traditional approach, using ChainedKafkaTransactionManager, has been deprecated in 2021. But worry not, the solution is elegantly simple thanks to the guidance from Gary Russell, the leading contributor to the Spring Kafka project.

We'll delve into consumer-initiated transactions and see how to configure our application properly for this use case. We will also demystify what happens "Under the Boot"—by observing logs to understand how transactions are managed and committed.

The Solution

Well, the solution is simple. Fortunately, Gary Russel, the main Spring Kafka contributor, didn't leave us hanging here.

In the ticket in which he deprecated the Chained Kafka Transaction Manager, he also led us to what is the recommended approach for anyone who relied on the deprecated class:

For consumer-initiated transactions, annotate the listener method with @Transactional; the container (configured with a KTM) starts the kakfa transaction and the transaction interceptor starts the DB transaction. This provides virtually the same functionality as the Chained KTM in that the DB transaction will commit or roll back just before the Kafka transaction.

For producer-initiated transactions, Transaction Synchronization already works; if another transaction is in process when the transactional KafkaTemplate is called, the template will synchronize the Kafka transaction with the existing transaction.

And he also left documentation in the repository with examples on how to implement it. The solution is simple.

Enable Kafka Transaction Manager

First of all, we need to enable Spring Kafka's Kafka Transaction Manager. We can do it by simply setting the transactional id property in our application.properties:

spring.kafka.producer.transaction-id-prefix=tx-

By setting this property, Spring Boot will automatically configure the Kafka Transaction Manager. (Documentation)

Configure your Listener Containers to use Kafka Transaction Manager

Spring Boot will also configure our listener containers to use the auto configured KafkaTransactionManager. This will ensure that our listeners will automatically begin a Kafka transaction when receiving a new message.

Using the @Transactional Annotation

Here, when implementing our listener, we must ensure we are also annotating the method with the @Transactional annotation. This annotation will tell our application to begin a JPA transaction upon receiving a message.

@KafkaListener(id = "group1", topics = "topic1")
@Transactional("transactionManager")
public void listen1(String in) {
log.info("Received from topic1: {}", in);

log.info("Sending to topic2: {}", in.toUpperCase());
kafkaTemplate.send("topic2", in.toUpperCase());

log.info("Writing to database: {}", in);
demoRepository.save(
DemoEntity.builder()
.name(in)
.timestamp(System.currentTimeMillis())
.build()
);
}

The trick here is giving transactionManager as the value for the @Transactional annotation. This is because there will be two transaction managers available: transactionManager and kafkaTransactionManager.

The transactionManager bean is an instance of JpaTransactionManager while the kafkaTransactionManager bean is an instance of KafkaTransactionManager.

And the reason why we want to give the transactionManager as the value for the @Transactional annotation is because our KafkaMessageListenerContainer is already creating transactions for us on consuption. Whenever a new message comes in, it will automatically begin a Kafka transaction before it starts running our method.

Therefore, all we have to do is tell Spring Boot to, before our method is run, to also begin a transaction, but at this time, for the JpaTransactionManager.

And that’s it!

There’s nothing else we have to do. When receiving new messages, Spring Kafka will automatically begin a Kafka Transaction and after that, the @Transactional annotation will create a JPA Transaction. If the JPA Transaction fails, the Kafka Transaction will also fail and be rolled back.

Under the Boot

Let’s not just take my word for granted. Let’s also look at the logs under different scenarios to see what’s actually going on in our application.

First of all, let’s tell Spring Boot to change the logging level of a few classes:

logging:
level:
org:
apache:
kafka: error
orm:
jpa:
JpaTransactionManager: trace
kafka:
transaction: trace
listener:
KafkaMessageListenerContainer: error

Here, we want to mute some noise generated by the org.springframework.kafka.listener.KafkaMessageListenerContainer class and the org.apache.kafka package. We do it by setting them to error.

Then, we want to actually see more information about the JpaTransactionManager class and the org.springframework.transaction and the org.springframework.kafka.transaction packages. Therefore, we set them to trace.

Now, when our application gets started, we want to send a message to Kafka:

@Bean
public ApplicationRunner sendKafkaMessage(KafkaTemplate<String, String> template) {
return args -> template.executeInTransaction(t -> t.send("topic1", "test"));
}

This will trigger our listener method and we will be able to see what’s actually going on under the “Boot”.

@Transactional(“transactionManager”)

Let’s try first by using the transactionManager as the value of our @Transactional annotation.

These are the logs that were generated:

14:28:33.833 DEBUG o.s.k.t.KafkaTransactionManager - Creating new transaction with name [null]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
14:28:33.834 DEBUG o.s.k.t.KafkaTransactionManager - Created Kafka transaction on producer [CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@685b500f]]
14:28:33.839 DEBUG o.s.o.j.JpaTransactionManager - Creating new transaction with name [demo.kafka.demokafkatransaction.DemoListener.listen1]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT; 'transactionManager'
14:28:33.839 DEBUG o.s.o.j.JpaTransactionManager - Opened new EntityManager [SessionImpl(151636314<open>)] for JPA transaction
14:28:33.842 DEBUG o.s.o.j.JpaTransactionManager - Exposing JPA transaction as JDBC [org.springframework.orm.jpa.vendor.HibernateJpaDialect$HibernateConnectionHandle@611e4b08]
14:28:33.842 INFO d.k.d.DemoListener - Received from topic1: test
14:28:33.842 INFO d.k.d.DemoListener - Sending to topic2: TEST
14:28:33.847 INFO d.k.d.DemoListener - Writing to database: test
14:28:33.848 DEBUG o.s.o.j.JpaTransactionManager - Found thread-bound EntityManager [SessionImpl(151636314<open>)] for JPA transaction
14:28:33.848 DEBUG o.s.o.j.JpaTransactionManager - Participating in existing transaction
14:28:33.856 DEBUG o.s.o.j.JpaTransactionManager - Initiating transaction commit
14:28:33.856 DEBUG o.s.o.j.JpaTransactionManager - Committing JPA transaction on EntityManager [SessionImpl(151636314<open>)]
14:28:33.868 DEBUG o.s.o.j.JpaTransactionManager - Closing JPA EntityManager [SessionImpl(151636314<open>)] after transaction
14:28:33.986 DEBUG o.s.k.t.KafkaTransactionManager - Initiating transaction commit

You can see that our logs start with KafkaTransactionManager creating a new Kafka Transaction:

KafkaTransactionManager - Creating new transaction with name [null]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT

Right after the Kafka Transaction is created, the JpaTransactionManager creates a Database Transaction as well:

JpaTransactionManager - Creating new transaction with name [demo.kafka.demokafkatransaction.DemoListener.listen1]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT; 'transactionManager'

Only after the Database Transaction is created, we will see our actual implementation being run:

DemoListener - Received from topic1: test
DemoListener - Sending to topic2: TEST
DemoListener - Writing to database: test

As we start writing into the Database, we can see that the JpaTransactionManager discovers that a Database Transaction is already running and that it will take part into this tranction:

JpaTransactionManager - Participating in existing transaction

For each operation in the database we would see a line like the one above. And this is important because we want all of the operations to run within the same transaction.

As we reach the end of our method, we will see that the JpaTransactionManager will init the commit of its transaction:

JpaTransactionManager - Initiating transaction commit

And right after that, the KafkaTransactionManager will also commit its own transaction:

KafkaTransactionManager - Initiating transaction commit

That’s it. We can through the logs that our Kafka Transaction is wrapping the Database Transaction. It starts before the Database Transaction starts and is commited after the Database Transaction is commited.

@Transactional(“kafkaTransactionManager”)

Now let’s see what would happen if instead we give our @Transactional annotation the KafkaTransactionManager as its value.

Once again, let’s analyze the generated logs after we run our application:

14:30:29.741 DEBUG o.s.k.t.KafkaTransactionManager - Creating new transaction with name [null]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
14:30:29.742 DEBUG o.s.k.t.KafkaTransactionManager - Created Kafka transaction on producer [CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@f8fd0f4]]
14:30:29.751 DEBUG o.s.k.t.KafkaTransactionManager - Participating in existing transaction
14:30:29.751 INFO d.k.d.DemoListener - Received from topic1: test
14:30:29.751 INFO d.k.d.DemoListener - Sending to topic2: TEST
14:30:29.754 INFO d.k.d.DemoListener - Writing to database: test
14:30:29.757 DEBUG o.s.o.j.JpaTransactionManager - Creating new transaction with name [org.springframework.data.jpa.repository.support.SimpleJpaRepository.save]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
14:30:29.757 DEBUG o.s.o.j.JpaTransactionManager - Opened new EntityManager [SessionImpl(530662483<open>)] for JPA transaction
14:30:29.759 DEBUG o.s.o.j.JpaTransactionManager - Exposing JPA transaction as JDBC [org.springframework.orm.jpa.vendor.HibernateJpaDialect$HibernateConnectionHandle@74c4c7d0]
14:30:29.767 DEBUG o.s.o.j.JpaTransactionManager - Initiating transaction commit
14:30:29.767 DEBUG o.s.o.j.JpaTransactionManager - Committing JPA transaction on EntityManager [SessionImpl(530662483<open>)]
14:30:29.780 DEBUG o.s.o.j.JpaTransactionManager - Closing JPA EntityManager [SessionImpl(530662483<open>)] after transaction
14:30:29.895 DEBUG o.s.k.t.KafkaTransactionManager - Initiating transaction commit

Our logs start once again with KafkaTransactionManager creating a new transaction:

KafkaTransactionManager - Creating new transaction with name [null]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT

However, we can already see a differente right after the Kafka Transaction is created. This time, the KafkaTransactionManager says that it’s participating in an existing transaction:

KafkaTransactionManager - Participating in existing transaction

This is because our Kafka Transaction was first created by the KafkaMessageListenerContainer. After it’s creation, the @Transactional annotation tries to create another transaction, but fortunately, it identified that a transaction was already active and decided to participate in the active one.

Then, we can also see that no Database Transaction is created. Instead, our implementation starts executing right after the Kafka Transaction is created:

DemoListener - Received from topic1: test
DemoListener - Sending to topic2: TEST
DemoListener - Writing to database: test

And only once our implementation tries to write into the database, a Database Transaction is created:

JpaTransactionManager - Creating new transaction with name [org.springframework.data.jpa.repository.support.SimpleJpaRepository.save]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT

And the following lines of code are very important because we can see that it is not our implementation that is triggering the creation of a Database Transaction, it is the save() method from the JpaRepository.

And it’s creating an actual transaction instead of participating in an existing one. This means that this will be commited as soon as the save() method is completed:

JpaTransactionManager - Initiating transaction commit

If we were doing multiple operations in the database, we would see a new Database Transaction being created and commited for each one of them.

And finally, after all Database Transaction have been commited. We see that the KafkaTransactionManager commits its transaction:

KafkaTransactionManager - Initiating transaction commit

Even though the Kafka Transaction would have failed if any of the Database Transactions failed, the previous successful Database Transactions wouldn’t have rolled back. This is not the behaviour we are looking for.

Conclusion

In conclusion, handling distributed transactions in a microservice architecture backed by Apache Kafka and Spring Boot doesn’t have to be a daunting task. Though the deprecation of ChainedKafkaTransactionManager may have initially created confusion, the solutions provided by Spring Kafka offer a streamlined approach to likely achieve atomicity across both Kafka and database transactions.

By configuring a few properties and using the @Transactional annotation wisely, we can ensure that the Kafka transaction is only commited if the database transaction is also commited.

In my next story, I want to explore the Transactional Outbox Pattern, a pattern designed to ensure even stronger guarantees for transactional consistency in a distributed environment. Stay tuned!

GitHub

Source

Contribute

Writing takes time and effort. I love writing and sharing knowledge, but I also have bills to pay. If you like my work, please, consider donating through Buy Me a Coffee: https://www.buymeacoffee.com/RaphaelDeLio

Or by sending me BitCoin: 1HjG7pmghg3Z8RATH4aiUWr156BGafJ6Zw

Follow Me on Social Media

Stay connected and dive deeper into the world of Kotlin with me! Follow my journey across all major social platforms for exclusive content, tips, and discussions.

Twitter | LinkedIn | YouTube | Instagram

--

--