Commit 9cd4010

Merge pull request #480 from carlos-dominguez/contacts-and-match-in-postgres

upload/match pipeline refactor

2 parents 5c224e1 + 221d595
17 files changed: +693 −868 lines

src/client/src/pages/Admin.js

Lines changed: 1 addition & 1 deletion

@@ -243,7 +243,7 @@ export default function Admin(props) {
             <Typography variant="h5" styles={{paddingBottom: 5}}>Run New Analysis</Typography>
             <form onSubmit={handleExecute}>
                 <Button type="submit" variant="contained" color="primary"
-                        disabled={statistics === 'Running' || isNewFileExist === false}>
+                        disabled={statistics === 'Running'}>
                     Run Data Analysis
                 </Button>
             </form>
Lines changed: 106 additions & 0 deletions

@@ -0,0 +1,106 @@
+"""postgres matching
+
+Revision ID: 45a668fa6325
+Revises: fc7325372396
+Create Date: 2022-02-10 16:19:13.283250
+
+"""
+from alembic import op
+import sqlalchemy as sa
+from sqlalchemy.dialects import postgresql
+
+# revision identifiers, used by Alembic.
+revision = '45a668fa6325'
+down_revision = 'fc7325372396'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.create_table('manual_matches',
+    sa.Column('source_type_1', sa.String(), nullable=False),
+    sa.Column('source_id_1', sa.String(), nullable=False),
+    sa.Column('source_type_2', sa.String(), nullable=False),
+    sa.Column('source_id_2', sa.String(), nullable=False),
+    sa.PrimaryKeyConstraint('source_type_1', 'source_id_1', 'source_type_2', 'source_id_2')
+    )
+    op.create_table('salesforcecontacts',
+    sa.Column('_id', sa.Integer(), nullable=False),
+    sa.Column('contact_id', sa.String(), nullable=True),
+    sa.Column('first_name', sa.String(), nullable=True),
+    sa.Column('last_name', sa.String(), nullable=True),
+    sa.Column('account_name', sa.String(), nullable=True),
+    sa.Column('mailing_country', sa.String(), nullable=True),
+    sa.Column('mailing_street', sa.String(), nullable=True),
+    sa.Column('mailing_city', sa.String(), nullable=True),
+    sa.Column('mailing_state_province', sa.String(), nullable=True),
+    sa.Column('mailing_zip_postal_code', sa.String(), nullable=True),
+    sa.Column('phone', sa.String(), nullable=True),
+    sa.Column('mobile', sa.String(), nullable=True),
+    sa.Column('email', sa.String(), nullable=True),
+    sa.Column('json', postgresql.JSONB(astext_type=sa.Text()), nullable=True),
+    sa.Column('created_date', sa.DateTime(), nullable=True),
+    sa.PrimaryKeyConstraint('_id')
+    )
+    op.create_table('shelterluvpeople',
+    sa.Column('_id', sa.Integer(), nullable=False),
+    sa.Column('firstname', sa.String(), nullable=True),
+    sa.Column('lastname', sa.String(), nullable=True),
+    sa.Column('id', sa.String(), nullable=True),
+    sa.Column('internal_id', sa.String(), nullable=True),
+    sa.Column('associated', sa.String(), nullable=True),
+    sa.Column('street', sa.String(), nullable=True),
+    sa.Column('apartment', sa.String(), nullable=True),
+    sa.Column('city', sa.String(), nullable=True),
+    sa.Column('state', sa.String(), nullable=True),
+    sa.Column('zip', sa.String(), nullable=True),
+    sa.Column('email', sa.String(), nullable=True),
+    sa.Column('phone', sa.String(), nullable=True),
+    sa.Column('animal_ids', postgresql.JSONB(astext_type=sa.Text()), nullable=True),
+    sa.Column('json', postgresql.JSONB(astext_type=sa.Text()), nullable=True),
+    sa.Column('created_date', sa.DateTime(), nullable=True),
+    sa.PrimaryKeyConstraint('_id')
+    )
+    op.create_table('volgistics',
+    sa.Column('_id', sa.Integer(), nullable=False),
+    sa.Column('number', sa.String(), nullable=True),
+    sa.Column('last_name', sa.String(), nullable=True),
+    sa.Column('first_name', sa.String(), nullable=True),
+    sa.Column('middle_name', sa.String(), nullable=True),
+    sa.Column('complete_address', sa.String(), nullable=True),
+    sa.Column('street_1', sa.String(), nullable=True),
+    sa.Column('street_2', sa.String(), nullable=True),
+    sa.Column('street_3', sa.String(), nullable=True),
+    sa.Column('city', sa.String(), nullable=True),
+    sa.Column('state', sa.String(), nullable=True),
+    sa.Column('zip', sa.String(), nullable=True),
+    sa.Column('all_phone_numbers', sa.String(), nullable=True),
+    sa.Column('home', sa.String(), nullable=True),
+    sa.Column('work', sa.String(), nullable=True),
+    sa.Column('cell', sa.String(), nullable=True),
+    sa.Column('email', sa.String(), nullable=True),
+    sa.Column('json', postgresql.JSONB(astext_type=sa.Text()), nullable=True),
+    sa.Column('created_date', sa.DateTime(), nullable=True),
+    sa.PrimaryKeyConstraint('_id')
+    )
+    op.create_index('idx_pdp_contacts_source_type_and_id', 'pdp_contacts', ['source_type', 'source_id'], unique=False)
+    op.create_index(op.f('ix_pdp_contacts_mobile'), 'pdp_contacts', ['mobile'], unique=False)
+    op.create_index(op.f('idx_pdp_contacts_lower_first_name'), 'pdp_contacts', [sa.text('lower(first_name)')], unique=False)
+    op.create_index(op.f('idx_pdp_contacts_lower_last_name'), 'pdp_contacts', [sa.text('lower(last_name)')], unique=False)
+    op.create_index(op.f('idx_pdp_contacts_lower_email'), 'pdp_contacts', [sa.text('lower(email)')], unique=False)
+    # ### end Alembic commands ###
+
+
+def downgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.drop_index(op.f('idx_pdp_contacts_lower_email'), table_name='pdp_contacts')
+    op.drop_index(op.f('idx_pdp_contacts_lower_last_name'), table_name='pdp_contacts')
+    op.drop_index(op.f('idx_pdp_contacts_lower_first_name'), table_name='pdp_contacts')
+    op.drop_index(op.f('ix_pdp_contacts_mobile'), table_name='pdp_contacts')
+    op.drop_index('idx_pdp_contacts_source_type_and_id', table_name='pdp_contacts')
+    op.drop_table('volgistics')
+    op.drop_table('shelterluvpeople')
+    op.drop_table('salesforcecontacts')
+    op.drop_table('manual_matches')
+    # ### end Alembic commands ###
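The models imported elsewhere in this commit (ManualMatches, SalesForceContacts, ShelterluvPeople, Volgistics) are expected to mirror these tables. A minimal sketch of what the manual_matches mapping could look like, assuming SQLAlchemy 1.4 declarative style; the project's actual models module may declare it differently:

import sqlalchemy as sa
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class ManualMatches(Base):
    __tablename__ = "manual_matches"

    # Composite primary key: each row links one record in one source system
    # (e.g. salesforcecontacts) to one record in another (e.g. volgistics).
    source_type_1 = sa.Column(sa.String, primary_key=True)
    source_id_1 = sa.Column(sa.String, primary_key=True)
    source_type_2 = sa.Column(sa.String, primary_key=True)
    source_id_2 = sa.Column(sa.String, primary_key=True)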
src/server/api/API_ingest/ingest_sources_from_api.py

Lines changed: 2 additions & 2 deletions

@@ -1,7 +1,7 @@
 from api.API_ingest import shelterluv_api_handler
 
-def start():
+def start(conn):
     print("Start Fetching raw data from different API sources")
     #Run each source to store the output in dropbox and in the container as a CSV
-    shelterluv_api_handler.store_shelterluv_people_all()
+    shelterluv_api_handler.store_shelterluv_people_all(conn)
     print("Finish Fetching raw data from different API sources")

src/server/api/API_ingest/shelterluv_api_handler.py

Lines changed: 8 additions & 5 deletions

@@ -1,10 +1,12 @@
-import os
-import requests
 import csv
+import os
 import time
 
-from constants import RAW_DATA_PATH
+import requests
+import pandas as pd
 from api.API_ingest.dropbox_handler import upload_file_to_dropbox
+from constants import RAW_DATA_PATH
+from models import ShelterluvPeople
 
 try:
     from secrets_dict import SHELTERLUV_SECRET_TOKEN
@@ -60,7 +62,7 @@ def write_csv(json_data):
 
 ''' Iterate over all shelterluv people and store in a csv file in the raw data folder
     We fetch 100 items in each request, since that is the limit based on our research '''
-def store_shelterluv_people_all():
+def store_shelterluv_people_all(conn):
     offset = 0
     LIMIT = 100
     has_more = True
@@ -90,8 +92,9 @@ def store_shelterluv_people_all():
     file_path = write_csv(shelterluv_people)
     print("Finish storing latest shelterluvpeople results to container")
 
-
     print("Start storing " + '/shelterluv/' + "results to dropbox")
     upload_file_to_dropbox(file_path, '/shelterluv/' + file_path.split('/')[-1])
     print("Finish storing " + '/shelterluv/' + "results to dropbox")
 
+    print("Uploading shelterluvpeople csv to database")
+    ShelterluvPeople.insert_from_df(pd.read_csv(file_path, dtype="string"), conn)
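ShelterluvPeople.insert_from_df itself is not shown in this diff. A plausible sketch of such a helper, assuming it simply appends the DataFrame to the table created by the migration above (the real model may rename columns or filter rows first):

import pandas as pd


class ShelterluvPeople:
    __tablename__ = "shelterluvpeople"

    @classmethod
    def insert_from_df(cls, df: pd.DataFrame, conn):
        # Append rows to the shelterluvpeople table inside the caller's
        # transaction; assumes the CSV columns line up with the table's.
        df.to_sql(cls.__tablename__, conn, if_exists="append", index=False)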

src/server/api/file_uploader.py

Lines changed: 39 additions & 85 deletions

@@ -1,101 +1,55 @@
-import os
-import time
 import pandas as pd
-import threading
-import io
-import re
-
-from werkzeug.utils import secure_filename
-from flask import flash, current_app
-from datasource_manager import CSV_HEADERS
-from datasource_manager import DATASOURCE_MAPPING
-from openpyxl import load_workbook
-from tempfile import NamedTemporaryFile
+from config import engine
 from donations_importer import validate_import_sfd
+from flask import current_app
+from models import ManualMatches, SalesForceContacts, ShelterluvPeople, Volgistics
 from shifts_importer import validate_import_vs
+from werkzeug.utils import secure_filename
 
-from constants import RAW_DATA_PATH
-
-SUCCESS_MSG = 'Uploaded Successfully!'
-lock = threading.Lock()
+SUCCESS_MSG = "Uploaded Successfully!"
 
 
 def validate_and_arrange_upload(file):
     current_app.logger.info("Start uploading file: " + file.filename)
     filename = secure_filename(file.filename)
-    file_extension = filename.rpartition('.')[2]
-    determine_upload_type(file, file_extension)
+    file_extension = filename.rpartition(".")[2]
+    with engine.begin() as conn:
+        determine_upload_type(file, file_extension, conn)
 
 
-def determine_upload_type(file, file_extension):
-    df = None
+def determine_upload_type(file, file_extension, conn):
+    # Yes, this method of discovering what kind of file we have by looking at
+    # the extension and columns is silly. We'd like to get more of our data from
+    # automatically pulling from vendor APIs directly, in which case we'd know
+    # what kind of data we had.
+    if file_extension == "csv":
+        df = pd.read_csv(file, dtype="string")
 
-    if file_extension == 'csv':
-        dfs = [pd.read_csv(io.BytesIO(file.stream.read()), encoding='iso-8859-1')]
-        file.close()
-    else:
-
-        match = re.search('donat', file.filename, re.I)
-
-        if match:  # It's a SalesForce Donations file
-            validate_import_sfd(file)
+        if {"salesforcecontacts", "volgistics", "shelterluvpeople"}.issubset(df.columns):
+            ManualMatches.insert_from_df(df, conn)
+            return
+        elif {"Animal_ids", "Internal-ID"}.issubset(df.columns):
+            ShelterluvPeople.insert_from_df(df, conn)
             return
-        else:
-            match = re.search('volunteer', file.filename, re.I)
-            if match:  # It's a Volgistics file
-                validate_import_vs(file)
-                dfs = excel_to_dataframes(file)  # Also need to run Volgistics through match processing
-            else:
-                dfs = excel_to_dataframes(file)  # It's a non-Volgistics, non-Shelterluv XLS? file
-
-    found_sources = 0
-    for df in dfs:
-        for src_type in CSV_HEADERS:
-            if set(CSV_HEADERS[src_type]).issubset(df.columns):
-                with lock:
-                    found_sources += 1
-                    filename = secure_filename(file.filename)
-                    now = time.localtime()
-                    now_date = time.strftime("%Y-%m-%d--%H-%M-%S", now)
-                    current_app.logger.info(" -File: " + filename + " Matches files type: " + src_type)
-                    df.to_csv(os.path.join(RAW_DATA_PATH, src_type + '-' + now_date + '.csv'))
-                    clean_current_folder(src_type)
-                    df.to_csv(os.path.join(RAW_DATA_PATH, src_type + '-' + now_date + '.csv'))
-                    current_app.logger.info(" -Uploaded successfully as : " + src_type + '-' + now_date + '.' + file_extension)
-                    flash(src_type + " {0} ".format(SUCCESS_MSG), 'info')
-    if found_sources == 0:
-        current_app.logger.error("\n\n !!!!!!! No sources found in upload !!!! \n Uploaded file " + file.filename + " is probably from wrong report \n !!!!!!!!!!!")
-
-
-def excel_to_dataframes(xls):
-    df = []
-    wb = load_workbook(xls)
-
-    if len(wb.sheetnames) > 1:
-        with NamedTemporaryFile() as tmp:
-            wb.save(tmp.name)
-            for sheetname in wb.sheetnames:
-                for item in DATASOURCE_MAPPING:
-                    if 'sheetname' in DATASOURCE_MAPPING[item]:
-                        if DATASOURCE_MAPPING[item]['sheetname'] == sheetname:
-                            tmp.seek(0)
-                            df.append(pd.read_excel(tmp.read(), sheetname))
-    else:
-        df.append(pd.read_excel(xls))
-
-    return df
 
+    if file_extension == "xlsx":
+        excel_file = pd.ExcelFile(file)
+        if {"Master", "Service"}.issubset(excel_file.sheet_names):
+            # Volgistics
+            validate_import_vs(file, conn)
+            Volgistics.insert_from_file(excel_file, conn)
+            return
 
-def clean_current_folder(src_type):
-    if os.listdir(RAW_DATA_PATH):
-        for file_name in os.listdir(RAW_DATA_PATH):
-            file_path = os.path.join(RAW_DATA_PATH, file_name)
-            file_name_striped = file_path.split('-')[0].split('/')[-1]
+        df = pd.read_excel(excel_file)
+        if "Contact ID 18" in df.columns:
+            # Salesforce something-or-other
+            if "Amount" in df.columns:
+                # Salesforce donations
+                validate_import_sfd(file, conn)
+                return
+            else:
+                # Salesforce contacts
+                SalesForceContacts.insert_from_file_df(df, conn)
+                return
 
-            if file_name_striped == src_type:
-                current_app.logger.info('File to remove: ' + file_path)
-                os.remove(file_path)
-                current_app.logger.info(" -Removed file: " + file_name + " from Current files folder")
+    current_app.logger.error(f"Don't know how to process file {file.filename}")

src/server/api/internal_api.py

Lines changed: 3 additions & 1 deletion

@@ -1,4 +1,5 @@
 from api.api import internal_api
+from config import engine
 from flask import jsonify, current_app
 from datetime import datetime
 from api.API_ingest import ingest_sources_from_api
@@ -24,7 +25,8 @@ def user_test2():
 @internal_api.route("/api/ingestRawData", methods=["GET"])
 def ingest_raw_data():
     try:
-        ingest_sources_from_api.start()
+        with engine.begin() as conn:
+            ingest_sources_from_api.start(conn)
     except Exception as e:
         current_app.logger.exception(e)
 
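The conn threaded through start() and determine_upload_type() comes from SQLAlchemy's engine.begin(), which opens a transaction that commits when the block exits cleanly and rolls back if it raises. A self-contained sketch of the pattern (connection URL and inserted values are placeholders):

from sqlalchemy import create_engine, text

engine = create_engine("postgresql://user:password@localhost/paws")  # placeholder URL

with engine.begin() as conn:  # BEGIN
    conn.execute(
        text(
            "INSERT INTO manual_matches "
            "(source_type_1, source_id_1, source_type_2, source_id_2) "
            "VALUES (:t1, :i1, :t2, :i2)"
        ),
        {"t1": "salesforcecontacts", "i1": "sf-123", "t2": "volgistics", "i2": "vg-456"},
    )
# clean exit -> COMMIT; an exception inside the block -> ROLLBACK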
