diff --git a/CHANGELOG.md b/CHANGELOG.md index e542413a..a4d62793 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,8 @@ ### Bug Fixes 1. [#636](https://github.com/influxdata/influxdb-client-python/pull/636): Handle missing data in data frames -2. [#638](https://github.com/influxdata/influxdb-client-python/pull/638), [#642](https://github.com/influxdata/influxdb-client-python/pull/642): Refactor DataFrame operations to avoid chained assignment and resolve FutureWarning in pandas, ensuring compatibility with pandas 3.0. +1. [#638](https://github.com/influxdata/influxdb-client-python/pull/638), [#642](https://github.com/influxdata/influxdb-client-python/pull/642): Refactor DataFrame operations to avoid chained assignment and resolve FutureWarning in pandas, ensuring compatibility with pandas 3.0. +1. [#641](https://github.com/influxdata/influxdb-client-python/pull/641): Correctly dispose ThreadPoolScheduler in WriteApi ### Documentation 1. [#639](https://github.com/influxdata/influxdb-client-python/pull/639): Use Markdown for `README` diff --git a/influxdb_client/client/write_api.py b/influxdb_client/client/write_api.py index 050a7a5c..3b3db68f 100644 --- a/influxdb_client/client/write_api.py +++ b/influxdb_client/client/write_api.py @@ -250,16 +250,18 @@ def __init__(self, self._success_callback = kwargs.get('success_callback', None) self._error_callback = kwargs.get('error_callback', None) self._retry_callback = kwargs.get('retry_callback', None) + self._window_scheduler = None if self._write_options.write_type is WriteType.batching: # Define Subject that listen incoming data and produces writes into InfluxDB self._subject = Subject() + self._window_scheduler = ThreadPoolScheduler(1) self._disposable = self._subject.pipe( # Split incoming data to windows by batch_size or flush_interval ops.window_with_time_or_count(count=write_options.batch_size, timespan=timedelta(milliseconds=write_options.flush_interval), - scheduler=ThreadPoolScheduler(1)), + scheduler=self._window_scheduler), # Map window into groups defined by 'organization', 'bucket' and 'precision' ops.flat_map(lambda window: window.pipe( # Group window by 'organization', 'bucket' and 'precision' @@ -440,6 +442,10 @@ def __del__(self): ) break + if self._window_scheduler: + self._window_scheduler.executor.shutdown(wait=False) + self._window_scheduler = None + if self._disposable: self._disposable = None pass @@ -565,6 +571,7 @@ def __getstate__(self): # Remove rx del state['_subject'] del state['_disposable'] + del state['_window_scheduler'] del state['_write_service'] return state