@@ -80,27 +80,21 @@ def read(self, count=0, block=True, timeout=None):
80
80
self ._pre_cb ()
81
81
# END pre callback
82
82
83
- # if we have count items, don't do any queue preparation - if someone
84
- # depletes the queue in the meanwhile, the channel will close and
85
- # we will unblock naturally
86
- # PROBLEM: If there are multiple consumer of this channel, we might
87
- # run out of items without being replenished == block forever in the
88
- # worst case. task.min_count could have triggered to produce more ...
89
- # usually per read with n items, we put n items on to the queue,
90
- # so we wouldn't check this
91
- # Even if we have just one consumer ( we could determine that with
92
- # the reference count ), it could be that in one moment we don't yet
93
- # have an item, but its currently being produced by some worker.
94
- # This is why we:
95
- # * make no assumptions if there are multiple consumers
96
- # *
83
+ # NOTE: we always queue the operation that would give us count items
84
+ # as tracking the scheduled items or testing the channels size
85
+ # is inherently unsafe depending on the design of the task network
86
+ # If we put tasks onto the queue for every request, we are sure
87
+ # to always produce enough items, even if the task.min_count actually
88
+ # provided enough - it's better to have some possibly empty task runs
89
+ # than having an empty queue that blocks.
90
+
91
+ # NOTE: TODO: that case is only possible if one Task could be connected
92
+ # to multiple input channels in a manner known by the system. Currently
93
+ # this is not possible, but should be implemented at some point
97
94
98
95
# if the user tries to use us to read from a done task, we will never
99
96
# compute as all produced items are already in the channel
100
97
skip_compute = self ._task .is_done () or self ._task .error ()
101
- #if count > 0:
102
- # skip_compute = self._task.scheduled_item_count() >= count or self._wc._queue.qsize() >= count
103
- # END
104
98
105
99
########## prepare ##############################
106
100
if not skip_compute :
@@ -249,13 +243,6 @@ def _prepare_channel_read(self, task, count):
249
243
# raise AssertionError("Shouldn't have consumed tasks on the pool, they delete themselves, what happened ?")
250
244
# END skip processing
251
245
252
- # if the task does not have the required output on its queue, schedule
253
- # it for processing. If we should process all, we don't care about the
254
- # amount as it should process until its all done.
255
- #if count > 1 and task._out_wc.size() >= count:
256
- # continue
257
- # END skip if we have enough
258
-
259
246
# but use the actual count to produce the output, we may produce
260
247
# more than requested
261
248
numchunks = 1
@@ -283,33 +270,26 @@ def _prepare_channel_read(self, task, count):
283
270
queue = self ._queue
284
271
if numchunks > 1 :
285
272
for i in xrange (numchunks ):
286
- # schedule them as early as we know about them
287
- task .add_scheduled_items (chunksize )
288
273
queue .put ((task .process , chunksize ))
289
274
# END for each chunk to put
290
275
else :
291
- task .add_scheduled_items (chunksize )
292
276
queue .put ((task .process , chunksize ))
293
277
# END try efficient looping
294
278
295
279
if remainder :
296
- task .add_scheduled_items (remainder )
297
280
queue .put ((task .process , remainder ))
298
281
# END handle chunksize
299
282
else :
300
283
# no workers, so we have to do the work ourselves
301
284
if numchunks > 1 :
302
285
for i in xrange (numchunks ):
303
- task .add_scheduled_items (chunksize )
304
286
task .process (chunksize )
305
287
# END for each chunk to put
306
288
else :
307
- task .add_scheduled_items (chunksize )
308
289
task .process (chunksize )
309
290
# END try efficient looping
310
291
311
292
if remainder :
312
- task .add_scheduled_items (remainder )
313
293
task .process (remainder )
314
294
# END handle chunksize
315
295
# END handle serial mode
@@ -452,7 +432,6 @@ def add_task(self, task):
452
432
# This brings about 15% more performance, but sacrifices thread-safety
453
433
# when reading from multiple threads.
454
434
if self .size () == 0 :
455
- task ._slock = DummyLock ()
456
435
wctype = SerialWChannel
457
436
# END improve locks
458
437
0 commit comments