Skip to content

Commit 40abffd

Browse files
authored
Merge pull request redpanda-data#6332 from daisukebe/schemas-command-generator
tools: add _schemas topic command generator for restoring
2 parents 21e9e1e + 0681a99 commit 40abffd

File tree

1 file changed

+82
-0
lines changed

1 file changed

+82
-0
lines changed
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
#!/usr/bin/env python3
2+
3+
# Usage:
4+
#
5+
# Say you have a dump file of the _schemas topic, taken via
6+
# 'rpk topic consume _schemas' and you want to restore it
7+
# directly to the _schemas topic again.
8+
#
9+
# Run the following command that parses the dump file and
10+
# generate runnable commands. This will fail without generating
11+
# the commands when a seq is higher than the corresponding offset.
12+
#
13+
# $ ./schemas_command_generator.py schemas-dump.txt > run.sh
14+
#
15+
# Here's how to restore the records:
16+
#
17+
# 1. Delete the existing _schemas topic
18+
# 2. Restart Redpanda
19+
# 3. Create the topic, configure the '-r' option based on the cluster size:
20+
# $ rpk topic create _schemas -r 3 -c cleanup.policy=compact -c compression.type=none
21+
# 4. Run the generated script above
22+
# $ chmod 755 run.sh
23+
# $ sh run.sh
24+
#
25+
# Example outputs:
26+
# $ sh run.sh
27+
# Produced to partition 0 at offset 0 with timestamp 1662516019353.
28+
# Produced to partition 0 at offset 1 with timestamp 1662516020736.
29+
# Produced to partition 0 at offset 2 with timestamp 1662516021788.
30+
# Produced to partition 0 at offset 3 with timestamp 1662516022826.
31+
# Done
32+
#
33+
34+
import json
35+
from pprint import pprint
36+
import re
37+
38+
39+
def main():
40+
import argparse
41+
42+
def generate_options():
43+
parser = argparse.ArgumentParser(
44+
description='Redpanda Schema Registry _schemas Command Generator')
45+
parser.add_argument('path', type=str, help='Path to the file')
46+
return parser
47+
48+
parser = generate_options()
49+
options, _ = parser.parse_known_args()
50+
51+
# Formatting as a single json
52+
tmp = '['
53+
with open(options.path) as f:
54+
for l in f.read().splitlines():
55+
if l.startswith('}'):
56+
tmp += ('},')
57+
else:
58+
tmp += (l)
59+
tmp = re.sub('},$', '}]', tmp)
60+
j = json.loads(tmp)
61+
62+
# Generating rpk topic create commands
63+
cmd_all = ''
64+
cmd_all += f"#!/bin/bash\n\n"
65+
for c, i in enumerate(j):
66+
d = json.loads(i['key'])
67+
if d['seq'] > i['offset']:
68+
cmd_all = f"The seq {d['seq']} is unexpectedly higher than the \
69+
offset {i['offset']} at key {i['key']}. \nThat is it's likely broken, hence exiting...."
70+
71+
break
72+
cmd_all += f"echo '{i['value']}' | rpk topic produce _schemas --compression none -k '{i['key']}'\n"
73+
if c == len(j) - 1:
74+
cmd_all += "echo Done"
75+
else:
76+
cmd_all += "sleep 1s\n"
77+
78+
print(cmd_all)
79+
80+
81+
if __name__ == '__main__':
82+
main()

0 commit comments

Comments
 (0)