@@ -9,21 +9,24 @@ SimpleProducer
9
9
from kafka import SimpleProducer, KafkaClient
10
10
11
11
# To send messages synchronously
12
- kafka = KafkaClient(" localhost:9092" )
12
+ kafka = KafkaClient(' localhost:9092' )
13
13
producer = SimpleProducer(kafka)
14
14
15
- # Note that the application is responsible for encoding messages to type str
16
- producer.send_messages(" my-topic" , " some message" )
17
- producer.send_messages(" my-topic" , " this method" , " is variadic" )
15
+ # Note that the application is responsible for encoding messages to type bytes
16
+ producer.send_messages(b ' my-topic' , b ' some message' )
17
+ producer.send_messages(b ' my-topic' , b ' this method' , b ' is variadic' )
18
18
19
19
# Send unicode message
20
- producer.send_messages(" my-topic" , u ' 你怎么样?' .encode(' utf-8' ))
20
+ producer.send_messages(b ' my-topic' , u ' 你怎么样?' .encode(' utf-8' ))
21
+
22
+ Asynchronous Mode
23
+ -----------------
24
+
25
+ .. code :: python
21
26
22
27
# To send messages asynchronously
23
- # WARNING: current implementation does not guarantee message delivery on failure!
24
- # messages can get dropped! Use at your own risk! Or help us improve with a PR!
25
28
producer = SimpleProducer(kafka, async = True )
26
- producer.send_messages(" my-topic" , " async message" )
29
+ producer.send_messages(b ' my-topic' , b ' async message' )
27
30
28
31
# To wait for acknowledgements
29
32
# ACK_AFTER_LOCAL_WRITE : server will wait till the data is written to
@@ -32,13 +35,12 @@ SimpleProducer
32
35
# by all in sync replicas before sending a response
33
36
producer = SimpleProducer(kafka, async = False ,
34
37
req_acks = SimpleProducer.ACK_AFTER_LOCAL_WRITE ,
35
- ack_timeout = 2000 )
36
-
37
- response = producer.send_messages(" my-topic" , " another message" )
38
+ ack_timeout = 2000 ,
39
+ sync_fail_on_error = False )
38
40
39
- if response:
40
- print (response[ 0 ].error)
41
- print (response[ 0 ] .offset)
41
+ responses = producer.send_messages( b ' my-topic ' , b ' another message ' )
42
+ for r in responses:
43
+ logging.info(r .offset)
42
44
43
45
# To send messages in batch. You can use any of the available
44
46
# producers for doing this. The following producer will collect
@@ -56,16 +58,21 @@ Keyed messages
56
58
57
59
.. code :: python
58
60
59
- from kafka import (KafkaClient, KeyedProducer, HashedPartitioner,
60
- RoundRobinPartitioner)
61
+ from kafka import (
62
+ KafkaClient, KeyedProducer,
63
+ Murmur2Partitioner, RoundRobinPartitioner)
61
64
62
- kafka = KafkaClient(" localhost:9092" )
65
+ kafka = KafkaClient(' localhost:9092' )
63
66
64
- # HashedPartitioner is default
67
+ # HashedPartitioner is default (currently uses python hash())
65
68
producer = KeyedProducer(kafka)
66
- producer.send_messages(" my-topic" , " key1" , " some message" )
67
- producer.send_messages(" my-topic" , " key2" , " this methode" )
69
+ producer.send_messages(b ' my-topic' , b ' key1' , b ' some message' )
70
+ producer.send_messages(b ' my-topic' , b ' key2' , b ' this methode' )
68
71
72
+ # Murmur2Partitioner attempts to mirror the java client hashing
73
+ producer = KeyedProducer(kafka, partitioner = Murmur2Partitioner)
74
+
75
+ # Or just produce round-robin (or just use SimpleProducer)
69
76
producer = KeyedProducer(kafka, partitioner = RoundRobinPartitioner)
70
77
71
78
@@ -78,18 +85,16 @@ KafkaConsumer
78
85
from kafka import KafkaConsumer
79
86
80
87
# To consume messages
81
- consumer = KafkaConsumer(" my-topic" ,
82
- group_id = " my_group" ,
83
- bootstrap_servers = [" localhost:9092" ])
88
+ consumer = KafkaConsumer(' my-topic' ,
89
+ group_id = ' my_group' ,
90
+ bootstrap_servers = [' localhost:9092' ])
84
91
for message in consumer:
85
92
# message value is raw byte string -- decode if necessary!
86
93
# e.g., for unicode: `message.value.decode('utf-8')`
87
94
print (" %s :%d :%d : key=%s value=%s " % (message.topic, message.partition,
88
95
message.offset, message.key,
89
96
message.value))
90
97
91
- kafka.close()
92
-
93
98
94
99
messages (m) are namedtuples with attributes:
95
100
@@ -121,16 +126,16 @@ messages (m) are namedtuples with attributes:
121
126
# so it can be included in the next commit
122
127
#
123
128
# **messages that are not marked w/ task_done currently do not commit!
124
- kafka .task_done(m)
129
+ consumer .task_done(m)
125
130
126
131
# If auto_commit_enable is False, remember to commit() periodically
127
- kafka .commit()
132
+ consumer .commit()
128
133
129
134
# Batch process interface
130
135
while True :
131
136
for m in kafka.fetch_messages():
132
137
process_message(m)
133
- kafka .task_done(m)
138
+ consumer .task_done(m)
134
139
135
140
136
141
Configuration settings can be passed to constructor,
@@ -162,13 +167,13 @@ Multiprocess consumer
162
167
163
168
from kafka import KafkaClient, MultiProcessConsumer
164
169
165
- kafka = KafkaClient(" localhost:9092" )
170
+ kafka = KafkaClient(' localhost:9092' )
166
171
167
172
# This will split the number of partitions among two processes
168
- consumer = MultiProcessConsumer(kafka, " my-group" , " my-topic" , num_procs = 2 )
173
+ consumer = MultiProcessConsumer(kafka, b ' my-group' , b ' my-topic' , num_procs = 2 )
169
174
170
175
# This will spawn processes such that each handles 2 partitions max
171
- consumer = MultiProcessConsumer(kafka, " my-group" , " my-topic" ,
176
+ consumer = MultiProcessConsumer(kafka, b ' my-group' , b ' my-topic' ,
172
177
partitions_per_proc = 2 )
173
178
174
179
for message in consumer:
@@ -186,14 +191,14 @@ Low level
186
191
from kafka.protocol import KafkaProtocol
187
192
from kafka.common import ProduceRequest
188
193
189
- kafka = KafkaClient(" localhost:9092" )
194
+ kafka = KafkaClient(' localhost:9092' )
190
195
191
- req = ProduceRequest(topic = " my-topic" , partition = 1 ,
192
- messages = [create_message(" some message" )])
196
+ req = ProduceRequest(topic = b ' my-topic' , partition = 1 ,
197
+ messages = [create_message(b ' some message' )])
193
198
resps = kafka.send_produce_request(payloads = [req], fail_on_error = True )
194
199
kafka.close()
195
200
196
- resps[0 ].topic # " my-topic"
201
+ resps[0 ].topic # b' my-topic'
197
202
resps[0 ].partition # 1
198
203
resps[0 ].error # 0 (hopefully)
199
204
resps[0 ].offset # offset of the first message sent in this request
0 commit comments