upload/match pipeline refactor #480

Merged · 7 commits · Jun 30, 2022
2 changes: 1 addition & 1 deletion src/client/src/pages/Admin.js
@@ -243,7 +243,7 @@ export default function Admin(props) {
<Typography variant="h5" styles={{paddingBottom: 5}}>Run New Analysis</Typography>
<form onSubmit={handleExecute}>
<Button type="submit" variant="contained" color="primary"
disabled={statistics === 'Running' || isNewFileExist === false}>
Collaborator (author) commented:
The UI is a bit wonky with this change; the whole "new files" concept is now kind of meaningless.

disabled={statistics === 'Running'}>
Run Data Analysis
</Button>
</form>
106 changes: 106 additions & 0 deletions src/server/alembic/versions/45a668fa6325_postgres_matching.py
@@ -0,0 +1,106 @@
"""postgres matching

Revision ID: 45a668fa6325
Revises: fc7325372396
Create Date: 2022-02-10 16:19:13.283250

"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = '45a668fa6325'
down_revision = 'fc7325372396'
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('manual_matches',
sa.Column('source_type_1', sa.String(), nullable=False),
sa.Column('source_id_1', sa.String(), nullable=False),
sa.Column('source_type_2', sa.String(), nullable=False),
sa.Column('source_id_2', sa.String(), nullable=False),
sa.PrimaryKeyConstraint('source_type_1', 'source_id_1', 'source_type_2', 'source_id_2')
)
op.create_table('salesforcecontacts',
sa.Column('_id', sa.Integer(), nullable=False),
sa.Column('contact_id', sa.String(), nullable=True),
sa.Column('first_name', sa.String(), nullable=True),
sa.Column('last_name', sa.String(), nullable=True),
sa.Column('account_name', sa.String(), nullable=True),
sa.Column('mailing_country', sa.String(), nullable=True),
sa.Column('mailing_street', sa.String(), nullable=True),
sa.Column('mailing_city', sa.String(), nullable=True),
sa.Column('mailing_state_province', sa.String(), nullable=True),
sa.Column('mailing_zip_postal_code', sa.String(), nullable=True),
sa.Column('phone', sa.String(), nullable=True),
sa.Column('mobile', sa.String(), nullable=True),
sa.Column('email', sa.String(), nullable=True),
sa.Column('json', postgresql.JSONB(astext_type=sa.Text()), nullable=True),
sa.Column('created_date', sa.DateTime(), nullable=True),
sa.PrimaryKeyConstraint('_id')
)
op.create_table('shelterluvpeople',
sa.Column('_id', sa.Integer(), nullable=False),
sa.Column('firstname', sa.String(), nullable=True),
sa.Column('lastname', sa.String(), nullable=True),
sa.Column('id', sa.String(), nullable=True),
sa.Column('internal_id', sa.String(), nullable=True),
sa.Column('associated', sa.String(), nullable=True),
sa.Column('street', sa.String(), nullable=True),
sa.Column('apartment', sa.String(), nullable=True),
sa.Column('city', sa.String(), nullable=True),
sa.Column('state', sa.String(), nullable=True),
sa.Column('zip', sa.String(), nullable=True),
sa.Column('email', sa.String(), nullable=True),
sa.Column('phone', sa.String(), nullable=True),
sa.Column('animal_ids', postgresql.JSONB(astext_type=sa.Text()), nullable=True),
sa.Column('json', postgresql.JSONB(astext_type=sa.Text()), nullable=True),
sa.Column('created_date', sa.DateTime(), nullable=True),
sa.PrimaryKeyConstraint('_id')
)
op.create_table('volgistics',
sa.Column('_id', sa.Integer(), nullable=False),
sa.Column('number', sa.String(), nullable=True),
sa.Column('last_name', sa.String(), nullable=True),
sa.Column('first_name', sa.String(), nullable=True),
sa.Column('middle_name', sa.String(), nullable=True),
sa.Column('complete_address', sa.String(), nullable=True),
sa.Column('street_1', sa.String(), nullable=True),
sa.Column('street_2', sa.String(), nullable=True),
sa.Column('street_3', sa.String(), nullable=True),
sa.Column('city', sa.String(), nullable=True),
sa.Column('state', sa.String(), nullable=True),
sa.Column('zip', sa.String(), nullable=True),
sa.Column('all_phone_numbers', sa.String(), nullable=True),
sa.Column('home', sa.String(), nullable=True),
sa.Column('work', sa.String(), nullable=True),
sa.Column('cell', sa.String(), nullable=True),
sa.Column('email', sa.String(), nullable=True),
sa.Column('json', postgresql.JSONB(astext_type=sa.Text()), nullable=True),
sa.Column('created_date', sa.DateTime(), nullable=True),
sa.PrimaryKeyConstraint('_id')
)
op.create_index('idx_pdp_contacts_source_type_and_id', 'pdp_contacts', ['source_type', 'source_id'], unique=False)
op.create_index(op.f('ix_pdp_contacts_mobile'), 'pdp_contacts', ['mobile'], unique=False)
op.create_index(op.f('idx_pdp_contacts_lower_first_name'), 'pdp_contacts', [sa.text('lower(first_name)')], unique=False)
op.create_index(op.f('idx_pdp_contacts_lower_last_name'), 'pdp_contacts', [sa.text('lower(last_name)')], unique=False)
op.create_index(op.f('idx_pdp_contacts_lower_email'), 'pdp_contacts', [sa.text('lower(email)')], unique=False)
# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f('idx_pdp_contacts_lower_email'), table_name='pdp_contacts')
op.drop_index(op.f('idx_pdp_contacts_lower_last_name'), table_name='pdp_contacts')
op.drop_index(op.f('idx_pdp_contacts_lower_first_name'), table_name='pdp_contacts')
op.drop_index(op.f('ix_pdp_contacts_mobile'), table_name='pdp_contacts')
op.drop_index('idx_pdp_contacts_source_type_and_id', table_name='pdp_contacts')
op.drop_table('volgistics')
op.drop_table('shelterluvpeople')
op.drop_table('salesforcecontacts')
op.drop_table('manual_matches')
# ### end Alembic commands ###
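
The lower(first_name), lower(last_name), and lower(email) functional indexes only pay off if the matching queries filter on those same expressions. A minimal sketch of such a case-insensitive lookup, assuming a SQLAlchemy connection; the helper name is hypothetical and the column names come from the index definitions above:

import sqlalchemy as sa

def find_matching_contacts(conn, first, last):
    # Hypothetical helper: a case-insensitive lookup that Postgres can serve
    # from the lower(...) functional indexes instead of scanning pdp_contacts.
    query = sa.text(
        "SELECT source_type, source_id "
        "FROM pdp_contacts "
        "WHERE lower(first_name) = lower(:first) "
        "AND lower(last_name) = lower(:last)"
    )
    return conn.execute(query, {"first": first, "last": last}).fetchall()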
4 changes: 2 additions & 2 deletions src/server/api/API_ingest/ingest_sources_from_api.py
@@ -1,7 +1,7 @@
from api.API_ingest import shelterluv_api_handler

def start():
def start(conn):
print("Start Fetching raw data from different API sources")
#Run each source to store the output in dropbox and in the container as a CSV
shelterluv_api_handler.store_shelterluv_people_all()
shelterluv_api_handler.store_shelterluv_people_all(conn)
print("Finish Fetching raw data from different API sources")
13 changes: 8 additions & 5 deletions src/server/api/API_ingest/shelterluv_api_handler.py
@@ -1,10 +1,12 @@
import os
import requests
import csv
import os
import time

from constants import RAW_DATA_PATH
import requests
import pandas as pd
from api.API_ingest.dropbox_handler import upload_file_to_dropbox
from constants import RAW_DATA_PATH
from models import ShelterluvPeople

try:
from secrets_dict import SHELTERLUV_SECRET_TOKEN
@@ -60,7 +62,7 @@ def write_csv(json_data):

''' Iterate over all shelterluv people and store in json file in the raw data folder
We fetch 100 items in each request, since that is the limit based on our research '''
def store_shelterluv_people_all():
def store_shelterluv_people_all(conn):
offset = 0
LIMIT = 100
has_more = True
@@ -90,8 +92,9 @@ def store_shelterluv_people_all():
file_path = write_csv(shelterluv_people)
print("Finish storing latest shelterluvpeople results to container")


print("Start storing " + '/shelterluv/' + "results to dropbox")
upload_file_to_dropbox(file_path, '/shelterluv/' + file_path.split('/')[-1])
print("Finish storing " + '/shelterluv/' + "results to dropbox")

print("Uploading shelterluvpeople csv to database")
ShelterluvPeople.insert_from_df(pd.read_csv(file_path, dtype="string"), conn)
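
The handler pages through the Shelterluv people endpoint 100 records at a time, as noted in the docstring above. A sketch of that offset/limit loop; the endpoint URL, auth header, and response field names are assumptions for illustration, not taken from this diff:

import requests

def fetch_all_shelterluv_people(token, limit=100):
    people, offset, has_more = [], 0, True
    while has_more:
        resp = requests.get(
            "https://www.shelterluv.com/api/v1/people",  # assumed endpoint
            headers={"x-api-key": token},                # assumed auth header
            params={"limit": limit, "offset": offset},
        )
        resp.raise_for_status()
        payload = resp.json()
        people.extend(payload.get("people", []))         # assumed field name
        has_more = bool(payload.get("has_more"))         # assumed field name
        offset += limit
    return people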
124 changes: 39 additions & 85 deletions src/server/api/file_uploader.py
@@ -1,101 +1,55 @@
import os
import time
import pandas as pd
import threading
import io
import re

from werkzeug.utils import secure_filename
from flask import flash, current_app
from datasource_manager import CSV_HEADERS
from datasource_manager import DATASOURCE_MAPPING
from openpyxl import load_workbook
from tempfile import NamedTemporaryFile
from config import engine
from donations_importer import validate_import_sfd
from flask import current_app
from models import ManualMatches, SalesForceContacts, ShelterluvPeople, Volgistics
from shifts_importer import validate_import_vs
from werkzeug.utils import secure_filename

from constants import RAW_DATA_PATH

SUCCESS_MSG = 'Uploaded Successfully!'
lock = threading.Lock()
SUCCESS_MSG = "Uploaded Successfully!"


def validate_and_arrange_upload(file):
current_app.logger.info("Start uploading file: " + file.filename)
filename = secure_filename(file.filename)
file_extension = filename.rpartition('.')[2]
determine_upload_type(file, file_extension)
file_extension = filename.rpartition(".")[2]
with engine.begin() as conn:
determine_upload_type(file, file_extension, conn)


def determine_upload_type(file, file_extension):
df = None
def determine_upload_type(file, file_extension, conn):
# Yes, this method of discovering what kind of file we have by looking at
# the extension and columns is silly. We'd like to get more of our data from
# automatically pulling from vendor APIs directly, in which case we'd know
# what kind of data we had.
if file_extension == "csv":
df = pd.read_csv(file, dtype="string")

if file_extension == 'csv':
dfs = [pd.read_csv(io.BytesIO(file.stream.read()), encoding='iso-8859-1')]
file.close()
else:

match = re.search('donat', file.filename, re.I)

if match: # It's a SalesForce Donations file
validate_import_sfd(file)
if {"salesforcecontacts", "volgistics", "shelterluvpeople"}.issubset(df.columns):
ManualMatches.insert_from_df(df, conn)
return
elif {"Animal_ids", "Internal-ID"}.issubset(df.columns):
ShelterluvPeople.insert_from_df(df, conn)
return
else:
match = re.search('volunteer', file.filename, re.I)
if match: # It's a Volgistics file
validate_import_vs(file)
dfs = excel_to_dataframes(file) # Also need to run Volgistics through match processing
else:
dfs = excel_to_dataframes(file) # It's a non-Volgistics, non-Shelterluv XLS? file




found_sources = 0
for df in dfs:
for src_type in CSV_HEADERS:
if set(CSV_HEADERS[src_type]).issubset(df.columns):
with lock:
found_sources += 1
filename = secure_filename(file.filename)
now = time.localtime()
now_date = time.strftime("%Y-%m-%d--%H-%M-%S", now)
current_app.logger.info(" -File: " + filename + " Matches files type: " + src_type)
df.to_csv(os.path.join(RAW_DATA_PATH, src_type + '-' + now_date + '.csv'))
clean_current_folder(src_type)
df.to_csv(os.path.join(RAW_DATA_PATH, src_type + '-' + now_date + '.csv'))
current_app.logger.info(" -Uploaded successfully as : " + src_type + '-' + now_date + '.' + file_extension)
flash(src_type + " {0} ".format(SUCCESS_MSG), 'info')
if found_sources == 0:
current_app.logger.error("\n\n !!!!!!! No sources found in upload !!!! \n Uploaded file " + file.filename + " is probably from wrong report \n !!!!!!!!!!!")


def excel_to_dataframes(xls):
df = []
wb = load_workbook(xls)

if len(wb.sheetnames) > 1:
with NamedTemporaryFile() as tmp:
wb.save(tmp.name)
for sheetname in wb.sheetnames:
for item in DATASOURCE_MAPPING:
if 'sheetname' in DATASOURCE_MAPPING[item]:
if DATASOURCE_MAPPING[item]['sheetname'] == sheetname:
tmp.seek(0)
df.append(pd.read_excel(tmp.read(), sheetname))
else:
df.append(pd.read_excel(xls))

return df

if file_extension == "xlsx":
excel_file = pd.ExcelFile(file)
if {"Master", "Service"}.issubset(excel_file.sheet_names):
# Volgistics
validate_import_vs(file, conn)
Volgistics.insert_from_file(excel_file, conn)
return

def clean_current_folder(src_type):
if os.listdir(RAW_DATA_PATH):
for file_name in os.listdir(RAW_DATA_PATH):
file_path = os.path.join(RAW_DATA_PATH, file_name)
file_name_striped = file_path.split('-')[0].split('/')[-1]
df = pd.read_excel(excel_file)
if "Contact ID 18" in df.columns:
# Salesforce something-or-other
if "Amount" in df.columns:
# Salesforce donations
validate_import_sfd(file, conn)
return
else:
# Salesforce contacts
SalesForceContacts.insert_from_file_df(df, conn)
return

if file_name_striped == src_type:
current_app.logger.info('File to remove: ' + file_path)
os.remove(file_path)
current_app.logger.info(" -Removed file: " + file_name + " from Current files folder")
current_app.logger.error(f"Don't know how to process file {file.filename}")
4 changes: 3 additions & 1 deletion src/server/api/internal_api.py
@@ -1,4 +1,5 @@
from api.api import internal_api
from config import engine
from flask import jsonify, current_app
from datetime import datetime
from api.API_ingest import ingest_sources_from_api
@@ -24,7 +25,8 @@ def user_test2():
@internal_api.route("/api/ingestRawData", methods=["GET"])
def ingest_raw_data():
try:
ingest_sources_from_api.start()
with engine.begin() as conn:
ingest_sources_from_api.start(conn)
except Exception as e:
current_app.logger.exception(e)

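For reference, engine.begin() gives the whole ingest run a single transaction: it commits when the with-block exits cleanly and rolls back if an exception escapes, so a failed run does not leave partial writes. A minimal sketch of the pattern; the connection URL and the statement are placeholders:

from sqlalchemy import create_engine, text

engine = create_engine("postgresql://user:password@localhost/paws")  # placeholder URL

def ingest_all():
    with engine.begin() as conn:
        # Commits on normal exit; any exception raised inside the block
        # rolls back everything written on this connection.
        conn.execute(
            text("INSERT INTO manual_matches VALUES (:t1, :i1, :t2, :i2)"),
            {"t1": "salesforcecontacts", "i1": "1", "t2": "volgistics", "i2": "2"},
        )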