WriteApi should use ThreadPoolScheduler for batching #561
goznauk added a commit to goznauk/influxdb-client-python that referenced this issue on Feb 22, 2023.
Quick fix:

```python
from datetime import timedelta

from influxdb_client import WriteOptions
from influxdb_client.client.write_api import WriteType, _BatchItem, _body_reduce
from reactivex import operators as ops
from reactivex.scheduler import ThreadPoolScheduler
from reactivex.subject import Subject


# noinspection PyProtectedMember
def monkey_patch_no_timeout_scheduler(write_api):
    # https://github.com/influxdata/influxdb-client-python/pull/562
    if write_api._write_options.write_type is WriteType.batching:
        # Complete the subject created by WriteApi.__init__ (built with the default scheduler)
        write_api._subject.on_completed()
        # Define Subject that listens to incoming data and produces writes into InfluxDB
        write_api._subject = Subject()
        write_api._disposable = write_api._subject.pipe(
            # Split incoming data into windows by batch_size or flush_interval,
            # running the window timers on a single pooled thread
            ops.window_with_time_or_count(count=write_api._write_options.batch_size,
                                          timespan=timedelta(milliseconds=write_api._write_options.flush_interval),
                                          scheduler=ThreadPoolScheduler(1)),
            # Map window into groups defined by 'organization', 'bucket' and 'precision'
            ops.flat_map(lambda window: window.pipe(
                # Group window by 'organization', 'bucket' and 'precision'
                ops.group_by(lambda batch_item: batch_item.key),
                # Create batch (concatenate line protocols with \n)
                ops.map(lambda group: group.pipe(
                    ops.to_iterable(),
                    ops.map(lambda xs: _BatchItem(key=group.key, data=_body_reduce(xs), size=len(xs)))
                )),
                ops.merge_all()
            )),
            # Write data into InfluxDB (with the possibility to retry if it fails)
            ops.filter(lambda batch: batch.size > 0),
            ops.map(mapper=lambda batch: write_api._to_response(data=batch, delay=write_api._jitter_delay())),
            ops.merge_all()
        ).subscribe(write_api._on_next, write_api._on_error, write_api._on_complete)


# `client` is an existing InfluxDBClient
write_api = client.write_api(write_options=WriteOptions(...))
monkey_patch_no_timeout_scheduler(write_api)
```
goznauk added a commit to goznauk/influxdb-client-python that referenced this issue on Aug 5, 2023:
Use ThreadPoolScheduler for WriteApi batch subject instead of TimeoutScheduler. Fixes influxdata#561
bednar pushed a commit to goznauk/influxdb-client-python that referenced this issue on Jan 4, 2024:
Use ThreadPoolScheduler for WriteApi batch subject instead of TimeoutScheduler. Fixes influxdata#561
bednar added a commit that referenced this issue on Jan 4, 2024:
* fix: prevent creating unnecessary threads repeatedly. Use ThreadPoolScheduler for WriteApi batch subject instead of TimeoutScheduler. Fixes #561
* docs: Update CHANGELOG.md
* docs: update CHANGELOG.md
Co-authored-by: Jakub Bednář
When a WriteApi instance is created, `window_with_time_or_count` is called without a `scheduler` argument, so it uses `TimeoutScheduler` by default. `TimeoutScheduler` starts a new `threading.Timer` thread for every scheduled action, which means a new thread is created on every flush interval. Instead of letting `TimeoutScheduler` create millions of threads over the lifetime of a long-running writer, we can use `ThreadPoolScheduler(1)` to handle the windows. `TimeoutScheduler` might not make sense to use in Python, and switching away from it is a one-line change.
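To make the thread-count argument concrete, here is a stdlib-only model of the two strategies. It is an assumption for illustration, not the reactivex scheduler code, and both function names are hypothetical. The first function mimics the `TimeoutScheduler` behaviour described above by starting a fresh `threading.Timer` for every flush; the second mimics `ThreadPoolScheduler(1)` by running every flush on a single `ThreadPoolExecutor` worker.

```python
# Stdlib-only model (hypothetical helpers), NOT the reactivex implementation.
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def flush_threads_with_timers(flushes: int, interval: float = 0.001) -> int:
    """TimeoutScheduler-style: each flush is scheduled on a brand-new Timer thread.

    Returns how many distinct threads executed a flush.
    """
    names = []
    done = threading.Event()

    def flush(remaining: int) -> None:
        names.append(threading.current_thread().name)
        if remaining > 1:
            # Schedule the next flush on a new Timer thread.
            timer = threading.Timer(interval, flush, args=(remaining - 1,))
            timer.daemon = True
            timer.start()
        else:
            done.set()

    first = threading.Timer(interval, flush, args=(flushes,))
    first.daemon = True
    first.start()
    done.wait()
    return len(set(names))


def flush_threads_with_pool(flushes: int, interval: float = 0.001) -> int:
    """ThreadPoolScheduler(1)-style: every flush runs on one reused worker thread.

    Returns how many distinct threads executed a flush.
    """
    names = []

    def flush() -> None:
        time.sleep(interval)  # stand-in for waiting out the flush interval
        names.append(threading.current_thread().name)

    with ThreadPoolExecutor(max_workers=1) as pool:
        for _ in range(flushes):
            pool.submit(flush).result()
    return len(set(names))


if __name__ == "__main__":
    print("timer threads:", flush_threads_with_timers(100))  # one thread per flush
    print("pool threads:", flush_threads_with_pool(100))     # a single reused worker
```

In this model the first count grows with every flush, while the second stays at one. The actual reactivex schedulers also handle cancellation and due-time ordering, which the sketch leaves out.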