
Commit 805b52a

Patrick Lucas authored and committed
Improve error handling and tests w.r.t. codecs
Add function kafka.protocol.create_message_set() that takes a list of payloads and a codec and returns a message set with the desired encoding. Introduce kafka.common.UnsupportedCodecError, raised if an unknown codec is specified. Include a test for the new function.
1 parent 39796ec commit 805b52a
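
As a quick illustration of the behaviour described in the commit message (a sketch, not part of the diff below), the new helper can be called directly, and the new exception is an ordinary KafkaError subclass that callers can catch:

from kafka.common import UnsupportedCodecError
from kafka.protocol import CODEC_GZIP, create_message_set

payloads = ["payload 1", "payload 2"]

# A supported codec returns a ready-to-send message set.
message_set = create_message_set(payloads, CODEC_GZIP)

# An unrecognized codec id (0x7f here is just a made-up value) raises
# UnsupportedCodecError instead of producing a bad message set.
try:
    create_message_set(payloads, 0x7f)
except UnsupportedCodecError as e:
    print("unsupported codec: %s" % e)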

4 files changed, +91 -27 lines

kafka/common.py

Lines changed: 6 additions & 0 deletions
@@ -170,6 +170,11 @@ class ConsumerNoMoreData(KafkaError):
 class ProtocolError(KafkaError):
     pass
 
+
+class UnsupportedCodecError(KafkaError):
+    pass
+
+
 kafka_errors = {
     -1 : UnknownError,
     1 : OffsetOutOfRangeError,
@@ -187,6 +192,7 @@ class ProtocolError(KafkaError):
     13 : StaleLeaderEpochCodeError,
 }
 
+
 def check_error(response):
     error = kafka_errors.get(response.error)
     if error:

kafka/producer.py

Lines changed: 9 additions & 20 deletions
@@ -9,12 +9,11 @@
 from itertools import cycle
 from multiprocessing import Queue, Process
 
-from kafka.common import ProduceRequest, TopicAndPartition
-from kafka.partitioner import HashedPartitioner
-from kafka.protocol import (
-    CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY, ALL_CODECS,
-    create_message, create_gzip_message, create_snappy_message,
+from kafka.common import (
+    ProduceRequest, TopicAndPartition, UnsupportedCodecError
 )
+from kafka.partitioner import HashedPartitioner
+from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set
 
 log = logging.getLogger("kafka")
 
@@ -66,13 +65,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
         # Send collected requests upstream
         reqs = []
         for topic_partition, msg in msgset.items():
-            if codec == CODEC_GZIP:
-                messages = [create_gzip_message(msg)]
-            elif codec == CODEC_SNAPPY:
-                messages = [create_snappy_message(msg)]
-            else:
-                messages = [create_message(m) for m in msg]
-
+            messages = create_message_set(msg, codec)
             req = ProduceRequest(topic_partition.topic,
                                  topic_partition.partition,
                                  messages)
@@ -132,7 +125,9 @@ def __init__(self, client, async=False,
 
         if codec is None:
             codec = CODEC_NONE
-        assert codec in ALL_CODECS
+        elif codec not in ALL_CODECS:
+            raise UnsupportedCodecError("Codec 0x%02x unsupported" % codec)
+
         self.codec = codec
 
         if self.async:
@@ -159,13 +154,7 @@ def send_messages(self, topic, partition, *msg):
                 self.queue.put((TopicAndPartition(topic, partition), m))
             resp = []
         else:
-            if self.codec == CODEC_GZIP:
-                messages = [create_gzip_message(msg)]
-            elif self.codec == CODEC_SNAPPY:
-                messages = [create_snappy_message(msg)]
-            else:
-                messages = [create_message(m) for m in msg]
-
+            messages = create_message_set(msg, self.codec)
             req = ProduceRequest(topic, partition, messages)
             try:
                 resp = self.client.send_produce_request([req], acks=self.req_acks,
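
Caller-side, the change in __init__ above means a bad codec now surfaces as a catchable error rather than an AssertionError. A minimal sketch, assuming the Producer constructor accepts codec as a keyword argument (as the hunk above suggests) and that a connected client plus a hypothetical load_codec_from_config() helper exist:

from kafka.common import UnsupportedCodecError
from kafka.producer import Producer

codec = load_codec_from_config()  # hypothetical helper; may yield any value

try:
    producer = Producer(client, codec=codec)  # `client` assumed to be a KafkaClient
except UnsupportedCodecError:
    # A clear, specific error for misconfiguration instead of a bare assert.
    raise SystemExit("Configured compression codec is not supported")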

kafka/protocol.py

Lines changed: 18 additions & 1 deletion
@@ -9,7 +9,8 @@
     BrokerMetadata, PartitionMetadata, Message, OffsetAndMessage,
     ProduceResponse, FetchResponse, OffsetResponse,
     OffsetCommitResponse, OffsetFetchResponse, ProtocolError,
-    BufferUnderflowError, ChecksumError, ConsumerFetchSizeTooSmall
+    BufferUnderflowError, ChecksumError, ConsumerFetchSizeTooSmall,
+    UnsupportedCodecError
 )
 from kafka.util import (
     read_short_string, read_int_string, relative_unpack,
@@ -568,3 +569,19 @@ def create_snappy_message(payloads, key=None):
     codec = ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY
 
     return Message(0, 0x00 | codec, key, snapped)
+
+
+def create_message_set(messages, codec=CODEC_NONE):
+    """Create a message set using the given codec.
+
+    If codec is CODEC_NONE, return a list of raw Kafka messages. Otherwise,
+    return a list containing a single codec-encoded message.
+    """
+    if codec == CODEC_NONE:
+        return [create_message(m) for m in messages]
+    elif codec == CODEC_GZIP:
+        return [create_gzip_message(messages)]
+    elif codec == CODEC_SNAPPY:
+        return [create_snappy_message(messages)]
+    else:
+        raise UnsupportedCodecError("Codec 0x%02x unsupported" % codec)
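
To make the return shapes concrete, a small usage sketch (follows the docstring above; the snappy branch additionally depends on the optional snappy bindings checked by kafka.codec.has_snappy):

from kafka.protocol import CODEC_NONE, CODEC_GZIP, create_message_set

payloads = ["m1", "m2", "m3"]

plain = create_message_set(payloads, CODEC_NONE)
assert len(plain) == 3    # one raw Message per payload

gzipped = create_message_set(payloads, CODEC_GZIP)
assert len(gzipped) == 1  # whole batch wrapped in a single gzip-encoded Message

Wrapping the entire batch into one compressed message matches Kafka's convention for compressed message sets, which is why the gzip and snappy branches return a single-element list.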

test/test_protocol.py

Lines changed: 58 additions & 6 deletions
@@ -1,23 +1,30 @@
+import contextlib
+from contextlib import contextmanager
 import struct
 import unittest2
 
+import mock
+from mock import sentinel
+
 from kafka import KafkaClient
 from kafka.common import (
     OffsetRequest, OffsetCommitRequest, OffsetFetchRequest,
     OffsetResponse, OffsetCommitResponse, OffsetFetchResponse,
     ProduceRequest, FetchRequest, Message, ChecksumError,
-    ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse,
-    OffsetAndMessage, BrokerMetadata, PartitionMetadata,
-    TopicAndPartition, KafkaUnavailableError, ProtocolError,
-    LeaderUnavailableError, PartitionUnavailableError
+    ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse, OffsetAndMessage,
+    BrokerMetadata, PartitionMetadata, TopicAndPartition, KafkaUnavailableError,
+    ProtocolError, LeaderUnavailableError, PartitionUnavailableError,
+    UnsupportedCodecError
 )
 from kafka.codec import (
     has_snappy, gzip_encode, gzip_decode,
     snappy_encode, snappy_decode
 )
+import kafka.protocol
 from kafka.protocol import (
-    create_gzip_message, create_message, create_snappy_message, KafkaProtocol,
-    ATTRIBUTE_CODEC_MASK, CODEC_GZIP, CODEC_SNAPPY
+    ATTRIBUTE_CODEC_MASK, CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY, KafkaProtocol,
+    create_message, create_gzip_message, create_snappy_message,
+    create_message_set
 )
 
 class TestProtocol(unittest2.TestCase):
@@ -691,3 +698,48 @@ def test_decode_offset_fetch_response(self):
             OffsetFetchResponse(topic = 'topic1', partition = 2, offset = 4, error = 0, metadata = "meta"),
             OffsetFetchResponse(topic = 'topic1', partition = 4, offset = 8, error = 0, metadata = "meta"),
         ]))
+
+    @contextmanager
+    def mock_create_message_fns(self):
+        patches = contextlib.nested(
+            mock.patch.object(kafka.protocol, "create_message",
+                              return_value=sentinel.message),
+            mock.patch.object(kafka.protocol, "create_gzip_message",
+                              return_value=sentinel.gzip_message),
+            mock.patch.object(kafka.protocol, "create_snappy_message",
+                              return_value=sentinel.snappy_message),
+        )
+
+        with patches:
+            yield
+
+    def test_create_message_set(self):
+        messages = [1, 2, 3]
+
+        # Default codec is CODEC_NONE. Expect list of regular messages.
+        expect = [sentinel.message] * len(messages)
+        with self.mock_create_message_fns():
+            message_set = create_message_set(messages)
+        self.assertEqual(message_set, expect)
+
+        # CODEC_NONE: Expect list of regular messages.
+        expect = [sentinel.message] * len(messages)
+        with self.mock_create_message_fns():
+            message_set = create_message_set(messages, CODEC_NONE)
+        self.assertEqual(message_set, expect)
+
+        # CODEC_GZIP: Expect list of one gzip-encoded message.
+        expect = [sentinel.gzip_message]
+        with self.mock_create_message_fns():
+            message_set = create_message_set(messages, CODEC_GZIP)
+        self.assertEqual(message_set, expect)
+
+        # CODEC_SNAPPY: Expect list of one snappy-encoded message.
+        expect = [sentinel.snappy_message]
+        with self.mock_create_message_fns():
+            message_set = create_message_set(messages, CODEC_SNAPPY)
+        self.assertEqual(message_set, expect)
+
+        # Unknown codec should raise UnsupportedCodecError.
+        with self.assertRaises(UnsupportedCodecError):
+            create_message_set(messages, -1)
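
One portability note on the helper above: contextlib.nested exists only on Python 2 (it was removed in Python 3). A sketch of an equivalent helper using contextlib.ExitStack, should these tests ever need to run on Python 3 (not part of this commit):

from contextlib import ExitStack, contextmanager

import mock
from mock import sentinel

import kafka.protocol


@contextmanager
def mock_create_message_fns():
    # Stack the same three patches; ExitStack unwinds them all on exit.
    with ExitStack() as stack:
        stack.enter_context(mock.patch.object(
            kafka.protocol, "create_message", return_value=sentinel.message))
        stack.enter_context(mock.patch.object(
            kafka.protocol, "create_gzip_message",
            return_value=sentinel.gzip_message))
        stack.enter_context(mock.patch.object(
            kafka.protocol, "create_snappy_message",
            return_value=sentinel.snappy_message))
        yield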
