@@ -41,6 +41,8 @@
 logger = logging.getLogger(__name__)
 
 WAITING_TIME_IF_NO_TASKS = 10  # seconds
+MAX_NB_REGULAR_FILES_PER_COMMIT = 75
+MAX_NB_LFS_FILES_PER_COMMIT = 150
 
 
 def upload_large_folder_internal(
@@ -373,17 +375,18 @@ def _determine_next_job(status: LargeUploadStatus) -> Optional[Tuple[WorkerJob,
         if (
             status.nb_workers_commit == 0
             and status.queue_commit.qsize() > 0
-            and (status.last_commit_attempt is None or time.time() - status.last_commit_attempt > 5 * 60)
+            and status.last_commit_attempt is not None
+            and time.time() - status.last_commit_attempt > 5 * 60
         ):
             status.nb_workers_commit += 1
             logger.debug("Job: commit (more than 5 minutes since last commit attempt)")
-            return (WorkerJob.COMMIT, _get_n(status.queue_commit, 25))
+            return (WorkerJob.COMMIT, _get_items_to_commit(status.queue_commit))
 
-        # 2. Commit if at least 25 files are ready to commit
-        elif status.nb_workers_commit == 0 and status.queue_commit.qsize() >= 25:
+        # 2. Commit if at least 150 files are ready to commit
+        elif status.nb_workers_commit == 0 and status.queue_commit.qsize() >= 150:
             status.nb_workers_commit += 1
-            logger.debug("Job: commit (>25 files ready)")
-            return (WorkerJob.COMMIT, _get_n(status.queue_commit, 25))
+            logger.debug("Job: commit (>=150 files ready)")
+            return (WorkerJob.COMMIT, _get_items_to_commit(status.queue_commit))
 
         # 3. Get upload mode if at least 10 files
         elif status.queue_get_upload_mode.qsize() >= 10:
@@ -430,18 +433,39 @@ def _determine_next_job(status: LargeUploadStatus) -> Optional[Tuple[WorkerJob,
             logger.debug("Job: get upload mode")
             return (WorkerJob.GET_UPLOAD_MODE, _get_n(status.queue_get_upload_mode, 50))
 
-        # 10. Commit if at least 1 file
-        elif status.nb_workers_commit == 0 and status.queue_commit.qsize() > 0:
+        # 10. Commit if at least 1 file and 1 min since last commit attempt
+        elif (
+            status.nb_workers_commit == 0
+            and status.queue_commit.qsize() > 0
+            and status.last_commit_attempt is not None
+            and time.time() - status.last_commit_attempt > 1 * 60
+        ):
+            status.nb_workers_commit += 1
+            logger.debug("Job: commit (1 min since last commit attempt)")
+            return (WorkerJob.COMMIT, _get_items_to_commit(status.queue_commit))
+
+        # 11. Commit if at least 1 file, all other queues are empty and all workers are waiting
+        # e.g. when it's the last commit
+        elif (
+            status.nb_workers_commit == 0
+            and status.queue_commit.qsize() > 0
+            and status.queue_sha256.qsize() == 0
+            and status.queue_get_upload_mode.qsize() == 0
+            and status.queue_preupload_lfs.qsize() == 0
+            and status.nb_workers_sha256 == 0
+            and status.nb_workers_get_upload_mode == 0
+            and status.nb_workers_preupload_lfs == 0
+        ):
             status.nb_workers_commit += 1
             logger.debug("Job: commit")
-            return (WorkerJob.COMMIT, _get_n(status.queue_commit, 25))
+            return (WorkerJob.COMMIT, _get_items_to_commit(status.queue_commit))
 
-        # 11. If all queues are empty, exit
+        # 12. If all queues are empty, exit
         elif all(metadata.is_committed or metadata.should_ignore for _, metadata in status.items):
             logger.info("All files have been processed! Exiting worker.")
             return None
 
-        # 12. If no task is available, wait
+        # 13. If no task is available, wait
         else:
             status.nb_workers_waiting += 1
             logger.debug(f"No task available, waiting... ({WAITING_TIME_IF_NO_TASKS}s)")
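Note on rules 1, 10 and 11: with the switch to `last_commit_attempt is not None`, the time-based commit rules can no longer fire before a first commit attempt has been recorded, so the first (and last) batch is picked up either by rule 2's 150-file threshold or by the new rule 11. Below is a minimal, self-contained sketch of the two new predicates, not part of the diff; the `Status` dataclass is a hypothetical stub carrying only the fields referenced above.

```python
import time
from dataclasses import dataclass, field
from queue import Queue
from typing import Optional


@dataclass
class Status:
    """Hypothetical stub: only the fields referenced by rules 10 and 11 above."""

    queue_commit: Queue = field(default_factory=Queue)
    queue_sha256: Queue = field(default_factory=Queue)
    queue_get_upload_mode: Queue = field(default_factory=Queue)
    queue_preupload_lfs: Queue = field(default_factory=Queue)
    nb_workers_commit: int = 0
    nb_workers_sha256: int = 0
    nb_workers_get_upload_mode: int = 0
    nb_workers_preupload_lfs: int = 0
    last_commit_attempt: Optional[float] = None


def commit_after_idle_minute(status: Status) -> bool:
    """Rule 10: files are queued and the last commit attempt is more than 1 minute old."""
    return (
        status.nb_workers_commit == 0
        and status.queue_commit.qsize() > 0
        and status.last_commit_attempt is not None
        and time.time() - status.last_commit_attempt > 1 * 60
    )


def commit_last_batch(status: Status) -> bool:
    """Rule 11: files are queued and nothing else is left to hash, check or pre-upload."""
    return (
        status.nb_workers_commit == 0
        and status.queue_commit.qsize() > 0
        and status.queue_sha256.qsize() == 0
        and status.queue_get_upload_mode.qsize() == 0
        and status.queue_preupload_lfs.qsize() == 0
        and status.nb_workers_sha256 == 0
        and status.nb_workers_get_upload_mode == 0
        and status.nb_workers_preupload_lfs == 0
    )


status = Status()
status.queue_commit.put(("README.md", object()))
print(commit_after_idle_minute(status))  # False: no commit attempt has been recorded yet
print(commit_last_batch(status))         # True: everything else is drained, commit the last batch
```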
@@ -547,6 +571,30 @@ def _get_n(queue: "queue.Queue[JOB_ITEM_T]", n: int) -> List[JOB_ITEM_T]:
     return [queue.get() for _ in range(min(queue.qsize(), n))]
 
 
+def _get_items_to_commit(queue: "queue.Queue[JOB_ITEM_T]") -> List[JOB_ITEM_T]:
+    """Special case for commit job: the number of items to commit depends on the type of files."""
+    # Can take at most MAX_NB_REGULAR_FILES_PER_COMMIT regular files and/or MAX_NB_LFS_FILES_PER_COMMIT LFS files in a single commit
+    items: List[JOB_ITEM_T] = []
+    nb_lfs, nb_regular = 0, 0
+    while True:
+        # If empty queue => commit everything
+        if queue.qsize() == 0:
+            return items
+
+        # If we have enough items => commit them
+        if nb_lfs >= MAX_NB_LFS_FILES_PER_COMMIT or nb_regular >= MAX_NB_REGULAR_FILES_PER_COMMIT:
+            return items
+
+        # Else, get a new item and increase counter
+        item = queue.get()
+        items.append(item)
+        _, metadata = item
+        if metadata.upload_mode == "lfs":
+            nb_lfs += 1
+        else:
+            nb_regular += 1
+
+
 def _print_overwrite(report: str) -> None:
     """Print a report, overwriting the previous lines.
 
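For a sense of how `_get_items_to_commit` splits a mixed queue, here is a small usage sketch, not part of the diff. It mirrors the helper's batching logic under stated assumptions: the `Meta` class and the file names are hypothetical, and only the `upload_mode` attribute and the two constants introduced in the diff are relied on.

```python
from queue import Queue
from typing import List, Tuple

MAX_NB_REGULAR_FILES_PER_COMMIT = 75
MAX_NB_LFS_FILES_PER_COMMIT = 150


class Meta:
    """Hypothetical stand-in for the item metadata: only `upload_mode` matters here."""

    def __init__(self, upload_mode: str) -> None:
        self.upload_mode = upload_mode  # "lfs" or "regular"


Item = Tuple[str, Meta]


def get_items_to_commit(queue: "Queue[Item]") -> List[Item]:
    """Same batching logic as the new `_get_items_to_commit` helper above."""
    items: List[Item] = []
    nb_lfs, nb_regular = 0, 0
    # Drain the queue until it is empty or either per-commit limit is reached.
    while queue.qsize() > 0 and nb_lfs < MAX_NB_LFS_FILES_PER_COMMIT and nb_regular < MAX_NB_REGULAR_FILES_PER_COMMIT:
        item = queue.get()
        items.append(item)
        if item[1].upload_mode == "lfs":
            nb_lfs += 1
        else:
            nb_regular += 1
    return items


q: "Queue[Item]" = Queue()
for i in range(200):
    q.put((f"shard-{i:03d}.safetensors", Meta("lfs")))
for i in range(10):
    q.put((f"config-{i:02d}.json", Meta("regular")))

batch = get_items_to_commit(q)
print(len(batch))  # 150 -> the LFS limit is hit first; remaining files wait for the next commit
print(q.qsize())   # 60 items left in the queue for a later commit
```

Unlike `_get_n`, the batch size is not fixed: a queue of regular files is committed in batches of up to 75, while LFS-tracked files can go up to 150 per commit.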