Skip to content

Commit ee26c3f

Browse files
authored
Ignore MetadataResponses with empty broker list (#1506)
1 parent c9d783a commit ee26c3f

File tree

3 files changed

+28
-1
lines changed

3 files changed

+28
-1
lines changed

kafka/cluster.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,8 @@ def update_metadata(self, metadata):
214214
return self.failed_update(error)
215215

216216
if not metadata.brokers:
217-
log.warning("No broker metadata found in MetadataResponse")
217+
log.warning("No broker metadata found in MetadataResponse -- ignoring.")
218+
return self.failed_update(Errors.MetadataEmptyBrokerList(metadata))
218219

219220
_new_brokers = {}
220221
for broker in metadata.brokers:

kafka/errors.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,10 @@ class StaleMetadata(KafkaError):
5454
invalid_metadata = True
5555

5656

57+
class MetadataEmptyBrokerList(KafkaError):
58+
retriable = True
59+
60+
5761
class UnrecognizedBrokerVersion(KafkaError):
5862
pass
5963

test/test_cluster.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
# pylint: skip-file
2+
from __future__ import absolute_import
3+
4+
import pytest
5+
6+
from kafka.cluster import ClusterMetadata
7+
from kafka.protocol.metadata import MetadataResponse
8+
9+
10+
def test_empty_broker_list():
11+
cluster = ClusterMetadata()
12+
assert len(cluster.brokers()) == 0
13+
14+
cluster.update_metadata(MetadataResponse[0](
15+
[(0, 'foo', 12), (1, 'bar', 34)], []))
16+
assert len(cluster.brokers()) == 2
17+
18+
# empty broker list response should be ignored
19+
cluster.update_metadata(MetadataResponse[0](
20+
[], # empty brokers
21+
[(17, 'foo', []), (17, 'bar', [])])) # topics w/ error
22+
assert len(cluster.brokers()) == 2

0 commit comments

Comments
 (0)