Skip to content

Commit 7eaca8e

Browse files
committed
Split out and speed up producer tests
1 parent 385f2d8 commit 7eaca8e

File tree

1 file changed

+139
-0
lines changed

1 file changed

+139
-0
lines changed

test/test_producer_integration.py

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
import unittest
2+
import time
3+
4+
from kafka import * # noqa
5+
from kafka.common import * # noqa
6+
from kafka.codec import has_gzip, has_snappy
7+
from .fixtures import ZookeeperFixture, KafkaFixture
8+
from .testutil import *
9+
10+
class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
11+
topic = 'produce_topic'
12+
13+
@classmethod
14+
def setUpClass(cls): # noqa
15+
cls.zk = ZookeeperFixture.instance()
16+
cls.server = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
17+
cls.client = KafkaClient('%s:%d' % (cls.server.host, cls.server.port))
18+
19+
@classmethod
20+
def tearDownClass(cls): # noqa
21+
cls.client.close()
22+
cls.server.close()
23+
cls.zk.close()
24+
25+
def test_produce_many_simple(self):
26+
start_offset = self.current_offset(self.topic, 0)
27+
28+
produce = ProduceRequest(self.topic, 0, messages=[
29+
create_message("Test message %d" % i) for i in range(100)
30+
])
31+
32+
resp = self.client.send_produce_request([produce])
33+
self.assertEqual(len(resp), 1) # Only one response
34+
self.assertEqual(resp[0].error, 0) # No error
35+
self.assertEqual(resp[0].offset, start_offset) # Initial offset of first message
36+
37+
self.assertEqual(self.current_offset(self.topic, 0), start_offset+100)
38+
39+
resp = self.client.send_produce_request([produce])
40+
self.assertEqual(len(resp), 1) # Only one response
41+
self.assertEqual(resp[0].error, 0) # No error
42+
self.assertEqual(resp[0].offset, start_offset+100) # Initial offset of first message
43+
44+
self.assertEqual(self.current_offset(self.topic, 0), start_offset+200)
45+
46+
def test_produce_10k_simple(self):
47+
start_offset = self.current_offset(self.topic, 0)
48+
49+
produce = ProduceRequest(self.topic, 0, messages=[
50+
create_message("Test message %d" % i) for i in range(10000)
51+
])
52+
53+
resp = self.client.send_produce_request([produce])
54+
self.assertEqual(len(resp), 1) # Only one response
55+
self.assertEqual(resp[0].error, 0) # No error
56+
self.assertEqual(resp[0].offset, start_offset) # Initial offset of first message
57+
58+
self.assertEqual(self.current_offset(self.topic, 0), start_offset+10000)
59+
60+
def test_produce_many_gzip(self):
61+
start_offset = self.current_offset(self.topic, 0)
62+
63+
message1 = create_gzip_message(["Gzipped 1 %d" % i for i in range(100)])
64+
message2 = create_gzip_message(["Gzipped 2 %d" % i for i in range(100)])
65+
66+
produce = ProduceRequest(self.topic, 0, messages=[message1, message2])
67+
68+
resp = self.client.send_produce_request([produce])
69+
self.assertEqual(len(resp), 1) # Only one response
70+
self.assertEqual(resp[0].error, 0) # No error
71+
self.assertEqual(resp[0].offset, start_offset) # Initial offset of first message
72+
73+
self.assertEqual(self.current_offset(self.topic, 0), start_offset+200)
74+
75+
@unittest.skip("All snappy integration tests fail with nosnappyjava")
76+
def test_produce_many_snappy(self):
77+
start_offset = self.current_offset(self.topic, 0)
78+
79+
produce = ProduceRequest(self.topic, 0, messages=[
80+
create_snappy_message(["Snappy 1 %d" % i for i in range(100)]),
81+
create_snappy_message(["Snappy 2 %d" % i for i in range(100)]),
82+
])
83+
84+
resp = self.client.send_produce_request([produce])
85+
86+
self.assertEqual(len(resp), 1) # Only one response
87+
self.assertEqual(resp[0].error, 0) # No error
88+
self.assertEqual(resp[0].offset, start_offset) # Initial offset of first message
89+
90+
self.assertEqual(self.current_offset(self.topic, 0), start_offset+200)
91+
92+
def test_produce_mixed(self):
93+
start_offset = self.current_offset(self.topic, 0)
94+
95+
msg_count = 1+100
96+
messages = [
97+
create_message("Just a plain message"),
98+
create_gzip_message(["Gzipped %d" % i for i in range(100)]),
99+
]
100+
101+
# All snappy integration tests fail with nosnappyjava
102+
if False and has_snappy():
103+
msg_count += 100
104+
messages.append(create_snappy_message(["Snappy %d" % i for i in range(100)]))
105+
106+
produce = ProduceRequest(self.topic, 0, messages=messages)
107+
resp = self.client.send_produce_request([produce])
108+
109+
self.assertEqual(len(resp), 1) # Only one response
110+
self.assertEqual(resp[0].error, 0) # No error
111+
self.assertEqual(resp[0].offset, start_offset) # Initial offset of first message
112+
113+
self.assertEqual(self.current_offset(self.topic, 0), start_offset+msg_count)
114+
115+
def test_produce_100k_gzipped(self):
116+
start_offset = self.current_offset(self.topic, 0)
117+
118+
req1 = ProduceRequest(self.topic, 0, messages=[
119+
create_gzip_message(["Gzipped batch 1, message %d" % i for i in range(50000)])
120+
])
121+
resp1 = self.client.send_produce_request([req1])
122+
123+
self.assertEqual(len(resp1), 1) # Only one response
124+
self.assertEqual(resp1[0].error, 0) # No error
125+
self.assertEqual(resp1[0].offset, start_offset) # Initial offset of first message
126+
127+
self.assertEqual(self.current_offset(self.topic, 0), start_offset+50000)
128+
129+
req2 = ProduceRequest(self.topic, 0, messages=[
130+
create_gzip_message(["Gzipped batch 2, message %d" % i for i in range(50000)])
131+
])
132+
133+
resp2 = self.client.send_produce_request([req2])
134+
135+
self.assertEqual(len(resp2), 1) # Only one response
136+
self.assertEqual(resp2[0].error, 0) # No error
137+
self.assertEqual(resp2[0].offset, start_offset+50000) # Initial offset of first message
138+
139+
self.assertEqual(self.current_offset(self.topic, 0), start_offset+100000)

0 commit comments

Comments
 (0)