Skip to content

Commit f6e8219

Browse files
authored
Merge pull request #72 from Yelp/u/jhewland/KAFKA-36342-skip-control-batches
fetcher: Skip transactional control batches
2 parents fb00adb + 4a320f9 commit f6e8219

File tree

2 files changed

+8
-1
lines changed

2 files changed

+8
-1
lines changed

kafka/consumer/fetcher.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -464,6 +464,13 @@ def _unpack_message_set(self, tp, records):
464464
except AttributeError:
465465
pass
466466

467+
# Control messages are used to enable transactions in Kafka and are generated by the
468+
# broker. Clients should not return control batches (ie. those with this bit set) to
469+
# applications. (since 0.11.0.0)
470+
if getattr(batch, "is_control_batch", False):
471+
batch = records.next_batch()
472+
continue
473+
467474
for record in batch:
468475
key_size = len(record.key) if record.key is not None else -1
469476
value_size = len(record.value) if record.value is not None else -1

kafka/version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = '1.4.7.post2'
1+
__version__ = '1.4.7.post3'

0 commit comments

Comments
 (0)