WriteApi should use ThreadPoolScheduler for batching #561

Closed
goznauk opened this issue Feb 22, 2023 · 1 comment · Fixed by #562

goznauk commented Feb 22, 2023

When a WriteApi instance is created with the batching write type, window_with_time_or_count is called without an explicit scheduler, so it uses TimeoutScheduler by default. This creates a new thread for every flush interval.

    def __init__(self,
                 influxdb_client,
                 write_options: WriteOptions = WriteOptions(),
                 point_settings: PointSettings = PointSettings(),
                 **kwargs) -> None:
   
        ...

        if self._write_options.write_type is WriteType.batching:
            # Define Subject that listen incoming data and produces writes into InfluxDB
            self._subject = Subject()

            self._disposable = self._subject.pipe(
                # Split incoming data to windows by batch_size or flush_interval
                ops.window_with_time_or_count(count=write_options.batch_size,
                                              timespan=timedelta(milliseconds=write_options.flush_interval)),
                ...
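To make the cost concrete, here is a stdlib-only sketch that models the default behaviour. It assumes that TimeoutScheduler backs each relative timer with a one-shot threading.Timer (as reactivex's implementation does) and that window_with_time_or_count arms a fresh timer for every new window. `count_timer_threads` is a hypothetical helper, not part of either library.

```python
import threading
from typing import List


def count_timer_threads(flushes: int = 5, interval: float = 0.01) -> int:
    """Re-arm a one-shot threading.Timer once per flush, the way a
    timer-per-window scheduler does, and count the distinct threads used."""
    names: List[str] = []
    done = threading.Event()

    def on_flush() -> None:
        names.append(threading.current_thread().name)
        if len(names) < flushes:
            # Each new window needs a new timer, and every Timer is a new Thread.
            timer = threading.Timer(interval, on_flush)
            timer.daemon = True
            timer.start()
        else:
            done.set()

    first = threading.Timer(interval, on_flush)
    first.daemon = True
    first.start()
    done.wait(timeout=5)
    return len(set(names))


print(count_timer_threads())  # 5: one new thread per flush interval
```

Over a long-running process with a short flush_interval, this one-thread-per-window pattern is what adds up.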
  

Instead of letting TimeoutScheduler create a new thread for every window (which adds up to millions of threads over the lifetime of a long-running process), we can use ThreadPoolScheduler(1) to handle the windowing. TimeoutScheduler may simply not be a good fit here in Python. The fix is a one-line change:

    ops.window_with_time_or_count(count=write_options.batch_size,
                                  timespan=timedelta(milliseconds=write_options.flush_interval),
                                  scheduler=ThreadPoolScheduler(1)),
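For comparison, a stdlib sketch of the proposed behaviour. It uses a single-worker concurrent.futures.ThreadPoolExecutor as a stand-in for ThreadPoolScheduler(1), which is backed by such an executor. The worker simply sleeps through the interval, a simplification of how the real scheduler waits for the next due time; `count_pool_threads` is a hypothetical helper.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List


def count_pool_threads(flushes: int = 5, interval: float = 0.01) -> int:
    """Run every flush on a one-worker pool and count the distinct threads used."""
    names: List[str] = []
    done = threading.Event()
    pool = ThreadPoolExecutor(max_workers=1)

    def on_flush() -> None:
        time.sleep(interval)  # the single worker waits out the flush interval
        names.append(threading.current_thread().name)
        if len(names) < flushes:
            # Re-arm on the same pool: the existing worker is reused, no new thread.
            pool.submit(on_flush)
        else:
            done.set()

    pool.submit(on_flush)
    done.wait(timeout=5)
    pool.shutdown(wait=True)
    return len(set(names))


print(count_pool_threads())  # 1: the same worker handles every flush
```

The trade-off is that all window timers share one thread, so a slow callback delays the next window instead of running concurrently with it.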
         
goznauk added a commit to goznauk/influxdb-client-python that referenced this issue Feb 22, 2023

goznauk commented Mar 20, 2023

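Quick fix: a monkey patch that rebuilds the batching pipeline of an existing WriteApi with ThreadPoolScheduler(1).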

from datetime import timedelta

from reactivex import operators as ops
from reactivex.scheduler import ThreadPoolScheduler
from reactivex.subject import Subject

from influxdb_client.client.write_api import WriteOptions, WriteType
# Private helpers used by WriteApi's own batching pipeline
from influxdb_client.client.write_api import _BatchItem, _body_reduce


# noinspection PyProtectedMember
def monkey_patch_no_timeout_scheduler(write_api):
    # https://github.com/influxdata/influxdb-client-python/pull/562
    if write_api._write_options.write_type is WriteType.batching:
        # Complete the Subject created in WriteApi.__init__ (its window timers run on TimeoutScheduler)
        write_api._subject.on_completed()
        # Define a new Subject that listens for incoming data and writes it into InfluxDB
        write_api._subject = Subject()
        write_api._disposable = write_api._subject.pipe(
            # Split incoming data into windows by batch_size or flush_interval,
            # handling the window timers on a single pooled thread
            ops.window_with_time_or_count(count=write_api._write_options.batch_size,
                                          timespan=timedelta(milliseconds=write_api._write_options.flush_interval),
                                          scheduler=ThreadPoolScheduler(1)),
            # Map each window into groups defined by 'organization', 'bucket' and 'precision'
            ops.flat_map(lambda window: window.pipe(
                # Group window by 'organization', 'bucket' and 'precision'
                ops.group_by(lambda batch_item: batch_item.key),
                # Create a batch (line protocols concatenated with \n)
                ops.map(lambda group: group.pipe(
                    ops.to_iterable(),
                    ops.map(lambda xs: _BatchItem(key=group.key, data=_body_reduce(xs), size=len(xs)))
                )),
                ops.merge_all()
            )),
            # Write data into InfluxDB (with retries if the write fails)
            ops.filter(lambda batch: batch.size > 0),
            ops.map(mapper=lambda batch: write_api._to_response(data=batch, delay=write_api._jitter_delay())),
            ops.merge_all()
        ).subscribe(write_api._on_next, write_api._on_error, write_api._on_complete)


# Usage: `client` is an existing InfluxDBClient
write_api = client.write_api(write_options=WriteOptions(...))
monkey_patch_no_timeout_scheduler(write_api)
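The patch only swaps the scheduler; the rest of the pipeline is unchanged. Each window is grouped by its (organization, bucket, precision) key, and each group's line-protocol records are joined with \n into one batch. A plain-Python sketch of that step follows, with hypothetical `Item`/`Batch` tuples standing in for the client's private `_BatchItem` and plain strings standing in for the encoded records.

```python
from typing import Dict, List, NamedTuple, Tuple

Key = Tuple[str, str, str]  # (organization, bucket, precision)


class Item(NamedTuple):
    """Hypothetical stand-in for one buffered write."""
    key: Key
    data: str  # a single line-protocol record


class Batch(NamedTuple):
    """Hypothetical stand-in for one batch sent to InfluxDB."""
    key: Key
    data: str  # records joined with "\n"
    size: int


def batches_from_window(window: List[Item]) -> List[Batch]:
    groups: Dict[Key, List[str]] = {}
    for item in window:
        # Group by (organization, bucket, precision), keeping first-seen order.
        groups.setdefault(item.key, []).append(item.data)
    # One batch per group; an empty window produces no batches.
    return [Batch(key, "\n".join(lines), len(lines)) for key, lines in groups.items()]


window = [
    Item(("my-org", "b1", "ns"), "cpu value=1"),
    Item(("my-org", "b2", "ns"), "mem value=2"),
    Item(("my-org", "b1", "ns"), "cpu value=3"),
]
for batch in batches_from_window(window):
    print(batch.key, batch.size, repr(batch.data))
```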

goznauk added a commit to goznauk/influxdb-client-python that referenced this issue Aug 5, 2023
Use ThreadPoolScheduler for WriteApi batch subject instead of TimeoutScheduler.

Fixes influxdata#561
bednar pushed a commit to goznauk/influxdb-client-python that referenced this issue Jan 4, 2024
bednar added this to the 1.40.0 milestone Jan 4, 2024
bednar added a commit that referenced this issue Jan 4, 2024
* fix: prevent creating unnecessary threads repeatedly

Use ThreadPoolScheduler for WriteApi batch subject instead of TimeoutScheduler.

Fixes #561

* docs: Update CHANGELOG.md

* docs: update CHANGELOG.md

---------

Co-authored-by: Jakub Bednář <[email protected]>
Labels
None yet
Projects
None yet
2 participants