Skip to content

Commit faf1749

Browse files
TheAtomicOptiondpkp
authored andcommitted
Added controlled thread shutdown to example.py (dpkp#1268)
1 parent 146b893 commit faf1749

File tree

1 file changed

+32
-8
lines changed

1 file changed

+32
-8
lines changed

example.py

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,29 +6,46 @@
66

77

88
class Producer(threading.Thread):
9-
daemon = True
9+
def __init__(self):
10+
threading.Thread.__init__(self)
11+
self.stop_event = threading.Event()
12+
13+
def stop(self):
14+
self.stop_event.set()
1015

1116
def run(self):
1217
producer = KafkaProducer(bootstrap_servers='localhost:9092')
1318

14-
while True:
19+
while not self.stop_event.is_set():
1520
producer.send('my-topic', b"test")
1621
producer.send('my-topic', b"\xc2Hola, mundo!")
1722
time.sleep(1)
1823

24+
producer.close()
1925

2026
class Consumer(multiprocessing.Process):
21-
daemon = True
22-
27+
def __init__(self):
28+
multiprocessing.Process.__init__(self)
29+
self.stop_event = multiprocessing.Event()
30+
31+
def stop(self):
32+
self.stop_event.set()
33+
2334
def run(self):
2435
consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
25-
auto_offset_reset='earliest')
36+
auto_offset_reset='earliest',
37+
consumer_timeout_ms=1000)
2638
consumer.subscribe(['my-topic'])
2739

28-
for message in consumer:
29-
print (message)
30-
40+
while not self.stop_event.is_set():
41+
for message in consumer:
42+
print(message)
43+
if self.stop_event.is_set():
44+
break
3145

46+
consumer.close()
47+
48+
3249
def main():
3350
tasks = [
3451
Producer(),
@@ -39,7 +56,14 @@ def main():
3956
t.start()
4057

4158
time.sleep(10)
59+
60+
for task in tasks:
61+
task.stop()
4262

63+
for task in tasks:
64+
task.join()
65+
66+
4367
if __name__ == "__main__":
4468
logging.basicConfig(
4569
format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',

0 commit comments

Comments
 (0)