
Commit 1984dab

Finish breaking out integration tests
1 parent 8983e73 commit 1984dab

File tree

5 files changed: +387 −933 lines changed


test/test_client_integration.py

Lines changed: 47 additions & 0 deletions
@@ -0,0 +1,47 @@
import unittest
import time

from kafka import *  # noqa
from kafka.common import *  # noqa
from kafka.codec import has_gzip, has_snappy
from .fixtures import ZookeeperFixture, KafkaFixture
from .testutil import *


@unittest.skipIf(skip_integration(), 'Skipping Integration')
class TestKafkaClientIntegration(KafkaIntegrationTestCase):
    @classmethod
    def setUpClass(cls):  # noqa
        cls.zk = ZookeeperFixture.instance()
        cls.server = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)

    @classmethod
    def tearDownClass(cls):  # noqa
        cls.server.close()
        cls.zk.close()

    def test_consume_none(self):
        fetch = FetchRequest(self.topic, 0, 0, 1024)

        fetch_resp, = self.client.send_fetch_request([fetch])
        self.assertEquals(fetch_resp.error, 0)
        self.assertEquals(fetch_resp.topic, self.topic)
        self.assertEquals(fetch_resp.partition, 0)

        messages = list(fetch_resp.messages)
        self.assertEquals(len(messages), 0)

    ####################
    #   Offset Tests   #
    ####################

    @unittest.skip('commit offset not supported in this version')
    def test_commit_fetch_offsets(self):
        req = OffsetCommitRequest(self.topic, 0, 42, "metadata")
        (resp,) = self.client.send_offset_commit_request("group", [req])
        self.assertEquals(resp.error, 0)

        req = OffsetFetchRequest(self.topic, 0)
        (resp,) = self.client.send_offset_fetch_request("group", [req])
        self.assertEquals(resp.error, 0)
        self.assertEquals(resp.offset, 42)
        self.assertEquals(resp.metadata, "")  # Metadata isn't stored for now
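
These tests pull skip_integration(), random_string(), Timer, and the KafkaIntegrationTestCase base class in through the star import from .testutil, which is not part of this commit. A minimal sketch of what those helpers plausibly provide — every name and behavior here is inferred from the call sites in this diff, not taken from the real test/testutil.py:

import os
import random
import string
import time
import unittest

from kafka import KafkaClient


def skip_integration():
    # Assumed: integration runs are gated on an environment variable.
    return os.environ.get('SKIP_INTEGRATION')


def random_string(length):
    # Used by the tests for topic names, keys, and payloads.
    return ''.join(random.choice(string.ascii_letters) for _ in range(length))


class Timer(object):
    # Context manager the blocking-consumer tests use to measure how
    # long get_messages() actually blocked.
    def __enter__(self):
        self.start = time.time()
        return self

    def __exit__(self, *args):
        self.interval = time.time() - self.start


class KafkaIntegrationTestCase(unittest.TestCase):
    # Assumed: a unique topic per test and, unless create_client is
    # overridden to False (as the failover tests do), a client against
    # the bootstrap server created in setUpClass.
    create_client = True

    def setUp(self):
        self.topic = 'topic-%s' % random_string(10)
        if self.create_client:
            self.client = KafkaClient('%s:%d' % (self.server.host, self.server.port))

    def msg(self, s):
        # Namespaces payloads per test so concurrent runs don't collide.
        return '%s-%s' % (self.topic, s)

In particular, the blocking tests in the consumer file below rely on Timer.interval to assert lower and upper bounds on how long get_messages() blocked.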

test/test_consumer_integration.py

Lines changed: 208 additions & 0 deletions
@@ -0,0 +1,208 @@
import unittest
from datetime import datetime

from kafka import *  # noqa
from kafka.common import *  # noqa
from kafka.consumer import MAX_FETCH_BUFFER_SIZE_BYTES
from .fixtures import ZookeeperFixture, KafkaFixture
from .testutil import *


@unittest.skipIf(skip_integration(), 'Skipping Integration')
class TestConsumerIntegration(KafkaIntegrationTestCase):
    @classmethod
    def setUpClass(cls):
        cls.zk = ZookeeperFixture.instance()
        cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
        cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port)

        cls.server = cls.server1  # Bootstrapping server

    @classmethod
    def tearDownClass(cls):  # noqa
        cls.server1.close()
        cls.server2.close()
        cls.zk.close()

    def send_messages(self, partition, messages):
        messages = [create_message(self.msg(str(msg))) for msg in messages]
        produce = ProduceRequest(self.topic, partition, messages=messages)
        resp, = self.client.send_produce_request([produce])
        self.assertEquals(resp.error, 0)

        return [x.value for x in messages]

    def assert_message_count(self, messages, num_messages):
        # Make sure we got them all
        self.assertEquals(len(messages), num_messages)

        # Make sure there are no duplicates
        self.assertEquals(len(set(messages)), num_messages)

    def test_simple_consumer(self):
        self.send_messages(0, range(0, 100))
        self.send_messages(1, range(100, 200))

        # Start a consumer
        consumer = SimpleConsumer(self.client, "group1",
                                  self.topic, auto_commit=False,
                                  iter_timeout=0)

        self.assert_message_count([message for message in consumer], 200)

        consumer.stop()

    def test_simple_consumer__seek(self):
        self.send_messages(0, range(0, 100))
        self.send_messages(1, range(100, 200))

        consumer = SimpleConsumer(self.client, "group1",
                                  self.topic, auto_commit=False,
                                  iter_timeout=0)

        # Rewind 10 messages from the end
        consumer.seek(-10, 2)
        self.assert_message_count([message for message in consumer], 10)

        # Rewind 13 messages from the end
        consumer.seek(-13, 2)
        self.assert_message_count([message for message in consumer], 13)

        consumer.stop()

    def test_simple_consumer_blocking(self):
        consumer = SimpleConsumer(self.client, "group1",
                                  self.topic,
                                  auto_commit=False, iter_timeout=0)

        # Ask for 5 messages, nothing in queue, block 5 seconds
        with Timer() as t:
            messages = consumer.get_messages(block=True, timeout=5)
            self.assert_message_count(messages, 0)
        self.assertGreaterEqual(t.interval, 5)

        self.send_messages(0, range(0, 10))

        # Ask for 5 messages, 10 in queue. Get 5 back, no blocking
        with Timer() as t:
            messages = consumer.get_messages(count=5, block=True, timeout=5)
            self.assert_message_count(messages, 5)
        self.assertLessEqual(t.interval, 1)

        # Ask for 10 messages, get 5 back, block 5 seconds
        with Timer() as t:
            messages = consumer.get_messages(count=10, block=True, timeout=5)
            self.assert_message_count(messages, 5)
        self.assertGreaterEqual(t.interval, 5)

        consumer.stop()

    def test_simple_consumer_pending(self):
        # Produce 10 messages to partitions 0 and 1
        self.send_messages(0, range(0, 10))
        self.send_messages(1, range(10, 20))

        consumer = SimpleConsumer(self.client, "group1", self.topic,
                                  auto_commit=False, iter_timeout=0)

        self.assertEquals(consumer.pending(), 20)
        self.assertEquals(consumer.pending(partitions=[0]), 10)
        self.assertEquals(consumer.pending(partitions=[1]), 10)

        consumer.stop()

    def test_multi_process_consumer(self):
        # Produce 100 messages to partitions 0 and 1
        self.send_messages(0, range(0, 100))
        self.send_messages(1, range(100, 200))

        consumer = MultiProcessConsumer(self.client, "grp1", self.topic, auto_commit=False)

        self.assert_message_count([message for message in consumer], 200)

        consumer.stop()

    def test_multi_process_consumer_blocking(self):
        consumer = MultiProcessConsumer(self.client, "grp1", self.topic, auto_commit=False)

        # Ask for 5 messages, no messages in queue, block 5 seconds
        with Timer() as t:
            messages = consumer.get_messages(block=True, timeout=5)
            self.assert_message_count(messages, 0)

        self.assertGreaterEqual(t.interval, 5)

        # Send 10 messages
        self.send_messages(0, range(0, 10))

        # Ask for 5 messages, 10 messages in queue, no blocking
        with Timer() as t:
            messages = consumer.get_messages(count=5, block=True, timeout=5)
            self.assert_message_count(messages, 5)
        self.assertLessEqual(t.interval, 1)

        # Ask for 10 messages, 5 in queue, block 5 seconds
        with Timer() as t:
            messages = consumer.get_messages(count=10, block=True, timeout=5)
            self.assert_message_count(messages, 5)
        self.assertGreaterEqual(t.interval, 5)

        consumer.stop()

    def test_multi_proc_pending(self):
        self.send_messages(0, range(0, 10))
        self.send_messages(1, range(10, 20))

        consumer = MultiProcessConsumer(self.client, "group1", self.topic, auto_commit=False)

        self.assertEquals(consumer.pending(), 20)
        self.assertEquals(consumer.pending(partitions=[0]), 10)
        self.assertEquals(consumer.pending(partitions=[1]), 10)

        consumer.stop()

    def test_large_messages(self):
        # Produce 10 "normal" size messages
        small_messages = self.send_messages(0, [str(x) for x in range(10)])

        # Produce 10 messages that are large (bigger than default fetch size)
        large_messages = self.send_messages(0, [random_string(5000) for x in range(10)])

        # Consumer should still get all of them
        consumer = SimpleConsumer(self.client, "group1", self.topic,
                                  auto_commit=False, iter_timeout=0)

        expected_messages = set(small_messages + large_messages)
        actual_messages = set([x.message.value for x in consumer])
        self.assertEqual(expected_messages, actual_messages)

        consumer.stop()

    def test_huge_messages(self):
        # send_messages() already wraps each payload in create_message(),
        # so pass the raw payload here
        huge_message, = self.send_messages(0, [
            random_string(MAX_FETCH_BUFFER_SIZE_BYTES + 10),
        ])

        # Create a consumer with the default buffer size
        consumer = SimpleConsumer(self.client, "group1", self.topic,
                                  auto_commit=False, iter_timeout=0)

        # This consumer fails to get the message
        with self.assertRaises(ConsumerFetchSizeTooSmall):
            consumer.get_message(False, 0.1)

        consumer.stop()

        # Create a consumer with no fetch size limit
        big_consumer = SimpleConsumer(self.client, "group1", self.topic,
                                      max_buffer_size=None, partitions=[0],
                                      auto_commit=False, iter_timeout=0)

        # Seek to the last message
        big_consumer.seek(-1, 2)

        # Consume the giant message successfully
        message = big_consumer.get_message(block=False, timeout=10)
        self.assertIsNotNone(message)
        self.assertEquals(message.message.value, huge_message)

        big_consumer.stop()
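
A note on the seek calls above: SimpleConsumer.seek(offset, whence) follows file-style whence semantics (0 = absolute, 1 = relative to the current offset, 2 = relative to the end of the log), and with two partitions the rewind appears to be spread across both, which is why seek(-10, 2) yields exactly 10 messages in test_simple_consumer__seek. A minimal standalone usage sketch, assuming a broker address and topic name that are purely illustrative:

from kafka import KafkaClient, SimpleConsumer

# Re-read the last ten messages of a topic.
client = KafkaClient('localhost:9092')            # assumed broker address
consumer = SimpleConsumer(client, 'group1', 'my-topic',  # hypothetical topic
                          auto_commit=False, iter_timeout=0)
consumer.seek(-10, 2)                             # whence=2: offset from the tail
last_ten = consumer.get_messages(count=10, block=True, timeout=5)
consumer.stop()
client.close()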

test/test_failover_integration.py

Lines changed: 116 additions & 0 deletions
@@ -0,0 +1,116 @@
import unittest
import time

from kafka import *  # noqa
from kafka.common import *  # noqa
from .fixtures import ZookeeperFixture, KafkaFixture
from .testutil import *


@unittest.skipIf(skip_integration(), 'Skipping Integration')
class TestFailover(KafkaIntegrationTestCase):
    create_client = False

    @classmethod
    def setUpClass(cls):  # noqa
        zk_chroot = random_string(10)
        replicas = 2
        partitions = 2

        # mini zookeeper, 2 kafka brokers
        cls.zk = ZookeeperFixture.instance()
        kk_args = [cls.zk.host, cls.zk.port, zk_chroot, replicas, partitions]
        cls.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)]

        hosts = ['%s:%d' % (b.host, b.port) for b in cls.brokers]
        cls.client = KafkaClient(hosts)

    @classmethod
    def tearDownClass(cls):
        cls.client.close()
        for broker in cls.brokers:
            broker.close()
        cls.zk.close()

    def test_switch_leader(self):
        key, topic, partition = random_string(5), self.topic, 0
        producer = SimpleProducer(self.client)

        for i in range(1, 4):

            # XXX unfortunately, the conns dict needs to be warmed for this to work
            # XXX unfortunately, for warming to work, we need at least as many partitions as brokers
            self._send_random_messages(producer, self.topic, 10)

            # kill the leader for partition 0
            broker = self._kill_leader(topic, partition)

            # expect failure, then reload metadata
            with self.assertRaises(FailedPayloadsError):
                producer.send_messages(self.topic, 'part 1')
                producer.send_messages(self.topic, 'part 2')
            time.sleep(1)

            # send to the new leader
            self._send_random_messages(producer, self.topic, 10)

            broker.open()
            time.sleep(3)

            # count the messages: each pass adds 10 messages before and 10
            # after the failover, plus up to two 'part N' sends
            count = self._count_messages('test_switch_leader group %s' % i, topic)
            self.assertIn(count, range(20 * i, 22 * i + 1))

        producer.stop()

    def test_switch_leader_async(self):
        key, topic, partition = random_string(5), self.topic, 0
        producer = SimpleProducer(self.client, async=True)

        for i in range(1, 4):

            self._send_random_messages(producer, self.topic, 10)

            # kill the leader for partition 0
            broker = self._kill_leader(topic, partition)

            # expect failure, then reload metadata
            producer.send_messages(self.topic, 'part 1')
            producer.send_messages(self.topic, 'part 2')
            time.sleep(1)

            # send to the new leader
            self._send_random_messages(producer, self.topic, 10)

            broker.open()
            time.sleep(3)

            # count the messages
            count = self._count_messages('test_switch_leader_async group %s' % i, topic)
            self.assertIn(count, range(20 * i, 22 * i + 1))

        producer.stop()

    def _send_random_messages(self, producer, topic, n):
        for j in range(n):
            resp = producer.send_messages(topic, random_string(10))
            if len(resp) > 0:
                self.assertEquals(resp[0].error, 0)
        time.sleep(1)  # give it some time

    def _kill_leader(self, topic, partition):
        leader = self.client.topics_to_brokers[TopicAndPartition(topic, partition)]
        broker = self.brokers[leader.nodeId]
        broker.close()
        time.sleep(1)  # give it some time
        return broker

    def _count_messages(self, group, topic):
        hosts = '%s:%d' % (self.brokers[0].host, self.brokers[0].port)
        client = KafkaClient(hosts)
        consumer = SimpleConsumer(client, group, topic, auto_commit=False, iter_timeout=0)
        all_messages = []
        for message in consumer:
            all_messages.append(message)
        consumer.stop()
        client.close()
        return len(all_messages)
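
The fixed time.sleep calls after killing and reopening a broker make these tests timing-sensitive. One alternative would be to poll for leader election instead; the following is a sketch only, and the load_metadata_for_topics() refresh call is an assumption about the client API rather than something shown in this diff:

import time

from kafka.common import TopicAndPartition


def wait_for_leader(client, topic, partition, timeout=10):
    # Hypothetical helper: poll until some broker owns the partition,
    # rather than sleeping a fixed interval and hoping election finished.
    # Assumes load_metadata_for_topics() refreshes the topics_to_brokers
    # mapping that _kill_leader() reads.
    deadline = time.time() + timeout
    while time.time() < deadline:
        try:
            client.load_metadata_for_topics(topic)
            return client.topics_to_brokers[TopicAndPartition(topic, partition)]
        except Exception:
            time.sleep(0.5)
    raise AssertionError('no leader for %s/%d after %ds' % (topic, partition, timeout))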
