|
12 | 12 | from requests_toolbelt.multipart.encoder import MultipartEncoder
|
13 | 13 | import json
|
14 | 14 |
|
15 |
| -class CHCSVWriter(Writer): |
16 |
| - """Write into ClickHouse via CSV file and clickhouse-client tool""" |
| 15 | +class TBCSVWriter(Writer): |
| 16 | + """Write into Tinybird via CSV file""" |
17 | 17 |
|
18 | 18 | dst_schema = None
|
19 | 19 | dst_table = None
|
20 | 20 | dst_distribute = None
|
21 | 21 |
|
22 |
| - host = None |
23 |
| - port = None |
24 |
| - user = None |
25 |
| - password = None |
| 22 | + tb_host = None |
| 23 | + tb_token = None |
26 | 24 |
|
27 | 25 | def __init__(
|
28 | 26 | self,
|
29 |
| - connection_settings, |
| 27 | + tb_host, |
| 28 | + tb_token, |
30 | 29 | dst_schema=None,
|
31 | 30 | dst_table=None,
|
32 | 31 | dst_table_prefix=None,
|
33 | 32 | dst_distribute=False,
|
34 | 33 | ):
|
35 |
| - if dst_distribute and dst_schema is not None: |
36 |
| - dst_schema += "_all" |
37 |
| - if dst_distribute and dst_table is not None: |
38 |
| - dst_table += "_all" |
39 |
| - logging.info( |
40 |
| - "CHCSWriter() connection_settings={} dst_schema={} dst_table={}".format(connection_settings, dst_schema, |
41 |
| - dst_table)) |
42 |
| - self.host = connection_settings['host'] |
43 |
| - self.port = connection_settings['port'] |
44 |
| - self.user = connection_settings['user'] |
45 |
| - self.password = connection_settings['password'] |
| 34 | + # if dst_distribute and dst_schema is not None: |
| 35 | + # dst_schema += "_all" |
| 36 | + # if dst_distribute and dst_table is not None: |
| 37 | + # dst_table += "_all" |
| 38 | + # logging.info( |
| 39 | + # "CHCSWriter() connection_settings={} dst_schema={} dst_table={}".format(connection_settings, dst_schema, |
| 40 | + # dst_table)) |
| 41 | + self.tb_host = tb_host |
| 42 | + self.tb_token = tb_token |
| 43 | + |
| 44 | + if self.tb_host is None or self.tb_token is None: |
| 45 | + logging.critical(f" Host: {self.tb_host} or token {self.tb_token} is missing") |
| 46 | + return None |
| 47 | + |
46 | 48 | self.dst_schema = dst_schema
|
47 | 49 | self.dst_table = dst_table
|
48 | 50 | self.dst_table_prefix = dst_table_prefix
|
@@ -75,17 +77,14 @@ def insert(self, event_or_events=None):
|
75 | 77 | 'name': self.dst_table,
|
76 | 78 | 'mode': 'append'
|
77 | 79 | }
|
78 |
| - token = 'p.eyJ1IjogIjUzMWUwMDUxLTRmZGQtNDc5MC05OGY0LWNlNTA3NDVkYzFkNSIsICJpZCI6ICIzNzMyOWExNy1mMGM2LTQyNWMtODhmOC1hZDE2MTU4OWIwNmMifQ.80_ZbRL9D8x2mlWay1EjdeA-zi7asxqKqp06NoOmadg' |
79 |
| - # token = 'p.eyJ1IjogImJjYzkyYjJkLTY3YWItNDIxMC05YjkzLWVhYjc4ODA3ZTVkMiIsICJpZCI6ICJiMjNiNGQwNC01N2FhLTRiOTMtOTNkMS1lNzE0ODcxZWJkYTUifQ.YNzxkpebfTIgZGSQ9UiUpunKymeY6n7cWUY-3jgSp8E' |
80 | 80 |
|
81 | 81 | f = open(event.filename, 'rb')
|
82 | 82 | m = MultipartEncoder(fields={'csv': ('csv', f, 'text/csv')})
|
83 | 83 |
|
84 |
| - url = 'https://tb-test.tinybird.co/v0/datasources' |
85 |
| - # url = 'http://localhost:8001/v0/datasources' |
| 84 | + url = f"{self.tb_host}/v0/datasources" |
86 | 85 |
|
87 | 86 | response = requests.post(url, data=m,
|
88 |
| - headers={'Authorization': 'Bearer ' + token, 'Content-Type': m.content_type}, |
| 87 | + headers={'Authorization': 'Bearer ' + self.tb_token, 'Content-Type': m.content_type}, |
89 | 88 | params=params
|
90 | 89 | )
|
91 | 90 |
|
|
0 commit comments