3
3
from datetime import datetime
4
4
5
5
from influxdb_client import WritePrecision , InfluxDBClient
6
- from influxdb_client .client .exceptions import InfluxDBError
7
6
from influxdb_client .client .util .date_utils import get_date_helper
8
7
from influxdb_client .client .util .multiprocessing_helper import MultiprocessingWriter
9
8
from influxdb_client .client .write_api import SYNCHRONOUS
10
9
11
10
12
- # noinspection PyUnusedLocal
13
- def _error_callback (conf : (str , str , str ), data : str , error : InfluxDBError ):
14
- with open ("test_MultiprocessingWriter.txt" , "w+" ) as file :
15
- file .write (error .message )
16
-
17
-
18
11
# noinspection PyMethodMayBeStatic
19
12
class MultiprocessingWriterTest (unittest .TestCase ):
20
13
@@ -23,8 +16,6 @@ def setUp(self) -> None:
23
16
self .token = os .getenv ('INFLUXDB_V2_TOKEN' , "my-token" )
24
17
self .org = os .getenv ('INFLUXDB_V2_ORG' , "my-org" )
25
18
self .writer = None
26
- if os .path .exists ("test_MultiprocessingWriter.txt" ):
27
- os .remove ("test_MultiprocessingWriter.txt" )
28
19
29
20
def tearDown (self ) -> None :
30
21
if self .writer :
@@ -79,10 +70,3 @@ def test_pass_parameters(self):
79
70
self .assertEqual ("a" , record ["tag" ])
80
71
self .assertEqual (5 , record ["_value" ])
81
72
self .assertEqual (get_date_helper ().to_utc (datetime .utcfromtimestamp (10 )), record ["_time" ])
82
-
83
- def test_wrong_configuration (self ):
84
-
85
- with MultiprocessingWriter (url = self .url , token = "ddd" , org = self .org , error_callback = _error_callback ) as writer :
86
- writer .write (bucket = "my-bucket" , record = f"mem,tag=a value=5i 10" , write_precision = WritePrecision .S )
87
-
88
- self .assertTrue (os .path .exists ("test_MultiprocessingWriter.txt" ))
0 commit comments