upload/match pipeline refactor #480
Merged: sposerina merged 7 commits into CodeForPhilly:master from carlos-dominguez:contacts-and-match-in-postgres on Jun 30, 2022.
Changes shown from all 7 commits:
- 104c564: upload/match pipeline refactor (carlos-dominguez)
- e830d20: alembic migration (carlos-dominguez)
- d8c8150: fix dedup_consecutive (carlos-dominguez)
- 9b44aa4: delimited names, lowercase emails (carlos-dominguez)
- 1a86dd4: typos (carlos-dominguez)
- 4748f3b: remove quotation marks in names (carlos-dominguez)
- 221d595: itsdangerous hack (carlos-dominguez)
src/server/alembic/versions/45a668fa6325_postgres_matching.py (106 additions, 0 deletions):

```python
"""postgres matching

Revision ID: 45a668fa6325
Revises: fc7325372396
Create Date: 2022-02-10 16:19:13.283250

"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = '45a668fa6325'
down_revision = 'fc7325372396'
branch_labels = None
depends_on = None


def upgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table('manual_matches',
    sa.Column('source_type_1', sa.String(), nullable=False),
    sa.Column('source_id_1', sa.String(), nullable=False),
    sa.Column('source_type_2', sa.String(), nullable=False),
    sa.Column('source_id_2', sa.String(), nullable=False),
    sa.PrimaryKeyConstraint('source_type_1', 'source_id_1', 'source_type_2', 'source_id_2')
    )
    op.create_table('salesforcecontacts',
    sa.Column('_id', sa.Integer(), nullable=False),
    sa.Column('contact_id', sa.String(), nullable=True),
    sa.Column('first_name', sa.String(), nullable=True),
    sa.Column('last_name', sa.String(), nullable=True),
    sa.Column('account_name', sa.String(), nullable=True),
    sa.Column('mailing_country', sa.String(), nullable=True),
    sa.Column('mailing_street', sa.String(), nullable=True),
    sa.Column('mailing_city', sa.String(), nullable=True),
    sa.Column('mailing_state_province', sa.String(), nullable=True),
    sa.Column('mailing_zip_postal_code', sa.String(), nullable=True),
    sa.Column('phone', sa.String(), nullable=True),
    sa.Column('mobile', sa.String(), nullable=True),
    sa.Column('email', sa.String(), nullable=True),
    sa.Column('json', postgresql.JSONB(astext_type=sa.Text()), nullable=True),
    sa.Column('created_date', sa.DateTime(), nullable=True),
    sa.PrimaryKeyConstraint('_id')
    )
    op.create_table('shelterluvpeople',
    sa.Column('_id', sa.Integer(), nullable=False),
    sa.Column('firstname', sa.String(), nullable=True),
    sa.Column('lastname', sa.String(), nullable=True),
    sa.Column('id', sa.String(), nullable=True),
    sa.Column('internal_id', sa.String(), nullable=True),
    sa.Column('associated', sa.String(), nullable=True),
    sa.Column('street', sa.String(), nullable=True),
    sa.Column('apartment', sa.String(), nullable=True),
    sa.Column('city', sa.String(), nullable=True),
    sa.Column('state', sa.String(), nullable=True),
    sa.Column('zip', sa.String(), nullable=True),
    sa.Column('email', sa.String(), nullable=True),
    sa.Column('phone', sa.String(), nullable=True),
    sa.Column('animal_ids', postgresql.JSONB(astext_type=sa.Text()), nullable=True),
    sa.Column('json', postgresql.JSONB(astext_type=sa.Text()), nullable=True),
    sa.Column('created_date', sa.DateTime(), nullable=True),
    sa.PrimaryKeyConstraint('_id')
    )
    op.create_table('volgistics',
    sa.Column('_id', sa.Integer(), nullable=False),
    sa.Column('number', sa.String(), nullable=True),
    sa.Column('last_name', sa.String(), nullable=True),
    sa.Column('first_name', sa.String(), nullable=True),
    sa.Column('middle_name', sa.String(), nullable=True),
    sa.Column('complete_address', sa.String(), nullable=True),
    sa.Column('street_1', sa.String(), nullable=True),
    sa.Column('street_2', sa.String(), nullable=True),
    sa.Column('street_3', sa.String(), nullable=True),
    sa.Column('city', sa.String(), nullable=True),
    sa.Column('state', sa.String(), nullable=True),
    sa.Column('zip', sa.String(), nullable=True),
    sa.Column('all_phone_numbers', sa.String(), nullable=True),
    sa.Column('home', sa.String(), nullable=True),
    sa.Column('work', sa.String(), nullable=True),
    sa.Column('cell', sa.String(), nullable=True),
    sa.Column('email', sa.String(), nullable=True),
    sa.Column('json', postgresql.JSONB(astext_type=sa.Text()), nullable=True),
    sa.Column('created_date', sa.DateTime(), nullable=True),
    sa.PrimaryKeyConstraint('_id')
    )
    op.create_index('idx_pdp_contacts_source_type_and_id', 'pdp_contacts', ['source_type', 'source_id'], unique=False)
    op.create_index(op.f('ix_pdp_contacts_mobile'), 'pdp_contacts', ['mobile'], unique=False)
    op.create_index(op.f('idx_pdp_contacts_lower_first_name'), 'pdp_contacts', [sa.text('lower(first_name)')], unique=False)
    op.create_index(op.f('idx_pdp_contacts_lower_last_name'), 'pdp_contacts', [sa.text('lower(last_name)')], unique=False)
    op.create_index(op.f('idx_pdp_contacts_lower_email'), 'pdp_contacts', [sa.text('lower(email)')], unique=False)
    # ### end Alembic commands ###


def downgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    # Note: these names must match the index names created in upgrade(),
    # which use the 'idx_' prefix for the lower(...) indexes.
    op.drop_index(op.f('idx_pdp_contacts_lower_email'), table_name='pdp_contacts')
    op.drop_index(op.f('idx_pdp_contacts_lower_last_name'), table_name='pdp_contacts')
    op.drop_index(op.f('idx_pdp_contacts_lower_first_name'), table_name='pdp_contacts')
    op.drop_index(op.f('ix_pdp_contacts_mobile'), table_name='pdp_contacts')
    op.drop_index('idx_pdp_contacts_source_type_and_id', table_name='pdp_contacts')
    op.drop_table('volgistics')
    op.drop_table('shelterluvpeople')
    op.drop_table('salesforcecontacts')
    op.drop_table('manual_matches')
    # ### end Alembic commands ###
```
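The three lower(...) entries are expression indexes, so they only help queries that also compare on lower(...). A minimal sketch of such a lookup (a hypothetical query, not part of this PR; the connection string is a placeholder):

```python
# Hypothetical sketch: a case-insensitive email lookup that can use the
# idx_pdp_contacts_lower_email expression index created by the migration.
from sqlalchemy import create_engine, text

engine = create_engine("postgresql://user:pass@localhost/paws")  # placeholder DSN

def contacts_matching_email(email):
    # Comparing on lower(email) rather than the raw column is what lets
    # the planner consider the expression index.
    query = text(
        "SELECT source_type, source_id"
        "  FROM pdp_contacts"
        " WHERE lower(email) = lower(:email)"
    )
    with engine.connect() as conn:
        return conn.execute(query, {"email": email}).fetchall()
```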
The ingest entry point in the API_ingest package now takes a database connection:

```diff
@@ -1,7 +1,7 @@
 from api.API_ingest import shelterluv_api_handler
 
-def start():
+def start(conn):
     print("Start Fetching raw data from different API sources")
     #Run each source to store the output in dropbox and in the container as a CSV
-    shelterluv_api_handler.store_shelterluv_people_all()
+    shelterluv_api_handler.store_shelterluv_people_all(conn)
     print("Finish Fetching raw data from different API sources")
```
The largest change rewrites the upload handler: instead of staging matched CSVs on disk under RAW_DATA_PATH, it detects the file type from its extension and columns and loads rows straight into Postgres through the model classes:
```diff
@@ -1,101 +1,55 @@
-import os
-import time
 import pandas as pd
-import threading
-import io
-import re
 
-from werkzeug.utils import secure_filename
-from flask import flash, current_app
-from datasource_manager import CSV_HEADERS
-from datasource_manager import DATASOURCE_MAPPING
-from openpyxl import load_workbook
-from tempfile import NamedTemporaryFile
+from config import engine
+from donations_importer import validate_import_sfd
+from flask import current_app
+from models import ManualMatches, SalesForceContacts, ShelterluvPeople, Volgistics
+from shifts_importer import validate_import_vs
+from werkzeug.utils import secure_filename
 
-from constants import RAW_DATA_PATH
 
-SUCCESS_MSG = 'Uploaded Successfully!'
-lock = threading.Lock()
+SUCCESS_MSG = "Uploaded Successfully!"
 
 
 def validate_and_arrange_upload(file):
     current_app.logger.info("Start uploading file: " + file.filename)
     filename = secure_filename(file.filename)
-    file_extension = filename.rpartition('.')[2]
-    determine_upload_type(file, file_extension)
+    file_extension = filename.rpartition(".")[2]
+    with engine.begin() as conn:
+        determine_upload_type(file, file_extension, conn)
 
 
-def determine_upload_type(file, file_extension):
-    df = None
+def determine_upload_type(file, file_extension, conn):
+    # Yes, this method of discovering what kind of file we have by looking at
+    # the extension and columns is silly. We'd like to get more of our data from
+    # automatically pulling from vendor APIs directly, in which case we'd know
+    # what kind of data we had.
+    if file_extension == "csv":
+        df = pd.read_csv(file, dtype="string")
 
-    if file_extension == 'csv':
-        dfs = [pd.read_csv(io.BytesIO(file.stream.read()), encoding='iso-8859-1')]
-        file.close()
-    else:
-
-        match = re.search('donat', file.filename, re.I)
-
-        if match:  # It's a SalesForce Donations file
-            validate_import_sfd(file)
+        if {"salesforcecontacts", "volgistics", "shelterluvpeople"}.issubset(df.columns):
+            ManualMatches.insert_from_df(df, conn)
             return
+        elif {"Animal_ids", "Internal-ID"}.issubset(df.columns):
+            ShelterluvPeople.insert_from_df(df, conn)
+            return
-        else:
-            match = re.search('volunteer', file.filename, re.I)
-            if match:  # It's a Volgistics file
-                validate_import_vs(file)
-                dfs = excel_to_dataframes(file)  # Also need to run Volgistics through match processing
-            else:
-                dfs = excel_to_dataframes(file)  # It's a non-Volgistics, non-Shelterluv XLS? file
 
-    found_sources = 0
-    for df in dfs:
-        for src_type in CSV_HEADERS:
-            if set(CSV_HEADERS[src_type]).issubset(df.columns):
-                with lock:
-                    found_sources += 1
-                    filename = secure_filename(file.filename)
-                    now = time.localtime()
-                    now_date = time.strftime("%Y-%m-%d--%H-%M-%S", now)
-                    current_app.logger.info(" -File: " + filename + " Matches files type: " + src_type)
-                    df.to_csv(os.path.join(RAW_DATA_PATH, src_type + '-' + now_date + '.csv'))
-                    clean_current_folder(src_type)
-                    df.to_csv(os.path.join(RAW_DATA_PATH, src_type + '-' + now_date + '.csv'))
-                    current_app.logger.info(" -Uploaded successfully as : " + src_type + '-' + now_date + '.' + file_extension)
-                    flash(src_type + " {0} ".format(SUCCESS_MSG), 'info')
-    if found_sources == 0:
-        current_app.logger.error("\n\n !!!!!!! No sources found in upload !!!! \n Uploaded file " + file.filename + " is probably from wrong report \n !!!!!!!!!!!")
-
-
-def excel_to_dataframes(xls):
-    df = []
-    wb = load_workbook(xls)
-
-    if len(wb.sheetnames) > 1:
-        with NamedTemporaryFile() as tmp:
-            wb.save(tmp.name)
-            for sheetname in wb.sheetnames:
-                for item in DATASOURCE_MAPPING:
-                    if 'sheetname' in DATASOURCE_MAPPING[item]:
-                        if DATASOURCE_MAPPING[item]['sheetname'] == sheetname:
-                            tmp.seek(0)
-                            df.append(pd.read_excel(tmp.read(), sheetname))
-    else:
-        df.append(pd.read_excel(xls))
-
-    return df
+    if file_extension == "xlsx":
+        excel_file = pd.ExcelFile(file)
+        if {"Master", "Service"}.issubset(excel_file.sheet_names):
+            # Volgistics
+            validate_import_vs(file, conn)
+            Volgistics.insert_from_file(excel_file, conn)
+            return
 
-
-def clean_current_folder(src_type):
-    if os.listdir(RAW_DATA_PATH):
-        for file_name in os.listdir(RAW_DATA_PATH):
-            file_path = os.path.join(RAW_DATA_PATH, file_name)
-            file_name_striped = file_path.split('-')[0].split('/')[-1]
+        df = pd.read_excel(excel_file)
+        if "Contact ID 18" in df.columns:
+            # Salesforce something-or-other
+            if "Amount" in df.columns:
+                # Salesforce donations
+                validate_import_sfd(file, conn)
+                return
+            else:
+                # Salesforce contacts
+                SalesForceContacts.insert_from_file_df(df, conn)
+                return
 
-            if file_name_striped == src_type:
-                current_app.logger.info('File to remove: ' + file_path)
-                os.remove(file_path)
-                current_app.logger.info(" -Removed file: " + file_name + " from Current files folder")
+    current_app.logger.error(f"Don't know how to process file {file.filename}")
```
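Because the dispatch is pure column sniffing, the CSV branch can be exercised without the web app. A minimal sketch (hypothetical, pandas only) of how the two CSV shapes are told apart:

```python
# Hypothetical sketch of the CSV sniffing above, runnable standalone.
import io
import pandas as pd

# A manual-matches CSV is recognized by its three source-name columns;
# a Shelterluv people export by "Animal_ids" and "Internal-ID".
sample = io.BytesIO(b"salesforcecontacts,volgistics,shelterluvpeople\n003xx,1234,SL-1\n")
df = pd.read_csv(sample, dtype="string")

if {"salesforcecontacts", "volgistics", "shelterluvpeople"}.issubset(df.columns):
    kind = "manual_matches"
elif {"Animal_ids", "Internal-ID"}.issubset(df.columns):
    kind = "shelterluvpeople"
else:
    kind = "unknown"

print(kind)  # -> manual_matches
```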
Review comment: The UI is a bit wonky with this change; the whole "new files" concept is now kind of meaningless.