 from config import RAW_DATA_PATH
 from config import engine
 from models import Base
+
+import time
+
 from rfm_funcs.create_scores import create_scores

 def start_flow():
-
+    start = time.time()
     job_id = admin_api.start_job()
     job_outcome = None
     trace_back_string = None
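The hunk above adds coarse timing for the whole job: import time, record start = time.time() when start_flow() begins, and (in the final hunk below) log the elapsed seconds just before returning. A minimal sketch of the same pattern, factored into a hypothetical timed decorator that is not part of this repo (print stands in for the app logger):

import time


def timed(label):
    # Minimal sketch of the timing pattern the diff adds: capture a
    # wall-clock start, run the work, then log the elapsed seconds.
    def decorator(fn):
        def wrapper(*args, **kwargs):
            start = time.time()
            result = fn(*args, **kwargs)
            print('{} took {} seconds'.format(label, time.time() - start))
            return result
        return wrapper
    return decorator


@timed('Pipeline execution')
def flow_stub():
    time.sleep(0.25)  # stand-in for the real pipeline work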
@@ -22,71 +25,64 @@ def start_flow():
     else:

+
         try:

             log_db.log_exec_status(job_id, 'start_flow', 'executing', '')

             file_path_list = os.listdir(RAW_DATA_PATH)

             if file_path_list:
-                with engine.connect() as connection:
-                    Base.metadata.create_all(connection)
-
-                    # Get previous version of pdp_contacts table, which is used later to classify new records
-                    pdp_contacts_df = pd.read_sql_table('pdp_contacts', connection)
-                    pdp_contacts_df = pdp_contacts_df[pdp_contacts_df["archived_date"].isnull()]
-                    pdp_contacts_df = pdp_contacts_df.drop(columns=['archived_date', 'created_date', '_id', 'matching_id'])
-
-                    current_app.logger.info('Loaded {} records from pdp_contacts table'.format(pdp_contacts_df.shape[0]))
-
-                    # Clean the input data and normalize/rename columns
-                    # Populate new records in secondary tables (donations, volunteer shifts)
-                    # input - existing files in path
-                    # output - normalized object of all entries, as well as the input json rows for primary sources
-                    log_db.log_exec_status(job_id, 'clean_and_load', 'executing', '')
-                    normalized_data, source_json, manual_matches_df = clean_and_load_data.start(connection, pdp_contacts_df, file_path_list)
-
-                    # Standardize column data types via postgres (e.g. reading a csv column as int vs. str)
-                    # (If additional inconsistencies are encountered, may need to enforce the schema of
-                    # the contacts loader by initializing it from pdp_contacts.)
-                    normalized_data.to_sql('_temp_pdp_contacts_loader', connection, index=False, if_exists='replace')
-                    normalized_data = pd.read_sql_table('_temp_pdp_contacts_loader', connection)
-
-                    # Classifies rows into old rows that haven't changed, updated rows, and new rows, compared to the existing state of the DB
-                    log_db.log_exec_status(job_id, 'classify', 'executing', '')
-                    rows_classified = calssify_new_data.start(pdp_contacts_df, normalized_data)
-
-                    # Archives rows that were updated in the current state of the DB (changes their archived_date to now)
-                    archive_rows.archive(connection, rows_classified["updated"])
-
-                    # Match new+updated records against the previous version of the pdp_contacts table, and
-                    # write these rows to the database.
-                    match_data.start(connection, rows_classified, manual_matches_df, job_id)
-
-                    # Copy raw input rows to json fields in pdp_contacts,
-                    # using a temporary table to simplify the update code.
-                    current_app.logger.info('Saving json of original rows to pdp_contacts')
-                    source_json.to_sql('_temp_pdp_contacts_loader', connection, index=False, if_exists='replace')
-                    # https://www.postgresql.org/docs/8.4/sql-update.html
-                    connection.execute('''
-                        UPDATE pdp_contacts pdp
-                        SET json = to_json(temp.json)
-                        FROM _temp_pdp_contacts_loader temp
-                        WHERE
-                            pdp.source_type = temp.source_type AND
-                            pdp.source_id = temp.source_id AND
-                            pdp.archived_date IS NULL
-                        ''')
-
-                    current_app.logger.info('Finished flow script run, running RFM scoring')
-
-                    score_result = create_scores()  # Run RFM scoring on newly-processed donations
-                    current_app.logger.info('Scored ' + str(score_result) + ' tuples')
-
-                    job_outcome = 'completed'
-                    log_db.log_exec_status(job_id, 'flow', 'complete', '')
-
-
+                with engine.begin() as connection:
+
+                    # Get previous version of pdp_contacts table, which is used later to classify new records
+                    pdp_contacts_df = pd.read_sql_table('pdp_contacts', connection)
+                    pdp_contacts_df = pdp_contacts_df[pdp_contacts_df["archived_date"].isnull()]
+                    pdp_contacts_df = pdp_contacts_df.drop(columns=['archived_date', 'created_date', '_id', 'matching_id'])
+
+                    current_app.logger.info('Loaded {} records from pdp_contacts table'.format(pdp_contacts_df.shape[0]))
+
+                    # Clean the input data and normalize/rename columns
+                    # Populate new records in secondary tables (donations, volunteer shifts)
+                    # input - existing files in path
+                    # output - normalized object of all entries, as well as the input json rows for primary sources
+                    log_db.log_exec_status(job_id, 'clean_and_load', 'executing', '')
+                    normalized_data, source_json, manual_matches_df = clean_and_load_data.start(connection, pdp_contacts_df, file_path_list)
+                    # Standardize column data types via postgres (e.g. reading a csv column as int vs. str)
+                    # (If additional inconsistencies are encountered, may need to enforce the schema of
+                    # the contacts loader by initializing it from pdp_contacts.)
+                    normalized_data.to_sql('_temp_pdp_contacts_loader', connection, index=False, if_exists='replace')
+                    normalized_data = pd.read_sql_table('_temp_pdp_contacts_loader', connection)
+
+                    # Classifies rows into old rows that haven't changed, updated rows, and new rows, compared to the existing state of the DB
+                    log_db.log_exec_status(job_id, 'classify', 'executing', '')
+                    rows_classified = calssify_new_data.start(pdp_contacts_df, normalized_data)
+
+                    # Archives rows that were updated in the current state of the DB (changes their archived_date to now)
+                    archive_rows.archive(connection, rows_classified["updated"])
+
+                    # Match new+updated records against the previous version of the pdp_contacts table, and
+                    # write these rows to the database.
+                    match_data.start(connection, rows_classified, manual_matches_df, job_id)
+
+                    # Copy raw input rows to json fields in pdp_contacts,
+                    # using a temporary table to simplify the update code.
+                    current_app.logger.info('Saving json of original rows to pdp_contacts')
+                    source_json.to_sql('_temp_pdp_contacts_loader', connection, index=False, if_exists='replace')
+                    # https://www.postgresql.org/docs/8.4/sql-update.html
+                    connection.execute('''
+                        UPDATE pdp_contacts pdp
+                        SET json = to_json(temp.json)
+                        FROM _temp_pdp_contacts_loader temp
+                        WHERE
+                            pdp.source_type = temp.source_type AND
+                            pdp.source_id = temp.source_id AND
+                            pdp.archived_date IS NULL
+                        ''')
+
+                    current_app.logger.info('Finished flow script run')
+                    job_outcome = 'completed'
+                    log_db.log_exec_status(job_id, 'flow', 'complete', '')

             else:  # No files in list
                 current_app.logger.info('No files to process')
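Two things change in the hunk above beyond the reshuffled comments and log text. First, Base.metadata.create_all(connection) is gone, so the flow no longer creates tables as a side effect of a run. Second, with engine.connect() becomes with engine.begin(): in SQLAlchemy, engine.begin() opens a connection and a transaction together, committing the block if it completes and rolling it back if it raises, so the load, classify, archive, match, and json-copy steps now succeed or fail as one unit. A minimal sketch of those semantics, with an in-memory SQLite engine standing in for the project's configured Postgres engine:

from sqlalchemy import create_engine, text

engine = create_engine('sqlite://')  # stand-in for the configured Postgres engine

with engine.begin() as connection:
    # Both statements run in a single transaction; an exception anywhere
    # in this block rolls back everything executed so far.
    connection.execute(text('CREATE TABLE t (x INTEGER)'))
    connection.execute(text('INSERT INTO t (x) VALUES (1)'))

That matters here because a failure partway through (say, in match_data.start) previously could leave pdp_contacts with archived rows but no replacements; with the transactional block, the database is left as it was before the run.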
@@ -107,4 +103,5 @@ def start_flow():
             job_outcome = 'error'
             return 'error'

+    current_app.logger.info('Pipeline execution took {} seconds'.format(time.time() - start))
     return job_outcome
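A note on the temp-table round-trip kept in the new block: normalized_data is written to _temp_pdp_contacts_loader and immediately read back, so column types are settled by the database schema rather than by whatever pandas inferred from each CSV. A sketch of the idea (SQLite standing in for Postgres here, so exact dtype outcomes may differ by backend):

import pandas as pd
from sqlalchemy import create_engine

engine = create_engine('sqlite://')  # illustration only; the flow uses Postgres

# An object column holding mixed Python types, as can happen when CSVs disagree.
df = pd.DataFrame({'source_id': [123, '456']})

# Write the frame out and read it straight back: the database column type
# now determines the dtype, giving one consistent type for the whole column.
df.to_sql('_tmp_loader', engine, index=False, if_exists='replace')
df = pd.read_sql_table('_tmp_loader', engine)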
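Finally, the json copy relies on Postgres's UPDATE ... FROM join syntax (documented on the linked postgresql.org page) to update every matching pdp_contacts row from the staging table in one set-based statement instead of looping row by row. The shape of the pattern, with stand-in table, column, and connection names rather than the real pdp schema:

from sqlalchemy import create_engine, text

# Assumed connection string, for illustration only.
engine = create_engine('postgresql://user:secret@localhost:5432/mydb')

with engine.begin() as connection:
    # Join the staging table to the target on shared keys and copy the
    # payload column across in a single statement.
    connection.execute(text('''
        UPDATE target t
        SET payload = s.payload
        FROM staging s
        WHERE t.source_type = s.source_type
          AND t.source_id = s.source_id
    '''))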