Skip to content

Commit 5e508ed

Browse files
authored
Merge pull request #73 from Yelp/u/georgios/KAFKA-36342_fix_control_batches_bug
Fix control batch bug
2 parents f6e8219 + 84f985b commit 5e508ed

File tree

2 files changed

+7
-6
lines changed

2 files changed

+7
-6
lines changed

kafka/consumer/fetcher.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -821,11 +821,12 @@ def _parse_fetched_data(self, completed_fetch):
821821
" offset %d to buffered record list", tp,
822822
position)
823823
unpacked = list(self._unpack_message_set(tp, records))
824-
parsed_records = self.PartitionRecords(fetch_offset, tp, unpacked)
825-
last_offset = unpacked[-1].offset
826-
self._sensors.records_fetch_lag.record(highwater - last_offset)
827-
num_bytes = records.valid_bytes()
828-
records_count = len(unpacked)
824+
if unpacked:
825+
parsed_records = self.PartitionRecords(fetch_offset, tp, unpacked)
826+
last_offset = unpacked[-1].offset
827+
self._sensors.records_fetch_lag.record(highwater - last_offset)
828+
num_bytes = records.valid_bytes()
829+
records_count = len(unpacked)
829830
elif records.size_in_bytes() > 0:
830831
# we did not read a single message from a non-empty
831832
# buffer because that message's size is larger than

kafka/version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = '1.4.7.post3'
1+
__version__ = '1.4.7.post4'

0 commit comments

Comments
 (0)