Skip to content

Commit 8ef5104

Browse files
authored
Merge pull request #16 from tinybirdco/fix-handling-null-values
Added changes to properly handle NULL and empty string values
2 parents fb4f862 + a0d0269 commit 8ef5104

File tree

2 files changed

+32
-7
lines changed

2 files changed

+32
-7
lines changed

clickhouse_mysql/writer/csvwriter.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ def insert(self, event_or_events):
136136
if self.dst_table is None:
137137
self.dst_table = event.table
138138

139-
self.writer = csv.DictWriter(self.file, fieldnames=self.fieldnames, quoting=csv.QUOTE_ALL)
139+
self.writer = csv.DictWriter(self.file, fieldnames=self.fieldnames, quoting=csv.QUOTE_NONNUMERIC)
140140
if not self.header_written:
141141
self.writer.writeheader()
142142

@@ -190,7 +190,7 @@ def delete_row(self, event_or_events):
190190
if self.dst_table is None:
191191
self.dst_table = event.table
192192

193-
self.writer = csv.DictWriter(self.file, fieldnames=self.fieldnames, quoting=csv.QUOTE_ALL)
193+
self.writer = csv.DictWriter(self.file, fieldnames=self.fieldnames, quoting=csv.QUOTE_NONNUMERIC)
194194
if not self.header_written:
195195
self.writer.writeheader()
196196

@@ -253,7 +253,7 @@ def update(self, event_or_events):
253253
if self.dst_table is None:
254254
self.dst_table = event.table
255255

256-
self.writer = csv.DictWriter(self.file, fieldnames=self.fieldnames, quoting=csv.csv.QUOTE_ALL)
256+
self.writer = csv.DictWriter(self.file, fieldnames=self.fieldnames, quoting=csv.QUOTE_NONNUMERIC)
257257
if not self.header_written:
258258
self.writer.writeheader()
259259

@@ -266,28 +266,34 @@ def update(self, event_or_events):
266266
self.generate_row(event_converted)
267267

268268

269+
def convert_null_values(self, row):
    """Replace every None value in *row* with the string marker "NULL".

    The row is modified in place and nothing is returned. Marking the
    fields this way keeps NULL distinguishable from an empty string once
    the row has been written out.
    """
    # Only values are reassigned (no key is added or removed), so walking
    # the live items view is safe.
    for column, value in row.items():
        if value is None:
            row[column] = "NULL"
274+
269275
def generate_row(self, event):
    """Write the rows carried by *event* through ``self.writer``.

    When using mempool or csvpool, events are cached, so one list can mix
    different kinds of events; each kind is tagged differently:

    - WriteRowsEvent:  rows come from iterating the event, operation = 0
    - DeleteRowsEvent: rows come from iterating the event, operation = 2
    - UpdateRowsEvent: rows are the 'after_values' of each entry in
      ``event.pymysqlreplication_event.rows``, operation = 1

    Every row gets a 'tb_upd' timestamp (current UTC time) and the
    'operation' code, has its None values marked by
    ``convert_null_values``, and is then written as
    ``self.convert(row)``. Any other event type is ignored.
    """
    source_event = event.pymysqlreplication_event

    if isinstance(source_event, WriteRowsEvent):
        operation, rows = 0, event
    elif isinstance(source_event, DeleteRowsEvent):
        operation, rows = 2, event
    elif isinstance(source_event, UpdateRowsEvent):
        # Lazily pick the post-update image of each changed row.
        operation, rows = 1, (change['after_values'] for change in source_event.rows)
    else:
        return

    for values in rows:
        # The timestamp is taken per row, right before the row is written.
        values['tb_upd'] = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')
        values['operation'] = operation
        self.convert_null_values(values)
        self.writer.writerow(self.convert(values))
290-
296+
291297

292298
def push(self):
293299
if not self.next_writer_builder or not self.fieldnames:

clickhouse_mysql/writer/tbcsvwriter.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
import logging
55
import time
6+
import subprocess
67

78
from clickhouse_mysql.writer.writer import Writer
89

@@ -50,6 +51,21 @@ def __init__(
5051
self.dst_table_prefix = dst_table_prefix
5152
self.dst_distribute = dst_distribute
5253

54+
55+
def format_null_values(self, csv_file):
    """Rewrite *csv_file* in place, turning "NULL" marker fields into \\N.

    \\N is the ClickHouse null value and must not be quoted. It is the
    only way to distinguish between NULL and empty strings. After this
    step the file contains:

    - xx,\\N,yy --> for null values
    - xx,"",yy --> for empty strings

    Only a field that consists of exactly "NULL" is replaced; the same
    characters appearing inside a longer quoted field are left alone.
    A genuine string value equal to NULL is still written as "NULL" and
    therefore cannot be told apart from the marker.

    :param csv_file: path of the CSV file to rewrite
    """
    # Local import: the file's import section is not part of this block.
    import re

    # Inside a quoted CSV field every quote character is doubled, so a
    # "NULL" token delimited on both sides by a separator, a line break
    # or the file boundary can only be a complete field.
    null_field = re.compile(r'(?<![^,\r\n])"NULL"(?![^,\r\n])')

    # newline='' disables newline translation, so the line terminators and
    # any line breaks embedded in quoted fields are written back unchanged.
    # The platform default encoding is kept on purpose for both passes.
    with open(csv_file, 'r', newline='') as file:
        data = file.read()

    # A callable replacement is used because a plain '\\N' template would be
    # parsed by re as an (invalid) escape sequence.
    data = null_field.sub(lambda _match: '\\N', data)

    with open(csv_file, 'w', newline='') as file:
        file.write(data)
67+
68+
5369
def uploadCSV(self, table, filename, tries=1):
5470
limit_of_retries = 3
5571
params = {
@@ -58,6 +74,9 @@ def uploadCSV(self, table, filename, tries=1):
5874
}
5975

6076
try:
77+
# Add replace NULL values by \N
78+
self.format_null_values(filename)
79+
6180
with open(filename, 'rb') as f:
6281
m = MultipartEncoder(fields={'csv': ('csv', f, 'text/csv')})
6382
url = f"{self.tb_host}/v0/datasources"

0 commit comments

Comments
 (0)