Description
Describe the bug
When committing offsets with a transactional producer via await producer.send_offsets_to_transaction(offsets, group_id), there appears to be a race condition in which two background tasks try to perform the same commit.
My application always uses the same transactional producer with the same transactional_id. Commits are performed one at a time rather than in batches.
Traceback
03/11/2025 02:48:03 PM Unexpected error in sender routine
Traceback (most recent call last):
  File "/examples/transactions/.venv/lib/python3.12/site-packages/aiokafka/producer/sender.py", line 176, in _sender_routine
    task.result()
  File "/examples/transactions/.venv/lib/python3.12/site-packages/aiokafka/producer/sender.py", line 367, in _do_txn_offset_commit
    await handler.do(node_id)
  File "/examples/transactions/.venv/lib/python3.12/site-packages/aiokafka/producer/sender.py", line 413, in do
    retry_backoff = self.handle_response(resp)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/examples/transactions/.venv/lib/python3.12/site-packages/aiokafka/producer/sender.py", line 643, in handle_response
    offset = self._offsets[tp].offset
             ~~~~~~~~~~~~~^^^^
KeyError: TopicPartition(topic='local--kstreams-json', partition=0)
I added some logging to the sender.py module and can see that Sender._do_txn_offset_commit is called twice, trying to commit the same offset twice for the same TopicPartition (offset 62 in this example). As the log below shows, offsets is a dict with id 4345275008 that is shared by the two tasks.
The first call to TxnOffsetCommitHandler.handle_response works fine. It is then called again, but by then the offsets dict is empty, so the code crashes at offset = self._offsets[tp].offset.
Beging Sender._do_txn_offset_commit with offsets {TopicPartition(topic='local--kstreams-json', partition=0): OffsetAndMetadata(offset=62, metadata='')}. Dict Offsets id 4345275008
Ending Sender._do_txn_offset_commit with offsets {TopicPartition(topic='local--kstreams-json', partition=0): OffsetAndMetadata(offset=62, metadata='')}. Dict Offsets id 4345275008
Beging Sender._do_txn_offset_commit with offsets {TopicPartition(topic='local--kstreams-json', partition=0): OffsetAndMetadata(offset=62, metadata='')}. Dict Offsets id 4345275008
Ending Sender._do_txn_offset_commit with offsets {TopicPartition(topic='local--kstreams-json', partition=0): OffsetAndMetadata(offset=62, metadata='')}. Dict Offsets id 4345275008
Handle response in TxnOffsetCommitHandler.handle_response for offsets {TopicPartition(topic='local--kstreams-json', partition=0): OffsetAndMetadata(offset=62, metadata='')}. Dict Offsets id 4345275008
03/11/2025 03:00:00 PM Commiting offsets {TopicPartition(topic='local--kstreams-json', partition=0): 62} for group my-group
Handle response in TxnOffsetCommitHandler.handle_response for offsets {}. Offsets id 4345275008
03/11/2025 03:00:00 PM Unexpected error in sender routine
Traceback (most recent call last):
  File "/Users/marcosschroh/Projects/kstreams/examples/transactions/.venv/lib/python3.12/site-packages/aiokafka/producer/sender.py", line 176, in _sender_routine
    task.result()
  File "/Users/marcosschroh/Projects/kstreams/examples/transactions/.venv/lib/python3.12/site-packages/aiokafka/producer/sender.py", line 366, in _do_txn_offset_commit
    await handler.do(node_id)
  File "/Users/marcosschroh/Projects/kstreams/examples/transactions/.venv/lib/python3.12/site-packages/aiokafka/producer/sender.py", line 412, in do
    retry_backoff = self.handle_response(resp)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/marcosschroh/Projects/kstreams/examples/transactions/.venv/lib/python3.12/site-packages/aiokafka/producer/sender.py", line 642, in handle_response
    offset = self._offsets[tp].offset
             ~~~~~~~~~~~~~^^^^
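The failure mode I am describing can be imitated without Kafka at all. The following is only a minimal sketch, not aiokafka code: FakeCommitHandler is a hypothetical stand-in, and it assumes that the real handler removes each partition from the shared offsets dict once its commit is acknowledged (which would explain the empty dict in the log above). Two tasks receive the same dict object; the first empties it, the second fails with a KeyError.

```python
import asyncio


class FakeCommitHandler:
    """Hypothetical stand-in for a commit handler; NOT the aiokafka class."""

    def __init__(self, offsets):
        # The dict is stored by reference, so every handler built from the
        # same dict sees the mutations made by the others.
        self._offsets = offsets

    async def do(self):
        # Partitions are captured when the "request" is built.
        requested = list(self._offsets)
        await asyncio.sleep(0)  # simulated network round trip
        return self.handle_response(requested)

    def handle_response(self, partitions):
        committed = []
        for tp in partitions:
            offset = self._offsets[tp]  # KeyError if another task removed tp
            # Assumption: acknowledged partitions are removed from the dict.
            del self._offsets[tp]
            committed.append(offset)
        return committed


async def main():
    shared = {("local--kstreams-json", 0): 62}
    # Two tasks working on the very same dict object, as in the log.
    return await asyncio.gather(
        FakeCommitHandler(shared).do(),
        FakeCommitHandler(shared).do(),
        return_exceptions=True,
    )


results = asyncio.run(main())
print(results)
```

In this sketch the first task commits offset 62 and the second one ends with a KeyError for the same key, which mirrors the shape of the traceback above.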
Expected behaviour
Committing offsets with a transactional producer should always work: a new commit should wait for the previous one to finish before it starts.
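To make the ordering I expect concrete, here is a small sketch using an asyncio.Lock. Everything in it is hypothetical (SerializedCommitter and fake_commit are illustration names, not aiokafka APIs), and it only shows the "one commit at a time" ordering. Since the duplicate call appears to happen inside the sender, I am not claiming that wrapping calls like this in an application avoids the error.

```python
import asyncio


class SerializedCommitter:
    """Hypothetical wrapper: runs one commit coroutine at a time."""

    def __init__(self, commit):
        self._commit = commit
        self._lock = asyncio.Lock()

    async def commit(self, offsets, group_id):
        async with self._lock:
            # Pass a copy so later mutations by the callee cannot affect
            # the caller's dict (and vice versa).
            await self._commit(dict(offsets), group_id)


async def main():
    events = []

    async def fake_commit(offsets, group_id):
        # Stand-in for the real commit call; records start/end order.
        events.append(f"start {offsets}")
        await asyncio.sleep(0.01)
        events.append(f"end {offsets}")

    committer = SerializedCommitter(fake_commit)
    await asyncio.gather(
        committer.commit({"tp0": 62}, "my-group"),
        committer.commit({"tp0": 63}, "my-group"),
    )
    return events


events = asyncio.run(main())
print(events)
```

The second commit only starts after the first one has ended, even though both were scheduled concurrently.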
Environment (please complete the following information):
- aiokafka version: 0.12.0
- Kafka Broker version: 3.7