Commit d1084e0

Merge pull request #429 from CodeForPhilly/catch_flow_error

Catches errors in flow_script and sets status (in execution_status table)

2 parents: 11b6223 + 27a7194

File tree: 3 files changed, +114 −63 lines
Lines changed: 26 additions & 0 deletions

@@ -0,0 +1,26 @@
+"""Remove execution_status.details field size limit
+
+Revision ID: a3ba63dee8f4
+Revises: 40be910424f0
+Create Date: 2021-09-18 18:14:48.044985
+
+"""
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision = 'a3ba63dee8f4'
+down_revision = '40be910424f0'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    op.alter_column('execution_status', "details", type_=sa.String(None))
+
+
+
+def downgrade():
+    op.alter_column('execution_status', "details", type_=sa.String(128))
+
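
This migration supports the new error handling below: full tracebacks are now written into execution_status.details, which a VARCHAR(128) would reject as too long. A minimal sketch, not part of the commit, showing how the two column types compile on the PostgreSQL dialect:

# Minimal sketch (not part of the commit): sa.String(None) compiles to an
# unbounded VARCHAR on PostgreSQL, so details can hold a full traceback.
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

print(sa.String(None).compile(dialect=postgresql.dialect()))  # VARCHAR
print(sa.String(128).compile(dialect=postgresql.dialect()))   # VARCHAR(128)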

src/server/api/admin_api.py

Lines changed: 3 additions & 0 deletions

@@ -100,6 +100,9 @@ def execute():
     elif job_outcome == 'completed':
         return jsonify({'outcome': 'Analysis completed'}), 200

+    elif job_outcome == 'error':
+        return jsonify({'outcome': 'Analysis not completed due to error'}), 500
+
     else:
         return jsonify({'outcome': 'Unknown status: ' + str(job_outcome)}), 200
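
With this branch in place, a failed run surfaces to API clients as an HTTP 500 instead of a 200. A hedged caller sketch; the route path, host, and HTTP method are assumptions, since they are not shown in this diff:

# Hypothetical caller (URL, port, and method are assumptions, not in the diff).
import requests

resp = requests.post('http://localhost:5000/api/execute')
print(resp.status_code, resp.json()['outcome'])
# 200 -> 'Analysis completed' (or another non-error outcome)
# 500 -> 'Analysis not completed due to error'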

src/server/pipeline/flow_script.py

Lines changed: 85 additions & 63 deletions

@@ -1,4 +1,4 @@
-import os
+import os, sys, traceback
 import pandas as pd

 from flask import current_app
@@ -12,73 +12,95 @@
 def start_flow():

     job_id = admin_api.start_job()
+    job_outcome = None
+    trace_back_string = None
+

     if not job_id:
         current_app.logger.info('Failed to get job_id')
         job_outcome = 'busy'

     else:
-        log_db.log_exec_status(job_id, 'start_flow', 'executing', '')
-
-        file_path_list = os.listdir(RAW_DATA_PATH)
-
-        if file_path_list:
-            with engine.connect() as connection:
-                Base.metadata.create_all(connection)
-
-                # Get previous version of pdp_contacts table, which is used later to classify new records
-                pdp_contacts_df = pd.read_sql_table('pdp_contacts', connection)
-                pdp_contacts_df = pdp_contacts_df[pdp_contacts_df["archived_date"].isnull()]
-                pdp_contacts_df = pdp_contacts_df.drop(columns=['archived_date', 'created_date', '_id', 'matching_id'])
-
-                current_app.logger.info('Loaded {} records from pdp_contacts table'.format(pdp_contacts_df.shape[0]))
-
-                # Clean the input data and normalize/rename columns
-                # Populate new records in secondary tables (donations, volunteer shifts)
-                # input - existing files in path
-                # output - normalized object of all entries, as well as the input json rows for primary sources
-                log_db.log_exec_status(job_id, 'clean_and_load', 'executing', '')
-                normalized_data, source_json, manual_matches_df = clean_and_load_data.start(connection, pdp_contacts_df, file_path_list)
-
-                # Standardize column data types via postgres (e.g. reading a csv column as int vs. str)
-                # (If additional inconsistencies are encountered, may need to enforce the schema of
-                # the contacts loader by initializing it from pdp_contacts.)
-                normalized_data.to_sql('_temp_pdp_contacts_loader', connection, index=False, if_exists='replace')
-                normalized_data = pd.read_sql_table('_temp_pdp_contacts_loader', connection)
-
-                # Classifies rows into old rows that haven't changed, updated rows, and new rows - compared to the existing state of the DB
-                log_db.log_exec_status(job_id, 'classify', 'executing', '')
-                rows_classified = calssify_new_data.start(pdp_contacts_df, normalized_data)
-
-                # Archives rows that were updated in the current state of the DB (changes their archived_date to now)
-                archive_rows.archive(connection, rows_classified["updated"])
-
-                # Match new+updated records against previous version of pdp_contacts database, and
-                # write these rows to the database.
-                match_data.start(connection, rows_classified, manual_matches_df, job_id)
-
-                # Copy raw input rows to json fields in pdp_contacts,
-                # using a temporary table to simplify the update code.
-                current_app.logger.info('Saving json of original rows to pdp_contacts')
-                source_json.to_sql('_temp_pdp_contacts_loader', connection, index=False, if_exists='replace')
-                # https://www.postgresql.org/docs/8.4/sql-update.html
-                connection.execute('''
-                    UPDATE pdp_contacts pdp
-                    SET json = to_json(temp.json)
-                    FROM _temp_pdp_contacts_loader temp
-                    WHERE
-                        pdp.source_type = temp.source_type AND
-                        pdp.source_id = temp.source_id AND
-                        pdp.archived_date IS NULL
-                ''')
-
-                current_app.logger.info('Finished flow script run')
-                job_outcome = 'completed'
-
-        else:  # No files in list
-            current_app.logger.info('No files to process')
-            job_outcome = 'nothing to do'
-
-        log_db.log_exec_status(job_id, 'flow', 'complete', '')
+
+        try:
+
+            log_db.log_exec_status(job_id, 'start_flow', 'executing', '')
+
+            file_path_list = os.listdir(RAW_DATA_PATH)
+
+            if file_path_list:
+                with engine.connect() as connection:
+                    Base.metadata.create_all(connection)
+
+                    # Get previous version of pdp_contacts table, which is used later to classify new records
+                    pdp_contacts_df = pd.read_sql_table('pdp_contacts', connection)
+                    pdp_contacts_df = pdp_contacts_df[pdp_contacts_df["archived_date"].isnull()]
+                    pdp_contacts_df = pdp_contacts_df.drop(columns=['archived_date', 'created_date', '_id', 'matching_id'])
+
+                    current_app.logger.info('Loaded {} records from pdp_contacts table'.format(pdp_contacts_df.shape[0]))
+
+                    # Clean the input data and normalize/rename columns
+                    # Populate new records in secondary tables (donations, volunteer shifts)
+                    # input - existing files in path
+                    # output - normalized object of all entries, as well as the input json rows for primary sources
+                    log_db.log_exec_status(job_id, 'clean_and_load', 'executing', '')
+                    normalized_data, source_json, manual_matches_df = clean_and_load_data.start(connection, pdp_contacts_df, file_path_list)
+
+                    # Standardize column data types via postgres (e.g. reading a csv column as int vs. str)
+                    # (If additional inconsistencies are encountered, may need to enforce the schema of
+                    # the contacts loader by initializing it from pdp_contacts.)
+                    normalized_data.to_sql('_temp_pdp_contacts_loader', connection, index=False, if_exists='replace')
+                    normalized_data = pd.read_sql_table('_temp_pdp_contacts_loader', connection)
+
+                    # Classifies rows into old rows that haven't changed, updated rows, and new rows - compared to the existing state of the DB
+                    log_db.log_exec_status(job_id, 'classify', 'executing', '')
+                    rows_classified = calssify_new_data.start(pdp_contacts_df, normalized_data)
+
+                    # Archives rows that were updated in the current state of the DB (changes their archived_date to now)
+                    archive_rows.archive(connection, rows_classified["updated"])
+
+                    # Match new+updated records against previous version of pdp_contacts database, and
+                    # write these rows to the database.
+                    match_data.start(connection, rows_classified, manual_matches_df, job_id)
+
+                    # Copy raw input rows to json fields in pdp_contacts,
+                    # using a temporary table to simplify the update code.
+                    current_app.logger.info('Saving json of original rows to pdp_contacts')
+                    source_json.to_sql('_temp_pdp_contacts_loader', connection, index=False, if_exists='replace')
+                    # https://www.postgresql.org/docs/8.4/sql-update.html
+                    connection.execute('''
+                        UPDATE pdp_contacts pdp
+                        SET json = to_json(temp.json)
+                        FROM _temp_pdp_contacts_loader temp
+                        WHERE
+                            pdp.source_type = temp.source_type AND
+                            pdp.source_id = temp.source_id AND
+                            pdp.archived_date IS NULL
+                    ''')
+
+                    current_app.logger.info('Finished flow script run')
+                    job_outcome = 'completed'
+                    log_db.log_exec_status(job_id, 'flow', 'complete', '')
+
+
+
+            else:  # No files in list
+                current_app.logger.info('No files to process')
+                job_outcome = 'nothing to do'
+                log_db.log_exec_status(job_id, 'flow', 'complete', '')
+
+
+        except Exception as e:
+            current_app.logger.error(e)
+            trace_back_string = traceback.format_exc()
+            current_app.logger.error(trace_back_string)
+
+        finally:
+            if job_outcome != 'completed' and job_outcome != 'nothing to do':
+
+                log_db.log_exec_status(job_id, 'flow', 'error', trace_back_string)
+                current_app.logger.error("Uncaught error status, setting job status to 'error'")
+                job_outcome = 'error'
+                return 'error'

     return job_outcome
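
The control flow above boils down to a try/except/finally status pattern: mark 'complete' on success, capture and log the traceback on failure, and let the finally block force any run that did not finish cleanly into an 'error' status. A standalone sketch under stated assumptions: plain logging stands in for Flask's current_app.logger, and a log_status callable stands in for log_db.log_exec_status; both names are illustrative, not the project's API.

# Standalone sketch of the status-tracking pattern (illustrative names).
import logging
import traceback

def run_with_status(job_id, work, log_status):
    """Run `work`, recording 'complete' on success or 'error' with a traceback."""
    job_outcome = None
    trace_back_string = None
    try:
        work()
        job_outcome = 'completed'
        log_status(job_id, 'flow', 'complete', '')
    except Exception:
        trace_back_string = traceback.format_exc()
        logging.error(trace_back_string)
    finally:
        # Any run that did not finish cleanly is recorded as an error.
        if job_outcome != 'completed':
            log_status(job_id, 'flow', 'error', trace_back_string)
            job_outcome = 'error'
    return job_outcome

# Usage: run_with_status(1, lambda: 1 / 0, lambda *a: print(a)) returns 'error'.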
