SuspendApp with Kafka
When streaming records from Kafka we need to commit (acknowledge) the offset of the records we've processed. The official recommendation for doing this is committing offsets in batches, so we typically don't send the commit event to Kafka for every processed record. Instead, we commit the offset every 5 seconds (or every x records, 5s is default).
Imagine the application getting stopped after 4,5 seconds, either by Ctrl+C or K8S or another type of containerization. We could've processed thousands, or tens of thousands of events. If we don't commit these offsets before shutting down we'd have to re-process all the events.
We can easily prevent this with SuspendApp, and kotlin-kafka
or reactor-kafka.
Both these high-level Kafka libraries guarantee committing offsets upon termination of the stream, this includes
cancellation!
In the example below, all calls to acknowledge
will be committed to Kafka before the SuspendApp terminates when
receiving SIGTERM
or SIGINT
.
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.map
import org.apache.kafka.common.serialization.StringDeserializer
import io.github.nomisRev.kafka.receiver.KafkaReceiver
import io.github.nomisRev.kafka.receiver.ReceiverSettings
import arrow.continuations.SuspendApp
fun main() = SuspendApp {
val settings: ReceiverSettings<Nothing, String> = ReceiverSettings(
bootstrapServers = bootstrapServers,
groupId = "group-id",
valueDeserializer = StringDeserializer()
)
KafkaReceiver(settings)
.receive(topicName)
.map { record ->
println("${record.key()} -> ${record.value()}")
record.offset.acknowledge()
}.collect()
}