We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
1 parent 25a3e7d commit fd1e041Copy full SHA for fd1e041
examples/mysql_to_kafka.py
@@ -23,12 +23,20 @@
23
def create_kafka_topic(topic_name, num_partitions=1, replication_factor=1):
24
admin_client = KafkaAdminClient(bootstrap_servers="127.0.0.1:9092")
25
26
- topic = NewTopic(
27
- name=topic_name,
28
- num_partitions=num_partitions,
29
- replication_factor=replication_factor,
30
- )
31
- admin_client.create_topics(new_topics=[topic])
+ topic_exists = False
+ try:
+ topic_listings = admin_client.list_topics()
+ topic_exists = topic_name in topic_listings
+ except TopicAlreadyExistsError:
+ topic_exists = True
32
+
33
+ if not topic_exists:
34
+ topic = NewTopic(
35
+ name=topic_name,
36
+ num_partitions=num_partitions,
37
+ replication_factor=replication_factor,
38
+ )
39
+ admin_client.create_topics(new_topics=[topic])
40
41
42
def main():
0 commit comments