
Commit ae6b49a

Merge pull request dpkp#166 from patricklucas/teach_producers_about_compression

Add 'codec' parameter to Producer

2 parents: b13fa3f + 805b52a

4 files changed, +119 -31 lines

kafka/common.py

Lines changed: 6 additions & 0 deletions

@@ -170,6 +170,11 @@ class ConsumerNoMoreData(KafkaError):
 class ProtocolError(KafkaError):
     pass
 
+
+class UnsupportedCodecError(KafkaError):
+    pass
+
+
 kafka_errors = {
     -1 : UnknownError,
     1  : OffsetOutOfRangeError,
@@ -187,6 +192,7 @@ class ProtocolError(KafkaError):
     13 : StaleLeaderEpochCodeError,
 }
 
+
 def check_error(response):
     error = kafka_errors.get(response.error)
     if error:
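
The new exception slots into the existing KafkaError hierarchy, so callers can catch it like any other client error. A minimal sketch of how it surfaces (the broker address and the bogus 0x42 codec value are placeholders for illustration, not part of this change):

from kafka import KafkaClient
from kafka.common import UnsupportedCodecError
from kafka.producer import SimpleProducer

client = KafkaClient("localhost", 9092)
try:
    # 0x42 is not in ALL_CODECS, so Producer.__init__ raises immediately,
    # before any messages are sent.
    producer = SimpleProducer(client, codec=0x42)
except UnsupportedCodecError as e:
    print("Unsupported codec: %s" % e)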

kafka/producer.py

Lines changed: 23 additions & 10 deletions

@@ -9,9 +9,11 @@
 from itertools import cycle
 from multiprocessing import Queue, Process
 
-from kafka.common import ProduceRequest, TopicAndPartition
+from kafka.common import (
+    ProduceRequest, TopicAndPartition, UnsupportedCodecError
+)
 from kafka.partitioner import HashedPartitioner
-from kafka.protocol import create_message
+from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set
 
 log = logging.getLogger("kafka")
 
@@ -21,7 +23,7 @@
 STOP_ASYNC_PRODUCER = -1
 
 
-def _send_upstream(queue, client, batch_time, batch_size,
+def _send_upstream(queue, client, codec, batch_time, batch_size,
                    req_acks, ack_timeout):
     """
     Listen on the queue for a specified number of messages or till
@@ -62,7 +64,8 @@ def _send_upstream(queue, client, batch_time, batch_size,
 
         # Send collected requests upstream
        reqs = []
-        for topic_partition, messages in msgset.items():
+        for topic_partition, msg in msgset.items():
+            messages = create_message_set(msg, codec)
             req = ProduceRequest(topic_partition.topic,
                                  topic_partition.partition,
                                  messages)
@@ -102,6 +105,7 @@ class Producer(object):
     def __init__(self, client, async=False,
                  req_acks=ACK_AFTER_LOCAL_WRITE,
                  ack_timeout=DEFAULT_ACK_TIMEOUT,
+                 codec=None,
                  batch_send=False,
                  batch_send_every_n=BATCH_SEND_MSG_COUNT,
                  batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
@@ -119,11 +123,19 @@ def __init__(self, client, async=False,
         self.req_acks = req_acks
         self.ack_timeout = ack_timeout
 
+        if codec is None:
+            codec = CODEC_NONE
+        elif codec not in ALL_CODECS:
+            raise UnsupportedCodecError("Codec 0x%02x unsupported" % codec)
+
+        self.codec = codec
+
         if self.async:
             self.queue = Queue()  # Messages are sent through this queue
             self.proc = Process(target=_send_upstream,
                                 args=(self.queue,
                                       self.client.copy(),
+                                      self.codec,
                                       batch_send_every_t,
                                       batch_send_every_n,
                                       self.req_acks,
@@ -139,11 +151,10 @@ def send_messages(self, topic, partition, *msg):
         """
         if self.async:
             for m in msg:
-                self.queue.put((TopicAndPartition(topic, partition),
-                                create_message(m)))
+                self.queue.put((TopicAndPartition(topic, partition), m))
             resp = []
         else:
-            messages = [create_message(m) for m in msg]
+            messages = create_message_set(msg, self.codec)
             req = ProduceRequest(topic, partition, messages)
             try:
                 resp = self.client.send_produce_request([req], acks=self.req_acks,
@@ -168,7 +179,7 @@ def stop(self, timeout=1):
 
 class SimpleProducer(Producer):
     """
-    A simple, round-robbin producer. Each message goes to exactly one partition
+    A simple, round-robin producer. Each message goes to exactly one partition
 
     Params:
     client - The Kafka client instance to use
@@ -189,14 +200,15 @@ class SimpleProducer(Producer):
     def __init__(self, client, async=False,
                  req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
                  ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
+                 codec=None,
                  batch_send=False,
                  batch_send_every_n=BATCH_SEND_MSG_COUNT,
                  batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
                  random_start=False):
         self.partition_cycles = {}
         self.random_start = random_start
         super(SimpleProducer, self).__init__(client, async, req_acks,
-                                             ack_timeout, batch_send,
+                                             ack_timeout, codec, batch_send,
                                              batch_send_every_n,
                                              batch_send_every_t)
 
@@ -241,6 +253,7 @@ class KeyedProducer(Producer):
     def __init__(self, client, partitioner=None, async=False,
                  req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
                  ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
+                 codec=None,
                  batch_send=False,
                  batch_send_every_n=BATCH_SEND_MSG_COUNT,
                  batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
@@ -250,7 +263,7 @@ def __init__(self, client, partitioner=None, async=False,
         self.partitioners = {}
 
         super(KeyedProducer, self).__init__(client, async, req_acks,
-                                            ack_timeout, batch_send,
+                                            ack_timeout, codec, batch_send,
                                             batch_send_every_n,
                                             batch_send_every_t)
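
Taken together, compression is now chosen once, at construction time: the synchronous path hands each batch to create_message_set, while the async worker queues raw payloads and compresses them in _send_upstream. A hedged usage sketch against this era's API (the broker address and topic name are placeholders):

from kafka import KafkaClient
from kafka.producer import SimpleProducer
from kafka.protocol import CODEC_GZIP

client = KafkaClient("localhost", 9092)

# With codec=CODEC_GZIP, each send_messages() batch is gzipped into a
# single compressed message before it goes to the broker.
producer = SimpleProducer(client, codec=CODEC_GZIP)
producer.send_messages("my-topic", "payload 1", "payload 2")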

kafka/protocol.py

Lines changed: 30 additions & 12 deletions

@@ -9,7 +9,8 @@
     BrokerMetadata, PartitionMetadata, Message, OffsetAndMessage,
     ProduceResponse, FetchResponse, OffsetResponse,
     OffsetCommitResponse, OffsetFetchResponse, ProtocolError,
-    BufferUnderflowError, ChecksumError, ConsumerFetchSizeTooSmall
+    BufferUnderflowError, ChecksumError, ConsumerFetchSizeTooSmall,
+    UnsupportedCodecError
 )
 from kafka.util import (
     read_short_string, read_int_string, relative_unpack,
@@ -18,6 +19,12 @@
 
 log = logging.getLogger("kafka")
 
+ATTRIBUTE_CODEC_MASK = 0x03
+CODEC_NONE = 0x00
+CODEC_GZIP = 0x01
+CODEC_SNAPPY = 0x02
+ALL_CODECS = (CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY)
+
 
 class KafkaProtocol(object):
     """
@@ -32,11 +39,6 @@ class KafkaProtocol(object):
     OFFSET_COMMIT_KEY = 8
     OFFSET_FETCH_KEY = 9
 
-    ATTRIBUTE_CODEC_MASK = 0x03
-    CODEC_NONE = 0x00
-    CODEC_GZIP = 0x01
-    CODEC_SNAPPY = 0x02
-
     ###################
     #   Private API   #
     ###################
@@ -150,17 +152,17 @@ def _decode_message(cls, data, offset):
         (key, cur) = read_int_string(data, cur)
         (value, cur) = read_int_string(data, cur)
 
-        codec = att & KafkaProtocol.ATTRIBUTE_CODEC_MASK
+        codec = att & ATTRIBUTE_CODEC_MASK
 
-        if codec == KafkaProtocol.CODEC_NONE:
+        if codec == CODEC_NONE:
             yield (offset, Message(magic, att, key, value))
 
-        elif codec == KafkaProtocol.CODEC_GZIP:
+        elif codec == CODEC_GZIP:
             gz = gzip_decode(value)
             for (offset, msg) in KafkaProtocol._decode_message_set_iter(gz):
                 yield (offset, msg)
 
-        elif codec == KafkaProtocol.CODEC_SNAPPY:
+        elif codec == CODEC_SNAPPY:
             snp = snappy_decode(value)
             for (offset, msg) in KafkaProtocol._decode_message_set_iter(snp):
                 yield (offset, msg)
@@ -543,7 +545,7 @@ def create_gzip_message(payloads, key=None):
         [create_message(payload) for payload in payloads])
 
     gzipped = gzip_encode(message_set)
-    codec = KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_GZIP
+    codec = ATTRIBUTE_CODEC_MASK & CODEC_GZIP
 
     return Message(0, 0x00 | codec, key, gzipped)
 
@@ -564,6 +566,22 @@ def create_snappy_message(payloads, key=None):
         [create_message(payload) for payload in payloads])
 
     snapped = snappy_encode(message_set)
-    codec = KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_SNAPPY
+    codec = ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY
 
     return Message(0, 0x00 | codec, key, snapped)
+
+
+def create_message_set(messages, codec=CODEC_NONE):
+    """Create a message set using the given codec.
+
+    If codec is CODEC_NONE, return a list of raw Kafka messages. Otherwise,
+    return a list containing a single codec-encoded message.
+    """
+    if codec == CODEC_NONE:
+        return [create_message(m) for m in messages]
+    elif codec == CODEC_GZIP:
+        return [create_gzip_message(messages)]
+    elif codec == CODEC_SNAPPY:
+        return [create_snappy_message(messages)]
+    else:
+        raise UnsupportedCodecError("Codec 0x%02x unsupported" % codec)
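
With the codec constants promoted to module level, create_message_set becomes the single entry point for building message sets, and the attribute masking can be checked directly. A small illustration based on the code above:

from kafka.protocol import (
    ATTRIBUTE_CODEC_MASK, CODEC_GZIP, create_message_set
)

# Two payloads collapse into a single gzip-wrapped message...
message_set = create_message_set(["v1", "v2"], CODEC_GZIP)
assert len(message_set) == 1

# ...whose codec lives in the low two bits of the attributes byte.
msg = message_set[0]
assert msg.attributes & ATTRIBUTE_CODEC_MASK == CODEC_GZIP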

test/test_protocol.py

Lines changed: 60 additions & 9 deletions

@@ -1,22 +1,30 @@
+import contextlib
+from contextlib import contextmanager
 import struct
 import unittest2
 
+import mock
+from mock import sentinel
+
 from kafka import KafkaClient
 from kafka.common import (
     OffsetRequest, OffsetCommitRequest, OffsetFetchRequest,
     OffsetResponse, OffsetCommitResponse, OffsetFetchResponse,
     ProduceRequest, FetchRequest, Message, ChecksumError,
-    ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse,
-    OffsetAndMessage, BrokerMetadata, PartitionMetadata,
-    TopicAndPartition, KafkaUnavailableError, ProtocolError,
-    LeaderUnavailableError, PartitionUnavailableError
+    ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse, OffsetAndMessage,
+    BrokerMetadata, PartitionMetadata, TopicAndPartition, KafkaUnavailableError,
+    ProtocolError, LeaderUnavailableError, PartitionUnavailableError,
+    UnsupportedCodecError
 )
 from kafka.codec import (
     has_snappy, gzip_encode, gzip_decode,
     snappy_encode, snappy_decode
 )
+import kafka.protocol
 from kafka.protocol import (
-    create_gzip_message, create_message, create_snappy_message, KafkaProtocol
+    ATTRIBUTE_CODEC_MASK, CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY, KafkaProtocol,
+    create_message, create_gzip_message, create_snappy_message,
+    create_message_set
 )
 
 class TestProtocol(unittest2.TestCase):
@@ -33,8 +41,7 @@ def test_create_gzip(self):
         payloads = ["v1", "v2"]
         msg = create_gzip_message(payloads)
         self.assertEqual(msg.magic, 0)
-        self.assertEqual(msg.attributes, KafkaProtocol.ATTRIBUTE_CODEC_MASK &
-                                         KafkaProtocol.CODEC_GZIP)
+        self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_GZIP)
         self.assertEqual(msg.key, None)
         # Need to decode to check since gzipped payload is non-deterministic
         decoded = gzip_decode(msg.value)
@@ -63,8 +70,7 @@ def test_create_snappy(self):
         payloads = ["v1", "v2"]
         msg = create_snappy_message(payloads)
         self.assertEqual(msg.magic, 0)
-        self.assertEqual(msg.attributes, KafkaProtocol.ATTRIBUTE_CODEC_MASK &
-                                         KafkaProtocol.CODEC_SNAPPY)
+        self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY)
         self.assertEqual(msg.key, None)
         decoded = snappy_decode(msg.value)
         expect = "".join([
@@ -692,3 +698,48 @@ def test_decode_offset_fetch_response(self):
             OffsetFetchResponse(topic = 'topic1', partition = 2, offset = 4, error = 0, metadata = "meta"),
             OffsetFetchResponse(topic = 'topic1', partition = 4, offset = 8, error = 0, metadata = "meta"),
         ]))
+
+    @contextmanager
+    def mock_create_message_fns(self):
+        patches = contextlib.nested(
+            mock.patch.object(kafka.protocol, "create_message",
+                              return_value=sentinel.message),
+            mock.patch.object(kafka.protocol, "create_gzip_message",
+                              return_value=sentinel.gzip_message),
+            mock.patch.object(kafka.protocol, "create_snappy_message",
+                              return_value=sentinel.snappy_message),
+        )
+
+        with patches:
+            yield
+
+    def test_create_message_set(self):
+        messages = [1, 2, 3]
+
+        # Default codec is CODEC_NONE. Expect list of regular messages.
+        expect = [sentinel.message] * len(messages)
+        with self.mock_create_message_fns():
+            message_set = create_message_set(messages)
+        self.assertEqual(message_set, expect)
+
+        # CODEC_NONE: Expect list of regular messages.
+        expect = [sentinel.message] * len(messages)
+        with self.mock_create_message_fns():
+            message_set = create_message_set(messages, CODEC_NONE)
+        self.assertEqual(message_set, expect)
+
+        # CODEC_GZIP: Expect list of one gzip-encoded message.
+        expect = [sentinel.gzip_message]
+        with self.mock_create_message_fns():
+            message_set = create_message_set(messages, CODEC_GZIP)
+        self.assertEqual(message_set, expect)
+
+        # CODEC_SNAPPY: Expect list of one snappy-encoded message.
+        expect = [sentinel.snappy_message]
+        with self.mock_create_message_fns():
+            message_set = create_message_set(messages, CODEC_SNAPPY)
+        self.assertEqual(message_set, expect)
+
+        # Unknown codec should raise UnsupportedCodecError.
+        with self.assertRaises(UnsupportedCodecError):
+            create_message_set(messages, -1)
