-
Notifications
You must be signed in to change notification settings - Fork 421
/
Copy pathbase.py
95 lines (78 loc) · 2.21 KB
/
base.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# -*- coding: utf-8 -*-
"""
Batch processing utilities
"""
from abc import ABC, abstractmethod
from typing import Any, Callable, Iterable, List, Tuple
class BasePartialProcessor(ABC):
    """
    Abstract skeleton for processors that handle a batch record by record.

    Concrete subclasses supply ``_prepare``, ``_clean`` and
    ``_process_record``. Calling the instance stores the records and the
    handler on it; entering it as a context manager runs ``_prepare`` and
    leaving it runs ``_clean``. The outcome of each record can be logged
    through ``success_handler`` / ``failure_handler``, which fill the
    ``success_messages``, ``fail_messages`` and ``exceptions`` lists.
    """

    def __init__(self):
        # Bookkeeping lists filled by success_handler / failure_handler.
        self.success_messages: List = []
        self.fail_messages: List = []
        self.exceptions: List = []

    @abstractmethod
    def _prepare(self):
        """
        Set-up hook, invoked by ``__enter__``.
        """
        raise NotImplementedError()

    @abstractmethod
    def _clean(self):
        """
        Tear-down hook, invoked by ``__exit__``.
        """
        raise NotImplementedError()

    @abstractmethod
    def _process_record(self, record: Any):
        """
        Handle one record; ``process`` collects whatever this returns.
        """
        raise NotImplementedError()

    def process(self) -> List[Tuple]:
        """
        Run ``_process_record`` over every entry of ``self.records``.

        Returns
        -------
        list
            One item per record, in iteration order, holding the value
            returned by ``_process_record`` for that record.
        """
        outcomes: List[Tuple] = []
        for item in self.records:
            outcomes.append(self._process_record(item))
        return outcomes

    def __enter__(self):
        # Run the subclass set-up, then expose the processor to the `with` body.
        self._prepare()
        return self

    def __exit__(self, exception_type, exception_value, traceback):
        # Nothing is returned, so an exception raised inside the `with`
        # body is not suppressed; the subclass tear-down runs either way.
        self._clean()

    def __call__(self, records: Iterable[Any], handler: Callable):
        """
        Store the batch and its handler on the instance.

        Parameters
        ----------
        records: Iterable[Any]
            Iterable with objects to be processed.
        handler: Callable
            Callable meant to process the entries of ``records``; this base
            class only stores it and does not invoke it itself.

        Returns
        -------
        BasePartialProcessor
            The instance itself.
        """
        self.records = records
        self.handler = handler
        return self

    def success_handler(self, record: Any, result: Any):
        """
        Register a record as successfully processed.

        Parameters
        ----------
        record: Any
            The original record; appended to ``success_messages``.
        result: Any
            Value obtained for the record.

        Returns
        -------
        tuple
            "success", result, original record
        """
        self.success_messages.append(record)
        return ("success", result, record)

    def failure_handler(self, record: Any, exception: Exception):
        """
        Register a record as failed.

        Parameters
        ----------
        record: Any
            The original record; appended to ``fail_messages``.
        exception: Exception
            The error associated with the record; appended to ``exceptions``.

        Returns
        -------
        tuple
            "fail", exceptions args, original record
        """
        # Read the args first so nothing is recorded if that lookup fails.
        error_args = exception.args
        self.exceptions.append(exception)
        self.fail_messages.append(record)
        return ("fail", error_args, record)