diff --git a/src/client/src/pages/Admin.js b/src/client/src/pages/Admin.js
index 74e9084f..f3b8c51b 100755
--- a/src/client/src/pages/Admin.js
+++ b/src/client/src/pages/Admin.js
@@ -243,7 +243,7 @@ export default function Admin(props) {
                     Run New Analysis
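The migration below creates the raw-source tables (manual_matches, salesforcecontacts, shelterluvpeople, volgistics) and adds expression indexes on lower(first_name), lower(last_name), and lower(email) to pdp_contacts. Those indexes exist because the matching queries introduced later in this PR compare names and emails case-insensitively; here is a minimal sketch of the kind of lookup they serve, assuming a Postgres database and a placeholder connection string:

```python
# Minimal sketch, not project code: a case-insensitive lookup that the
# expression index idx_pdp_contacts_lower_email can serve with an index scan
# instead of a sequential scan over pdp_contacts.
from sqlalchemy import create_engine, text

engine = create_engine("postgresql://user:pass@localhost/paws")  # hypothetical DSN

with engine.connect() as conn:
    rows = conn.execute(
        text(
            "SELECT _id, first_name, last_name "
            "FROM pdp_contacts "
            "WHERE lower(email) = lower(:email) AND archived_date IS NULL"
        ),
        {"email": "Jane.Doe@example.com"},
    ).fetchall()
```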
diff --git a/src/server/alembic/versions/45a668fa6325_postgres_matching.py b/src/server/alembic/versions/45a668fa6325_postgres_matching.py new file mode 100644 index 00000000..86c5257a --- /dev/null +++ b/src/server/alembic/versions/45a668fa6325_postgres_matching.py @@ -0,0 +1,106 @@ +"""postgres matching + +Revision ID: 45a668fa6325 +Revises: fc7325372396 +Create Date: 2022-02-10 16:19:13.283250 + +""" +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision = '45a668fa6325' +down_revision = 'fc7325372396' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.create_table('manual_matches', + sa.Column('source_type_1', sa.String(), nullable=False), + sa.Column('source_id_1', sa.String(), nullable=False), + sa.Column('source_type_2', sa.String(), nullable=False), + sa.Column('source_id_2', sa.String(), nullable=False), + sa.PrimaryKeyConstraint('source_type_1', 'source_id_1', 'source_type_2', 'source_id_2') + ) + op.create_table('salesforcecontacts', + sa.Column('_id', sa.Integer(), nullable=False), + sa.Column('contact_id', sa.String(), nullable=True), + sa.Column('first_name', sa.String(), nullable=True), + sa.Column('last_name', sa.String(), nullable=True), + sa.Column('account_name', sa.String(), nullable=True), + sa.Column('mailing_country', sa.String(), nullable=True), + sa.Column('mailing_street', sa.String(), nullable=True), + sa.Column('mailing_city', sa.String(), nullable=True), + sa.Column('mailing_state_province', sa.String(), nullable=True), + sa.Column('mailing_zip_postal_code', sa.String(), nullable=True), + sa.Column('phone', sa.String(), nullable=True), + sa.Column('mobile', sa.String(), nullable=True), + sa.Column('email', sa.String(), nullable=True), + sa.Column('json', postgresql.JSONB(astext_type=sa.Text()), nullable=True), + sa.Column('created_date', sa.DateTime(), nullable=True), + sa.PrimaryKeyConstraint('_id') + ) + op.create_table('shelterluvpeople', + sa.Column('_id', sa.Integer(), nullable=False), + sa.Column('firstname', sa.String(), nullable=True), + sa.Column('lastname', sa.String(), nullable=True), + sa.Column('id', sa.String(), nullable=True), + sa.Column('internal_id', sa.String(), nullable=True), + sa.Column('associated', sa.String(), nullable=True), + sa.Column('street', sa.String(), nullable=True), + sa.Column('apartment', sa.String(), nullable=True), + sa.Column('city', sa.String(), nullable=True), + sa.Column('state', sa.String(), nullable=True), + sa.Column('zip', sa.String(), nullable=True), + sa.Column('email', sa.String(), nullable=True), + sa.Column('phone', sa.String(), nullable=True), + sa.Column('animal_ids', postgresql.JSONB(astext_type=sa.Text()), nullable=True), + sa.Column('json', postgresql.JSONB(astext_type=sa.Text()), nullable=True), + sa.Column('created_date', sa.DateTime(), nullable=True), + sa.PrimaryKeyConstraint('_id') + ) + op.create_table('volgistics', + sa.Column('_id', sa.Integer(), nullable=False), + sa.Column('number', sa.String(), nullable=True), + sa.Column('last_name', sa.String(), nullable=True), + sa.Column('first_name', sa.String(), nullable=True), + sa.Column('middle_name', sa.String(), nullable=True), + sa.Column('complete_address', sa.String(), nullable=True), + sa.Column('street_1', sa.String(), nullable=True), + sa.Column('street_2', sa.String(), nullable=True), + sa.Column('street_3', sa.String(), nullable=True), + sa.Column('city', sa.String(), 
nullable=True), + sa.Column('state', sa.String(), nullable=True), + sa.Column('zip', sa.String(), nullable=True), + sa.Column('all_phone_numbers', sa.String(), nullable=True), + sa.Column('home', sa.String(), nullable=True), + sa.Column('work', sa.String(), nullable=True), + sa.Column('cell', sa.String(), nullable=True), + sa.Column('email', sa.String(), nullable=True), + sa.Column('json', postgresql.JSONB(astext_type=sa.Text()), nullable=True), + sa.Column('created_date', sa.DateTime(), nullable=True), + sa.PrimaryKeyConstraint('_id') + ) + op.create_index('idx_pdp_contacts_source_type_and_id', 'pdp_contacts', ['source_type', 'source_id'], unique=False) + op.create_index(op.f('ix_pdp_contacts_mobile'), 'pdp_contacts', ['mobile'], unique=False) + op.create_index(op.f('idx_pdp_contacts_lower_first_name'), 'pdp_contacts', [sa.text('lower(first_name)')], unique=False) + op.create_index(op.f('idx_pdp_contacts_lower_last_name'), 'pdp_contacts', [sa.text('lower(last_name)')], unique=False) + op.create_index(op.f('idx_pdp_contacts_lower_email'), 'pdp_contacts', [sa.text('lower(email)')], unique=False) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.drop_index(op.f('ix_pdp_contacts_lower_email'), table_name='pdp_contacts') + op.drop_index(op.f('ix_pdp_contacts_lower_last_name'), table_name='pdp_contacts') + op.drop_index(op.f('ix_pdp_contacts_lower_first_name'), table_name='pdp_contacts') + op.drop_index(op.f('ix_pdp_contacts_mobile'), table_name='pdp_contacts') + op.drop_index('idx_pdp_contacts_source_type_and_id', table_name='pdp_contacts') + op.drop_table('volgistics') + op.drop_table('shelterluvpeople') + op.drop_table('salesforcecontacts') + op.drop_table('manual_matches') + # ### end Alembic commands ### diff --git a/src/server/api/API_ingest/ingest_sources_from_api.py b/src/server/api/API_ingest/ingest_sources_from_api.py index 3c1d8538..25e92f8c 100644 --- a/src/server/api/API_ingest/ingest_sources_from_api.py +++ b/src/server/api/API_ingest/ingest_sources_from_api.py @@ -1,7 +1,7 @@ from api.API_ingest import shelterluv_api_handler -def start(): +def start(conn): print("Start Fetching raw data from different API sources") #Run each source to store the output in dropbox and in the container as a CSV - shelterluv_api_handler.store_shelterluv_people_all() + shelterluv_api_handler.store_shelterluv_people_all(conn) print("Finish Fetching raw data from different API sources") \ No newline at end of file diff --git a/src/server/api/API_ingest/shelterluv_api_handler.py b/src/server/api/API_ingest/shelterluv_api_handler.py index 1b22dcab..e757d10a 100644 --- a/src/server/api/API_ingest/shelterluv_api_handler.py +++ b/src/server/api/API_ingest/shelterluv_api_handler.py @@ -1,10 +1,12 @@ -import os -import requests import csv +import os import time -from constants import RAW_DATA_PATH +import requests +import pandas as pd from api.API_ingest.dropbox_handler import upload_file_to_dropbox +from constants import RAW_DATA_PATH +from models import ShelterluvPeople try: from secrets_dict import SHELTERLUV_SECRET_TOKEN @@ -60,7 +62,7 @@ def write_csv(json_data): ''' Iterate over all shelterlove people and store in json file in the raw data folder We fetch 100 items in each request, since that is the limit based on our research ''' -def store_shelterluv_people_all(): +def store_shelterluv_people_all(conn): offset = 0 LIMIT = 100 has_more = True @@ -90,8 +92,9 @@ def store_shelterluv_people_all(): file_path = 
write_csv(shelterluv_people) print("Finish storing latest shelterluvpeople results to container") - print("Start storing " + '/shelterluv/' + "results to dropbox") upload_file_to_dropbox(file_path, '/shelterluv/' + file_path.split('/')[-1]) print("Finish storing " + '/shelterluv/' + "results to dropbox") + print("Uploading shelterluvpeople csv to database") + ShelterluvPeople.insert_from_df(pd.read_csv(file_path, dtype="string"), conn) diff --git a/src/server/api/file_uploader.py b/src/server/api/file_uploader.py index d84adb9f..256ecbaf 100644 --- a/src/server/api/file_uploader.py +++ b/src/server/api/file_uploader.py @@ -1,101 +1,55 @@ -import os -import time import pandas as pd -import threading -import io -import re - -from werkzeug.utils import secure_filename -from flask import flash, current_app -from datasource_manager import CSV_HEADERS -from datasource_manager import DATASOURCE_MAPPING -from openpyxl import load_workbook -from tempfile import NamedTemporaryFile +from config import engine from donations_importer import validate_import_sfd +from flask import current_app +from models import ManualMatches, SalesForceContacts, ShelterluvPeople, Volgistics from shifts_importer import validate_import_vs +from werkzeug.utils import secure_filename -from constants import RAW_DATA_PATH - -SUCCESS_MSG = 'Uploaded Successfully!' -lock = threading.Lock() +SUCCESS_MSG = "Uploaded Successfully!" def validate_and_arrange_upload(file): current_app.logger.info("Start uploading file: " + file.filename) filename = secure_filename(file.filename) - file_extension = filename.rpartition('.')[2] - determine_upload_type(file, file_extension) + file_extension = filename.rpartition(".")[2] + with engine.begin() as conn: + determine_upload_type(file, file_extension, conn) -def determine_upload_type(file, file_extension): - df = None +def determine_upload_type(file, file_extension, conn): + # Yes, this method of discovering what kind of file we have by looking at + # the extension and columns is silly. We'd like to get more of our data from + # automatically pulling from vendor APIs directly, in which case we'd know + # what kind of data we had. + if file_extension == "csv": + df = pd.read_csv(file, dtype="string") - if file_extension == 'csv': - dfs = [pd.read_csv(io.BytesIO(file.stream.read()), encoding='iso-8859-1')] - file.close() - else: - - match = re.search('donat', file.filename, re.I) - - if match: # It's a SalesForce Donations file - validate_import_sfd(file) + if {"salesforcecontacts", "volgistics", "shelterluvpeople"}.issubset(df.columns): + ManualMatches.insert_from_df(df, conn) + return + elif {"Animal_ids", "Internal-ID"}.issubset(df.columns): + ShelterluvPeople.insert_from_df(df, conn) return - else: - match = re.search('volunteer', file.filename, re.I) - if match: # It's a Volgistics file - validate_import_vs(file) - dfs = excel_to_dataframes(file) # Also need to run Volgistics through match processing - else: - dfs = excel_to_dataframes(file) # It's a non-Volgistics, non-Shelterluv XLS? 
file - - - - - found_sources = 0 - for df in dfs: - for src_type in CSV_HEADERS: - if set(CSV_HEADERS[src_type]).issubset(df.columns): - with lock: - found_sources += 1 - filename = secure_filename(file.filename) - now = time.localtime() - now_date = time.strftime("%Y-%m-%d--%H-%M-%S", now) - current_app.logger.info(" -File: " + filename + " Matches files type: " + src_type) - df.to_csv(os.path.join(RAW_DATA_PATH, src_type + '-' + now_date + '.csv')) - clean_current_folder(src_type) - df.to_csv(os.path.join(RAW_DATA_PATH, src_type + '-' + now_date + '.csv')) - current_app.logger.info(" -Uploaded successfully as : " + src_type + '-' + now_date + '.' + file_extension) - flash(src_type + " {0} ".format(SUCCESS_MSG), 'info') - if found_sources == 0: - current_app.logger.error("\n\n !!!!!!! No sources found in upload !!!! \n Uploaded file " + file.filename + " is probably from wrong report \n !!!!!!!!!!!") - - -def excel_to_dataframes(xls): - df = [] - wb = load_workbook(xls) - - if len(wb.sheetnames) > 1: - with NamedTemporaryFile() as tmp: - wb.save(tmp.name) - for sheetname in wb.sheetnames: - for item in DATASOURCE_MAPPING: - if 'sheetname' in DATASOURCE_MAPPING[item]: - if DATASOURCE_MAPPING[item]['sheetname'] == sheetname: - tmp.seek(0) - df.append(pd.read_excel(tmp.read(), sheetname)) - else: - df.append(pd.read_excel(xls)) - - return df + if file_extension == "xlsx": + excel_file = pd.ExcelFile(file) + if {"Master", "Service"}.issubset(excel_file.sheet_names): + # Volgistics + validate_import_vs(file, conn) + Volgistics.insert_from_file(excel_file, conn) + return -def clean_current_folder(src_type): - if os.listdir(RAW_DATA_PATH): - for file_name in os.listdir(RAW_DATA_PATH): - file_path = os.path.join(RAW_DATA_PATH, file_name) - file_name_striped = file_path.split('-')[0].split('/')[-1] + df = pd.read_excel(excel_file) + if "Contact ID 18" in df.columns: + # Salesforce something-or-other + if "Amount" in df.columns: + # Salesforce donations + validate_import_sfd(file, conn) + return + else: + # Salesforce contacts + SalesForceContacts.insert_from_file_df(df, conn) + return - if file_name_striped == src_type: - current_app.logger.info('File to remove: ' + file_path) - os.remove(file_path) - current_app.logger.info(" -Removed file: " + file_name + " from Current files folder") + current_app.logger.error(f"Don't know how to process file {file.filename}") \ No newline at end of file diff --git a/src/server/api/internal_api.py b/src/server/api/internal_api.py index 324c8b53..81e1ee89 100644 --- a/src/server/api/internal_api.py +++ b/src/server/api/internal_api.py @@ -1,4 +1,5 @@ from api.api import internal_api +from config import engine from flask import jsonify, current_app from datetime import datetime from api.API_ingest import ingest_sources_from_api @@ -24,7 +25,8 @@ def user_test2(): @internal_api.route("/api/ingestRawData", methods=["GET"]) def ingest_raw_data(): try: - ingest_sources_from_api.start() + with engine.begin() as conn: + ingest_sources_from_api.start(conn) except Exception as e: current_app.logger.exception(e) diff --git a/src/server/datasource_manager.py b/src/server/datasource_manager.py deleted file mode 100644 index bb243a13..00000000 --- a/src/server/datasource_manager.py +++ /dev/null @@ -1,166 +0,0 @@ -import re - - -def __clean_csv_headers(header): - header = re.sub(r'\s\(.*\)', '', header) - header = re.sub(r'\.+', '_', header.lower().strip().replace(' ', '_').replace('/', '_')) - return header.replace('#', 'num') - - -CSV_HEADERS = { - 'shelterluvpeople': 
['Firstname', 'Lastname', 'ID', 'Internal-ID', 'PreviousIds', 'Associated', 'Street', - 'Apartment', 'City', 'State', 'Zip', 'Email', 'Phone', 'Animal_ids'], - 'volgistics': ['Last name', 'First name', 'Middle name', 'Number', 'Complete address', 'Street 1', 'Street 2', - 'Street 3', 'City', 'State', 'Zip', 'All phone numbers', 'Home', 'Work', 'Cell', 'Email'], - 'salesforcecontacts': ['Account Name', 'Contact ID 18', 'First Name', 'Last Name', 'Mailing Street', 'Mailing City', - 'Mailing State/Province', 'Mailing Zip/Postal Code', 'Mailing Country', 'Phone', 'Mobile', - 'Email', 'Account ID 18', 'Volgistics ID', 'Person ID'], - 'volgisticsshifts': ['Number', 'Place', 'Assignment', 'From date', 'To date', 'Hours'], - 'salesforcedonations': ['Recurring donor', 'Opportunity Owner', 'Account Name', 'Opportunity ID (18 Digit)', - 'Account ID (18 digit)', - 'Opportunity Name', 'Stage', 'Fiscal Period', 'Amount', 'Probability (%)', 'Age', - 'Close Date', 'Created Date', 'Type', 'Primary Campaign Source', - 'Source', 'Contact ID (18 Digit)', 'Primary Contact'], - 'manualmatches': ['salesforcecontacts', 'volgistics', 'shelterluvpeople'] -} - - # TODO: Now that volgisticsshifts and salesforcedonations are loaded directly, what should their records above and below reflect - # to be processed by clean_and_load_data (L34) ? - - - -DATASOURCE_MAPPING = { - 'salesforcecontacts': { - 'id': 'contact_id_18', - 'csv_names': CSV_HEADERS['salesforcecontacts'], - 'tracked_columns': list(map(__clean_csv_headers, CSV_HEADERS['salesforcecontacts'])), - 'table_email': 'email', - '_table_name': ['first_name', 'last_name'], - 'should_drop_first_column': True - }, - 'volgistics': { - 'id': 'number', - 'csv_names': CSV_HEADERS['volgistics'], - 'tracked_columns': list(map(__clean_csv_headers, CSV_HEADERS['volgistics'])), - 'table_email': 'Email'.lower(), - '_table_name': ['first_name', 'last_name'], - 'sheetname': 'Master', - 'should_drop_first_column': True - }, - 'shelterluvpeople': { - 'id': 'id', - 'csv_names': CSV_HEADERS['shelterluvpeople'], - 'tracked_columns': list(map(__clean_csv_headers, CSV_HEADERS['shelterluvpeople'])), - 'table_email': 'Email'.lower(), - '_table_name': ['Firstname'.lower(), 'Lastname'.lower()], - 'should_drop_first_column': False - }, - 'volgisticsshifts': { - 'id': 'volg_id', - 'csv_names': CSV_HEADERS['volgisticsshifts'], - 'tracked_columns': list(map(__clean_csv_headers, CSV_HEADERS['volgisticsshifts'])), - 'table_email': None, - '_table_name': None, - 'sheetname': 'Service', - 'should_drop_first_column': False - }, - 'salesforcedonations': { - 'id': 'contact_id', - 'csv_names': CSV_HEADERS['salesforcedonations'], - 'tracked_columns': list(map(__clean_csv_headers, CSV_HEADERS['salesforcedonations'])), - 'table_email': None, - '_table_name': None, - 'should_drop_first_column': True - } -} - - -def volgistics_address(street, index): - result = "" - - if isinstance(street, str): - if " " in street: - if index == 1: - result = " ".join(street.split()[1:]) - else: - result = street.split()[index] - - return result - - -def normalize_phone_number(number): - result = None - - if number and str(number) != 'nan': - number = re.sub('[() -.+]', '', str(number)) - - if number and number[0] == '1': - number = number[1:] - - if number.isdigit() and len(number) == 10: - result = number - - return result - - -SOURCE_NORMALIZATION_MAPPING = { - "salesforcecontacts": { - "source_id": "contact_id_18", - "first_name": "first_name", - "last_name": "last_name", - "email": "email", - "mobile": lambda df: 
df["mobile"].combine_first(df["phone"]).apply(normalize_phone_number), - "street_and_number": "mailing_street", - "apartment": "mailing_street", - "city": "mailing_city", - "state": "mailing_state_province", - "zip": "mailing_zip_postal_code", - "account_name": "account_name", - "others": { - "should_drop_first_column": True - } - - }, - "salesforcedonations": { - "parent": "salesforcecontacts", - "others": { - "should_drop_first_column": True - } - }, - "shelterluvpeople": { - "source_id": "internal-id", - "first_name": "firstname", - "last_name": "lastname", - "email": "email", - "mobile": lambda df: df["phone"].apply(normalize_phone_number), - "street_and_number": "street", - "apartment": "apartment", - "city": "city", - "state": "state", - "zip": "zip", - "others": { - "should_drop_first_column": False - } - }, - "volgistics": { - "source_id": "number", - "first_name": "first_name", - "last_name": "last_name", - "email": "email", - "mobile": lambda df: df["cell"].combine_first(df["home"]).apply(normalize_phone_number), - "street_and_number": lambda df: df["street_1"].apply(volgistics_address, index=1), - "apartment": lambda df: df["street_1"].apply(volgistics_address, index=0), - "city": "city", - "state": "state", - "zip": "zip", - "others": { - "should_drop_first_column": True - } - }, - "volgisticsshifts": { - "parent": "volgistics", - "others": { - "should_drop_first_column": True - } - } -} diff --git a/src/server/donations_importer.py b/src/server/donations_importer.py index 25c464e3..41d6e80f 100644 --- a/src/server/donations_importer.py +++ b/src/server/donations_importer.py @@ -6,7 +6,6 @@ from config import engine -from sqlalchemy.orm import Session, sessionmaker from sqlalchemy import insert, Table, Column, MetaData, exc from sqlalchemy.dialects.postgresql import Insert metadata = MetaData() @@ -35,7 +34,7 @@ 'Source': None } -def validate_import_sfd(filename): +def validate_import_sfd(filename, conn): """ Validate that the XLSX column names int the file are close enough to expectations that we can trust the data. If so, insert the data into the salseforcedonations table. """ @@ -70,8 +69,6 @@ def validate_import_sfd(filename): if min_similarity >= MINIMUM_SIMILARITY : # Good enough to trust - Session = sessionmaker(engine) - session = Session() sfd = Table("salesforcedonations", metadata, autoload=True, autoload_with=engine) seen_header = False # Used to skip header row @@ -118,7 +115,7 @@ def validate_import_sfd(filename): constraint='uq_donation' ) try: - result = session.execute(skip_dupes) + result = conn.execute(skip_dupes) except exc.IntegrityError as e: # Catch-all for several more specific exceptions if re.match('duplicate key value', str(e.orig) ): dupes += 1 @@ -137,12 +134,12 @@ def validate_import_sfd(filename): else: # Haven't seen header, so this was first row. seen_header = True - session.commit() # Commit all inserted rows + # NOTE: we now run this in a engine.begin() context manager, so our + # parent will commit. Don't commit here! 
current_app.logger.info("--------------------------------- Stats -------------------------------------------------") current_app.logger.info("Total rows: " + str(row_count) + " Dupes: " + str(dupes) + " Missing contact_id: " + str(missing_contact_id) ) current_app.logger.info("Other integrity exceptions: " + str(other_integrity) + " Other exceptions: " + str(other_exceptions) ) - session.close() wb.close() return { True : "File imported" } diff --git a/src/server/models.py b/src/server/models.py index da63d989..8bffa419 100644 --- a/src/server/models.py +++ b/src/server/models.py @@ -1,20 +1,127 @@ import datetime -import copy +import re +from itertools import combinations -from sqlalchemy import Column, Integer, String, DateTime -from sqlalchemy.dialects.postgresql import JSONB +import pandas as pd +import sqlalchemy as sa +from sqlalchemy import ( + Boolean, + DateTime, + Index, + Integer, + String, + delete, + desc, + func, + literal_column, + select, + text, + tuple_, +) +from sqlalchemy.dialects.postgresql import JSONB, insert from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.sql.functions import coalesce Base = declarative_base() +def Column(*colargs, source_column=None, contacts_column=None, **kwargs): + # Currently many of our database tables are populated by loading some csv or + # excel table and inserting those columns with little more than a column + # name change. Many of those databse columns will also later be copied into + # pdp_contacts with little processing. This drop-in replacement for + # sqlalchemy's Column function lets us easily provide this extra information + # for some automated processing. + + info = kwargs.get("info", {}) + if source_column: + info["source_column"] = source_column + if contacts_column: + info["contacts_column"] = contacts_column + + return sa.Column(*colargs, info=info, **kwargs) + + +def get_source_column_translation(cls): + # Produce a mapping of source column to database column for a class that + # uses the Column helper + + return { + col.info["source_column"]: col.name + for col in cls.__table__.columns + if "source_column" in col.info + } + + +def get_contacts_mapping(cls): + # Produce an association of pdp_contacts columns with some other table's + # columns for use in an INSERT ... FROM SELECT + + return [ + (PdpContacts.matching_id, 0), + (PdpContacts.source_type, literal_column(f"'{cls.__tablename__}'")), + ] + [ + (col.info["contacts_column"], col) + for col in cls.__table__.columns + if "contacts_column" in col.info + ] + + +def dedup_consecutive(table, unique_id, id, order_by, dedup_on): + # Many of our raw data tables have a similar structure: a contact id column, + # an insert time column, and several other pieces of raw data. If someone + # inserts a "new" record for a certain id, but none of the raw data is + # different from the previous record, we'd like to get rid of it. + # + # This bit of SQL magic partitions a table by a given id column, orders it + # by some order_by column, and removes duplicate consecutive entries based + # on some dedup_on expression. + # + # Note the use of "IS NOT DISTINCT FROM" instead of "!="; the latter does + # not work well on null values. 
+ + sq = select( + unique_id, + id, + order_by, + dedup_on.bool_op("IS NOT DISTINCT FROM")( + func.lag(dedup_on).over(partition_by=id, order_by=order_by) + ).label("is_dupe"), + ).subquery() + + to_delete = select(sq.c[0]).where(sq.c[3]).subquery() + return delete(table).where(unique_id == to_delete.c[0]) + + +def normalize_phone_number(number): + result = None + + if number and str(number) != "nan": + number = re.sub("[() -.+]", "", str(number)) + + if number and number[0] == "1": + number = number[1:] + + if number.isdigit() and len(number) == 10: + result = number + + return result + + class PdpContacts(Base): __tablename__ = "pdp_contacts" + __table_args__ = ( + Index("idx_pdp_contacts_lower_first_name", text("lower(first_name)")), + Index("idx_pdp_contacts_lower_last_name", text("lower(last_name)")), + Index("idx_pdp_contacts_lower_email", text("lower(email)")), + Index("idx_pdp_contacts_source_type_and_id", "source_type", "source_id"), + ) _id = Column(Integer, primary_key=True, autoincrement=True) matching_id = Column(Integer) source_type = Column(String) source_id = Column(String) + is_organization = Column(Boolean, default=False) first_name = Column(String, default=None) last_name = Column(String, default=None) email = Column(String, default=None) @@ -29,67 +136,246 @@ class PdpContacts(Base): archived_date = Column(DateTime, default=None) -TempPdpContactsLoader = copy.deepcopy(PdpContacts) -TempPdpContactsLoader.__tablename__ = "_temp_pdp_contacts_loader" - +P = PdpContacts class SalesForceContacts(Base): __tablename__ = "salesforcecontacts" _id = Column(Integer, primary_key=True) - contact_id = Column(String) - first_name = Column(String) - last_name = Column(String) - mailing_country = Column(String) - mailing_street = Column(String) - mailing_city = Column(String) - mailing_state_province = Column(String) - mailing_zip_postal_code = Column(String) - phone = Column(String) - mobile = Column(String) - email = Column(String) + contact_id = Column( + String, source_column="Contact ID 18", contacts_column=P.source_id + ) + first_name = Column( + String, source_column="First Name", contacts_column=P.first_name + ) + last_name = Column(String, source_column="Last Name", contacts_column=P.last_name) + account_name = Column(String, source_column="Account Name") + mailing_country = Column(String, source_column="Mailing Country") + mailing_street = Column( + String, source_column="Mailing Street", contacts_column=P.street_and_number + ) + mailing_city = Column(String, source_column="Mailing City", contacts_column=P.city) + mailing_state_province = Column( + String, source_column="Mailing State/Province", contacts_column=P.state + ) + mailing_zip_postal_code = Column( + String, source_column="Mailing Zip/Postal Code", contacts_column=P.zip + ) + phone = Column(String, source_column="Phone") + mobile = Column(String, source_column="Mobile") + email = Column(String, source_column="Email", contacts_column=P.email) json = Column(JSONB) + created_date = Column(DateTime, default=datetime.datetime.utcnow) + + @classmethod + def insert_from_file_df(cls, df, conn): + column_translation = get_source_column_translation(cls) + df = df[column_translation.keys()] + df = df.rename(columns=column_translation) + + df["phone"] = df["phone"].apply(normalize_phone_number) + df["mobile"] = df["mobile"].apply(normalize_phone_number) + + dedup_on = [col for col in cls.__table__.columns if col.name in df.columns] + df["created_date"] = datetime.datetime.utcnow() + df.to_sql(cls.__tablename__, conn, 
if_exists="append", index=False) + conn.execute( + dedup_consecutive( + cls.__table__, + unique_id=cls._id, + id=cls.contact_id, + order_by=cls.created_date, + dedup_on=tuple_(*dedup_on), + ) + ) + + @classmethod + def insert_into_pdp_contacts(cls): + column_mapping = get_contacts_mapping(cls) + [ + # Note: current version of SQLalchemy doesn't like seeing the same + # column object twice in insert().from_select, hence this + # literal_column. I think this is fixed in a later version? + (P.apartment, literal_column("mailing_street")), + (P.mobile, coalesce(cls.mobile, cls.phone)), + (P.is_organization, cls.account_name.not_like("% Household")), + ] + contacts_columns, this_columns = zip(*column_mapping) + + return insert(PdpContacts).from_select( + list(contacts_columns), + select(*this_columns) + .distinct(cls.contact_id) + .order_by(cls.contact_id, desc(cls.created_date)), + ) class ShelterluvPeople(Base): __tablename__ = "shelterluvpeople" _id = Column(Integer, primary_key=True) - firstname = Column(String) - lastname = Column(String) - id = Column(String) - associated = Column(String) - street = Column(String) - apartment = Column(String) - city = Column(String) - state = Column(String) - zip = Column(String) - email = Column(String) - phone = Column(String) - animal_ids = Column(JSONB) + firstname = Column(String, source_column="Firstname", contacts_column=P.first_name) + lastname = Column(String, source_column="Lastname", contacts_column=P.last_name) + id = Column(String, source_column="ID") + internal_id = Column( + String, source_column="Internal-ID", contacts_column=P.source_id + ) + associated = Column(String, source_column="Associated") + street = Column(String, source_column="Street", contacts_column=P.street_and_number) + apartment = Column(String, source_column="Apartment", contacts_column=P.apartment) + city = Column(String, source_column="City", contacts_column=P.city) + state = Column(String, source_column="State", contacts_column=P.state) + zip = Column(String, source_column="Zip", contacts_column=P.zip) + email = Column(String, source_column="Email", contacts_column=P.email) + phone = Column(String, source_column="Phone", contacts_column=P.mobile) + animal_ids = Column(JSONB, source_column="Animal_ids") json = Column(JSONB) + created_date = Column(DateTime, default=datetime.datetime.utcnow) + + @classmethod + def insert_from_df(cls, df, conn): + column_translation = get_source_column_translation(cls) + df = df[column_translation.keys()] + df = df.rename(columns=column_translation) + + df["phone"] = df["phone"].apply(normalize_phone_number) + + dedup_on = [col for col in cls.__table__.columns if col.name in df.columns] + df["created_date"] = datetime.datetime.utcnow() + df.to_sql( + cls.__tablename__, + conn, + dtype={"animal_ids": JSONB}, + if_exists="append", + index=False, + ) + conn.execute( + dedup_consecutive( + cls.__table__, + unique_id=cls._id, + id=cls.internal_id, + order_by=cls.created_date, + dedup_on=tuple_(*dedup_on), + ) + ) + + @classmethod + def insert_into_pdp_contacts(cls): + column_mapping = get_contacts_mapping(cls) + contacts_columns, this_columns = zip(*column_mapping) + return insert(PdpContacts).from_select( + list(contacts_columns), + select(*this_columns) + .distinct(cls.internal_id) + .order_by(cls.internal_id, desc(cls.created_date)), + ) class Volgistics(Base): __tablename__ = "volgistics" _id = Column(Integer, primary_key=True) - number = Column(String) - last_name = Column(String) - first_name = Column(String) - middle_name = 
Column(String) - complete_address = Column(String) - street_1 = Column(String) - street_2 = Column(String) - street_3 = Column(String) - city = Column(String) - state = Column(String) - zip = Column(String) - all_phone_numbers = Column(String) - home = Column(String) - work = Column(String) - cell = Column(String) - email = Column(String) + number = Column(String, source_column="Number", contacts_column=P.source_id) + last_name = Column(String, source_column="Last name", contacts_column=P.last_name) + first_name = Column( + String, source_column="First name", contacts_column=P.first_name + ) + middle_name = Column(String, source_column="Middle name") + complete_address = Column(String, source_column="Complete address") + street_1 = Column(String, source_column="Street 1") + street_2 = Column(String, source_column="Street 2") + street_3 = Column(String, source_column="Street 3") + city = Column(String, source_column="City", contacts_column=P.city) + state = Column(String, source_column="State", contacts_column=P.state) + zip = Column(String, source_column="Zip", contacts_column=P.zip) + all_phone_numbers = Column(String, source_column="All phone numbers") + home = Column(String, source_column="Home") + work = Column(String, source_column="Work") + cell = Column(String, source_column="Cell") + email = Column(String, source_column="Email", contacts_column=P.email) json = Column(JSONB) + created_date = Column(DateTime, default=datetime.datetime.utcnow) + + @classmethod + def insert_from_file(cls, xl_file, conn): + df = pd.read_excel(xl_file, sheet_name="Master") + + column_translation = get_source_column_translation(cls) + df = df[column_translation.keys()] + df = df.rename(columns=column_translation) + + df["home"] = df["home"].apply(normalize_phone_number) + df["work"] = df["work"].apply(normalize_phone_number) + df["cell"] = df["cell"].apply(normalize_phone_number) + + dedup_on = [col for col in cls.__table__.columns if col.name in df.columns] + df["created_date"] = datetime.datetime.utcnow() + df.to_sql( + cls.__tablename__, + conn, + if_exists="append", + index=False, + ) + conn.execute( + dedup_consecutive( + cls.__table__, + unique_id=cls._id, + id=cls.number, + order_by=cls.created_date, + dedup_on=tuple_(*dedup_on), + ) + ) + + @classmethod + def insert_into_pdp_contacts(cls): + column_mapping = get_contacts_mapping(cls) + [ + # NOTE: This logic seems wrong. It peels off the street number and + # calls it the "apartment," and calls the rest of the address the + # "street and number." + ( + P.street_and_number, + literal_column("regexp_replace(street_1, '^[^ ]* ?', '')"), + ), + (P.apartment, literal_column("(regexp_match(street_1, '^([^ ]*) '))[1]")), + (P.mobile, coalesce(cls.cell, cls.home)), + ] + contacts_columns, this_columns = zip(*column_mapping) + return insert(PdpContacts).from_select( + list(contacts_columns), + select(*this_columns) + .distinct(cls.number) + .order_by(cls.number, desc(cls.created_date)), + ) + + +class ManualMatches(Base): + __tablename__ = "manual_matches" + + source_type_1 = Column(String, primary_key=True) + source_id_1 = Column(String, primary_key=True) + source_type_2 = Column(String, primary_key=True) + source_id_2 = Column(String, primary_key=True) + + @classmethod + def insert_from_df(cls, df, conn): + # Our input csv has columns like "salesforcecontacts," "volgistics," and + # "shelterluvpeople," where two columns are non-null if there is an + # association between those two ids. We massage this table into one that + # is easier to join on. 
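The massaging described above (and implemented in the hunk below) expands each row's non-null ids into every pairwise combination before inserting into manual_matches. A small self-contained sketch of that transformation, using hypothetical ids:

```python
# Worked sketch of the pairwise expansion of a manual-matches csv row.
from itertools import combinations

import pandas as pd

df = pd.DataFrame([
    {"salesforcecontacts": "0031N-ABC", "volgistics": "12345", "shelterluvpeople": "987"},
    {"salesforcecontacts": "0031N-XYZ", "volgistics": None, "shelterluvpeople": "654"},
])

pairs = []
for record in df.to_dict(orient="records"):
    non_nulls = {k: v for k, v in record.items() if not pd.isna(v)}
    for (st1, sid1), (st2, sid2) in combinations(non_nulls.items(), 2):
        pairs.append({
            "source_type_1": st1, "source_id_1": sid1,
            "source_type_2": st2, "source_id_2": sid2,
        })

# The first row (all three ids present) yields three pairs; the second row
# yields only the salesforcecontacts/shelterluvpeople pair.
```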
+ + match_dicts = df.to_dict(orient="records") + + matched_pairs = [] + for match in match_dicts: + non_nulls = {k: v for (k, v) in match.items() if not pd.isna(v)} + for ((st1, sid1), (st2, sid2)) in combinations(non_nulls.items(), 2): + matched_pairs.append( + { + "source_type_1": st1, + "source_id_1": sid1, + "source_type_2": st2, + "source_id_2": sid2, + } + ) + conn.execute(insert(cls).values(matched_pairs).on_conflict_do_nothing()) diff --git a/src/server/pipeline/archive_rows.py b/src/server/pipeline/archive_rows.py deleted file mode 100644 index 6910b763..00000000 --- a/src/server/pipeline/archive_rows.py +++ /dev/null @@ -1,11 +0,0 @@ - -def archive(pdp_db, updated_rows): - for index, row in updated_rows.iterrows(): - source_type = row["source_type"] - source_id = row["source_id"] - - mark_deleted = '''update pdp_contacts set archived_date = now() \ - where source_type like '{}' and \ - source_id like '{}' and archived_date is null'''.format(source_type, source_id) - - pdp_db.execute(mark_deleted) diff --git a/src/server/pipeline/calssify_new_data.py b/src/server/pipeline/calssify_new_data.py deleted file mode 100644 index 4e2f7b30..00000000 --- a/src/server/pipeline/calssify_new_data.py +++ /dev/null @@ -1,62 +0,0 @@ -import pandas as pd -from flask import current_app - - -def venn_diagram_join(df1, df2): - # Calculates the join between two dataframes like a Venn diagram - # - # Join criteria is all columns in common between them. - # Returns which rows are rows are only present in the left, which overlap, - # and which are only on the right table. - # - # An evolution of earlier work from match_data.join_on_all_columns - venn_indicator = '_merge' # temporary column name for designating the match type - join_results = df1.merge(df2, how='outer', indicator=venn_indicator) - return ( - join_results[join_results[venn_indicator] == 'left_only'].drop(columns=venn_indicator), - join_results[join_results[venn_indicator] == 'both'].drop(columns=venn_indicator), - join_results[join_results[venn_indicator] == 'right_only'].drop(columns=venn_indicator) - ) - - -def filter_rows_by_ids(df, ids): - assert "source_id" in ids and "source_type" in ids and len(ids.columns) == 2 - return df.merge(ids, how="inner") # rows in df with the expected id - - -def start(pdp_contacts_df, normalized_data): - current_app.logger.info("Starting classification of rows") - current_app.logger.info(" - {} rows in incoming data and {} in existing pdp_contacts".format( - normalized_data.shape[0], pdp_contacts_df.shape[0] - )) - result = { - "new": pd.DataFrame(columns=pdp_contacts_df.columns), - "updated": pd.DataFrame(columns=pdp_contacts_df.columns), - "old": pd.DataFrame(columns=pdp_contacts_df.columns) - } - - incoming_ids = normalized_data[["source_id", "source_type"]].drop_duplicates() - existing_ids = pdp_contacts_df[["source_id", "source_type"]].drop_duplicates() - #probably need a smarter method of dropping duplicates, e.g. 
row with least amount of null values - normalized_data = normalized_data.drop_duplicates(["source_id", "source_type"]) - new_ids, reused_ids, old_ids = venn_diagram_join(incoming_ids, existing_ids) - current_app.logger.info(" - ID's identified as {} new, {} reused, and {} old".format( - new_ids.shape[0], reused_ids.shape[0], old_ids.shape[0] - )) - - # Process updated results - incoming_reused_rows = filter_rows_by_ids(normalized_data, reused_ids) - existing_reused_rows = filter_rows_by_ids(pdp_contacts_df, reused_ids) - fresh_rows, unchanged_rows, old_version_rows = venn_diagram_join(incoming_reused_rows, existing_reused_rows) - # We don't need to consider unchanged rows, since we've already recorded that data and matching. - result["updated"] = fresh_rows - result["old"] = pd.concat([filter_rows_by_ids(pdp_contacts_df, old_ids), unchanged_rows]) - # could also consider adding old_version_rows to result["old"] - result["new"] = filter_rows_by_ids(normalized_data, new_ids) - - current_app.logger.info( - " - Classified {} new rows and {} updated rows in the normalized data" - .format(result["new"].shape[0], result["updated"].shape[0]) - ) - - return result diff --git a/src/server/pipeline/clean_and_load_data.py b/src/server/pipeline/clean_and_load_data.py deleted file mode 100644 index ed39456c..00000000 --- a/src/server/pipeline/clean_and_load_data.py +++ /dev/null @@ -1,95 +0,0 @@ -import pandas as pd -import re -import os -import io -import copy - -from datasource_manager import DATASOURCE_MAPPING, SOURCE_NORMALIZATION_MAPPING -from flask import current_app -import sqlalchemy -from config import RAW_DATA_PATH -from pipeline import log_db - -def start(connection, pdp_contacts_df, file_path_list): - result = pd.DataFrame(columns=pdp_contacts_df.columns) - json_rows = pd.DataFrame(columns=["source_type", "source_id", "json"]) - manual_matches_df = pd.DataFrame() - - for uploaded_file in file_path_list: - file_path = os.path.join(RAW_DATA_PATH, uploaded_file) - table_name = file_path.split('/')[-1].split('-')[0] - if table_name == 'manualmatches': - manual_matches_df = pd.read_csv((io.BytesIO(open(file_path, "rb").read())), encoding='iso-8859-1') - manual_matches_df[["volgistics", "shelterluvpeople"]] = manual_matches_df[["volgistics", "shelterluvpeople"]].fillna(0).astype(int).astype(str) - continue - - current_app.logger.info('Running load_paws_data on: ' + uploaded_file) - - df = pd.read_csv((io.BytesIO(open(file_path, "rb").read())), encoding='iso-8859-1') - current_app.logger.info(' - Populated DF') - - df = __clean_raw_data(df, table_name) - current_app.logger.info(' - Cleaned DF') - - normalization_without_others = copy.deepcopy(SOURCE_NORMALIZATION_MAPPING[table_name]) - normalization_without_others.pop("others") # copy avoids modifying the imported mapping - - if "parent" not in normalization_without_others: # not a child table - source_df = create_normalized_df(df, normalization_without_others, table_name) - df_jsonl = df.to_json(orient="records", lines=True) # original df with normalized column names - # source_json = pd.DataFrame({ - # "source_type": table_name, - # "source_id": source_df["source_id"].astype(str), - # "json": df_jsonl.split("\n") # list of jsons, one per row - # }) - - if result.empty: - result = source_df - # json_rows = source_json - else: - result = pd.concat([result, source_df]) - # json_rows = pd.concat([json_rows, source_json]) - - # else: # it is a child table, processed in file_uploader.py - current_app.logger.info(' - Finish load_paws_data on: ' + 
uploaded_file) - - return result, json_rows, manual_matches_df - - -def create_normalized_df(df, normalized_df, table_name): - result = pd.DataFrame(columns=["matching_id"]) - - for new_column, table_column in normalized_df.items(): - if isinstance(table_column, str): - result[new_column] = df[table_column] - elif callable(table_column): - result[new_column] = table_column(df) - else: - raise ValueError("Unknown mapping operation") - - result["source_type"] = table_name - # Enforce ID datatype to avoid inconsistency when reading/writing table to SQL - result["source_id"] = result["source_id"].astype(str) - - current_app.logger.info(' - Normalized DF') - - return result - - - - -def __clean_raw_data(df, table_name): - # drop the first column - so far all csvs have had a first column that's an index and doesn't have a name - if DATASOURCE_MAPPING[table_name]["should_drop_first_column"]: - df = df.drop(df.columns[0], axis=1) - - # strip whitespace and periods from headers, convert to lowercase - df.columns = df.columns.str.replace(r"\.*\(%\)\.*", "") - df.columns = df.columns.str.lower().str.strip() - df.columns = df.columns.map(lambda x: re.sub(r'\s\(.*\)', '', x)) - df.columns = df.columns.str.replace(' ', '_') - df.columns = df.columns.str.replace('#', 'num') - df.columns = df.columns.str.replace('/', '_') - df.columns = df.columns.map(lambda x: re.sub(r'\.+', '_', x)) - - return df \ No newline at end of file diff --git a/src/server/pipeline/flow_script.py b/src/server/pipeline/flow_script.py index 726bc2f3..1acb572a 100644 --- a/src/server/pipeline/flow_script.py +++ b/src/server/pipeline/flow_script.py @@ -1,16 +1,34 @@ -import os , sys, traceback -import pandas as pd +import time +import traceback -from flask import current_app from api import admin_api -from pipeline import calssify_new_data, clean_and_load_data, archive_rows, match_data, log_db -from config import RAW_DATA_PATH from config import engine -from models import Base - -import time +from flask import current_app +from models import ( + ManualMatches, + PdpContacts, + SalesForceContacts, + ShelterluvPeople, + Volgistics, +) +from networkx import Graph, connected_components +from sqlalchemy import ( + Column, + Integer, + MetaData, + Table, + and_, + delete, + func, + insert, + or_, + select, + text, + update, +) + +from pipeline import log_db -from rfm_funcs.create_scores import create_scores def start_flow(): start = time.time() @@ -18,90 +36,170 @@ def start_flow(): job_outcome = None trace_back_string = None - if not job_id: - current_app.logger.info('Failed to get job_id') - job_outcome = 'busy' - - else: - - - try: - - log_db.log_exec_status(job_id, 'start_flow', 'executing', '') - - file_path_list = os.listdir(RAW_DATA_PATH) - - if file_path_list: - with engine.begin() as connection: - - # Get previous version of pdp_contacts table, which is used later to classify new records - pdp_contacts_df = pd.read_sql_table('pdp_contacts', connection) - pdp_contacts_df = pdp_contacts_df[pdp_contacts_df["archived_date"].isnull()] - pdp_contacts_df = pdp_contacts_df.drop(columns=['archived_date', 'created_date', '_id', 'matching_id']) - - current_app.logger.info('Loaded {} records from pdp_contacts table'.format(pdp_contacts_df.shape[0])) - - # Clean the input data and normalize/rename columns - # Populate new records in secondary tables (donations, volunteer shifts) - # input - existing files in path - # output - normalized object of all entries, as well as the input json rows for primary sources - log_db.log_exec_status(job_id, 
'clean_and_load', 'executing', '') - normalized_data, source_json, manual_matches_df = clean_and_load_data.start(connection, pdp_contacts_df, file_path_list) - # Standardize column data types via postgres (e.g. reading a csv column as int vs. str) - # (If additional inconsistencies are encountered, may need to enforce the schema of - # the contacts loader by initializing it from pdp_contacts.) - normalized_data.to_sql('_temp_pdp_contacts_loader', connection, index=False, if_exists='replace') - normalized_data = pd.read_sql_table('_temp_pdp_contacts_loader', connection) - - # Classifies rows to old rows that haven't changed, updated rows and new rows - compared to the existing state of the DB - log_db.log_exec_status(job_id, 'classify', 'executing', '') - rows_classified = calssify_new_data.start(pdp_contacts_df, normalized_data) - - # Archives rows the were updated in the current state of the DB (changes their archived_date to now) - archive_rows.archive(connection, rows_classified["updated"]) - - # Match new+updated records against previous version of pdp_contacts database, and - # write these rows to the database. - match_data.start(connection, rows_classified, manual_matches_df, job_id) - - # Copy raw input rows to json fields in pdp_contacts, - # using a temporary table to simplify the update code. - current_app.logger.info('Saving json of original rows to pdp_contacts') - source_json.to_sql('_temp_pdp_contacts_loader', connection, index=False, if_exists='replace') - # https://www.postgresql.org/docs/8.4/sql-update.html - connection.execute(''' - UPDATE pdp_contacts pdp - SET json = to_json(temp.json) - FROM _temp_pdp_contacts_loader temp - WHERE - pdp.source_type = temp.source_type AND - pdp.source_id = temp.source_id AND - pdp.archived_date IS NULL - ''') - - current_app.logger.info('Finished flow script run') - job_outcome = 'completed' - log_db.log_exec_status(job_id, 'flow', 'complete', '' ) - - else: # No files in list - current_app.logger.info('No files to process') - job_outcome = 'nothing to do' - log_db.log_exec_status(job_id, 'flow', 'complete', '' ) - - - except Exception as e: - current_app.logger.error(e) - trace_back_string = traceback.format_exc() - current_app.logger.error(trace_back_string) - - finally: - if job_outcome != 'completed': - - log_db.log_exec_status(job_id, 'flow', 'error', trace_back_string ) - current_app.logger.error("Uncaught error status, setting job status to \'error\' ") - job_outcome = 'error' - return 'error' - - current_app.logger.info('Pipeline execution took {} seconds '.format(time.time() - start)) - return job_outcome \ No newline at end of file + current_app.logger.info("Failed to get job_id") + return "busy" + + try: + log_db.log_exec_status(job_id, "start_flow", "executing", "") + + with engine.begin() as conn: + # Here's how we match: + # 1. Clear pdp_contacts (the old matches). + # 2. Go through each raw data source table (e.g. salesforcecontacts, + # volgistics) and copy the latest data for each contact into + # pdp_contacts. + # 3. Execute a join of pdp_contacts to itself using names, emails, + # phone numbers, etc. to get a list of pairs of pdp_contacts ids + # that "match." + # 4. Join manual_matches to pdp_contacts to get the pdp_contacts ids + # of our manual matches. + # + # Steps 3 and 4 both produce lists of pairs of ids. Next we need to + # associate an id with each group of matches. Note that if A matches + # B and B matches C, then A and C should get the same match id. 
We + # can thus think of "matches" as edges in a graph of id vertices, + # and match groups as connected components in that graph. So: + # + # 5. Load all the matches into a Graph() and compute its connected + # components. + # 6. Update each row in pdp_contacts to give it a match id + # corresponding to its connected componenet. + + current_app.logger.info("Clearing pdp_contacts to prepare for match") + reset_pdp_contacts_with_unmatched(conn) + + current_app.logger.info("Computing matches") + automatic_matches = get_automatic_matches(conn) + manual_matches = get_manual_matches(conn) + + match_graph = Graph() + match_graph.add_edges_from(automatic_matches) + match_graph.add_edges_from(manual_matches) + match_groups = connected_components(match_graph) + + current_app.logger.info("Updating pdp_contacts with match ids") + update_matching_ids(match_groups, conn) + + current_app.logger.info("Finished flow script run") + job_outcome = "completed" + log_db.log_exec_status(job_id, "flow", "complete", "") + + except Exception as e: + current_app.logger.error(e) + trace_back_string = traceback.format_exc() + current_app.logger.error(trace_back_string) + + finally: + if job_outcome != "completed": + + log_db.log_exec_status(job_id, "flow", "error", trace_back_string) + current_app.logger.error( + "Uncaught error status, setting job status to 'error' " + ) + job_outcome = "error" + return "error" + + current_app.logger.info( + "Pipeline execution took {} seconds ".format(time.time() - start) + ) + return job_outcome + + +def reset_pdp_contacts_with_unmatched(conn): + conn.execute(delete(PdpContacts)) + conn.execute(SalesForceContacts.insert_into_pdp_contacts()) + conn.execute(Volgistics.insert_into_pdp_contacts()) + conn.execute(ShelterluvPeople.insert_into_pdp_contacts()) + + +def name_to_array(n): + delims = text("'( and | & |, | )'") + return func.regexp_split_to_array( + func.lower(func.translate(n, text("'\"'"), text("''"))), delims + ) + + +def compare_names(n1, n2): + return name_to_array(n1).bool_op("&&")(name_to_array(n2)) + + +def get_automatic_matches(conn): + pc1 = PdpContacts.__table__.alias() + pc2 = PdpContacts.__table__.alias() + match_stmt = select(pc1.c._id, pc2.c._id).join( + pc2, + and_( + or_( + and_( + compare_names(pc1.c.first_name, pc2.c.first_name), + compare_names(pc1.c.last_name, pc2.c.last_name), + ), + and_( + compare_names(pc1.c.first_name, pc2.c.last_name), + compare_names(pc1.c.last_name, pc2.c.first_name), + ), + ), + or_( + func.lower(pc1.c.email) == func.lower(pc2.c.email), + pc1.c.mobile == pc2.c.mobile, + ), + # This ensures we don't get e.g. 
every row matching itself + pc1.c._id < pc2.c._id, + ), + ) + return conn.execute(match_stmt) + + +def get_manual_matches(conn): + pc1 = PdpContacts.__table__.alias() + pc2 = PdpContacts.__table__.alias() + stmt = ( + select(pc1.c._id, pc2.c._id) + .select_from(ManualMatches) + .join( + pc1, + (ManualMatches.source_type_1 == pc1.c.source_type) + & (ManualMatches.source_id_1 == pc1.c.source_id), + ) + .join( + pc2, + (ManualMatches.source_type_2 == pc2.c.source_type) + & (ManualMatches.source_id_2 == pc2.c.source_id), + ) + ) + return conn.execute(stmt) + + +def update_matching_ids(match_groups, conn): + # match_groups doesn't include singletons, but we should still each + # unmatched record gets a sane matching_id (that is, its own id) + matching_ids_by_id = {id: id for (id,) in conn.execute(select(PdpContacts._id))} + for match_group in match_groups: + matching_id = min(match_group) + for id in match_group: + matching_ids_by_id[id] = matching_id + + # Load all the new id/matching-id pairs into a temp table so that we can do + # a fast UPDATE FROM to set all the matching ids in pdp_contacts + temp_table = Table( + "_tmp_matching_id_update", + MetaData(), # this is a temp table, we don't want to affect our knowledge of "real" tables + Column("_id", Integer, primary_key=True), + Column("matching_id", Integer), + prefixes=["TEMPORARY"], + postgresql_on_commit="DROP", + ) + temp_table.create(conn) + conn.execute( + insert(temp_table), + [ + {"_id": _id, "matching_id": matching_id} + for (_id, matching_id) in matching_ids_by_id.items() + ], + ) + conn.execute( + update(PdpContacts) + .where(PdpContacts._id == temp_table.c._id) + .values(matching_id=temp_table.c.matching_id) + ) diff --git a/src/server/pipeline/flow_script.py.orig b/src/server/pipeline/flow_script.py.orig deleted file mode 100644 index b3d0f4e5..00000000 --- a/src/server/pipeline/flow_script.py.orig +++ /dev/null @@ -1,174 +0,0 @@ -import os , sys, traceback -import pandas as pd - -from flask import current_app -from api import admin_api -from pipeline import calssify_new_data, clean_and_load_data, archive_rows, match_data, log_db -from config import RAW_DATA_PATH -from config import engine -from models import Base - -import time - -from rfm_funcs.create_scores import create_scores - -def start_flow(): - start = time.time() - job_id = admin_api.start_job() - job_outcome = None - trace_back_string = None - - - if not job_id: - current_app.logger.info('Failed to get job_id') - job_outcome = 'busy' - - else: - -<<<<<<< HEAD - try: - - log_db.log_exec_status(job_id, 'start_flow', 'executing', '') - - file_path_list = os.listdir(RAW_DATA_PATH) - - if file_path_list: - with engine.begin() as connection: - - # Get previous version of pdp_contacts table, which is used later to classify new records - pdp_contacts_df = pd.read_sql_table('pdp_contacts', connection) - pdp_contacts_df = pdp_contacts_df[pdp_contacts_df["archived_date"].isnull()] - pdp_contacts_df = pdp_contacts_df.drop(columns=['archived_date', 'created_date', '_id', 'matching_id']) - - current_app.logger.info('Loaded {} records from pdp_contacts table'.format(pdp_contacts_df.shape[0])) - - # Clean the input data and normalize/rename columns - # Populate new records in secondary tables (donations, volunteer shifts) - # input - existing files in path - # output - normalized object of all entries, as well as the input json rows for primary sources - log_db.log_exec_status(job_id, 'clean_and_load', 'executing', '') - normalized_data, source_json, manual_matches_df = 
clean_and_load_data.start(connection, pdp_contacts_df, file_path_list)
-                # Standardize column data types via postgres (e.g. reading a csv column as int vs. str)
-                # (If additional inconsistencies are encountered, may need to enforce the schema of
-                # the contacts loader by initializing it from pdp_contacts.)
-                normalized_data.to_sql('_temp_pdp_contacts_loader', connection, index=False, if_exists='replace')
-                normalized_data = pd.read_sql_table('_temp_pdp_contacts_loader', connection)
-
-                # Classifies rows to old rows that haven't changed, updated rows and new rows - compared to the existing state of the DB
-                log_db.log_exec_status(job_id, 'classify', 'executing', '')
-                rows_classified = calssify_new_data.start(pdp_contacts_df, normalized_data)
-
-                # Archives rows the were updated in the current state of the DB (changes their archived_date to now)
-                archive_rows.archive(connection, rows_classified["updated"])
-
-                # Match new+updated records against previous version of pdp_contacts database, and
-                # write these rows to the database.
-                match_data.start(connection, rows_classified, manual_matches_df, job_id)
-
-                # Copy raw input rows to json fields in pdp_contacts,
-                # using a temporary table to simplify the update code.
-                current_app.logger.info('Saving json of original rows to pdp_contacts')
-                source_json.to_sql('_temp_pdp_contacts_loader', connection, index=False, if_exists='replace')
-                # https://www.postgresql.org/docs/8.4/sql-update.html
-                connection.execute('''
-                    UPDATE pdp_contacts pdp
-                    SET json = to_json(temp.json)
-                    FROM _temp_pdp_contacts_loader temp
-                    WHERE
-                        pdp.source_type = temp.source_type AND
-                        pdp.source_id = temp.source_id AND
-                        pdp.archived_date IS NULL
-                ''')
-
-                current_app.logger.info('Finished flow script run')
-                job_outcome = 'completed'
-                log_db.log_exec_status(job_id, 'flow', 'complete', '' )
-
-
-
-        else: # No files in list
-            current_app.logger.info('No files to process')
-            job_outcome = 'nothing to do'
-            log_db.log_exec_status(job_id, 'flow', 'complete', '' )
-
-
-    except Exception as e:
-        current_app.logger.error(e)
-        trace_back_string = traceback.format_exc()
-        current_app.logger.error(trace_back_string)
-
-    finally:
-        if job_outcome != 'completed':
-
-            log_db.log_exec_status(job_id, 'flow', 'error', trace_back_string )
-            current_app.logger.error("Uncaught error status, setting job status to \'error\' ")
-            job_outcome = 'error'
-            return 'error'
-=======
-    log_db.log_exec_status(job_id, 'start_flow', 'executing', '')
-
-    file_path_list = os.listdir(CURRENT_SOURCE_FILES_PATH)
-
-
-
-    if file_path_list:
-        with engine.begin() as connection:
-            # Get previous version of pdp_contacts table, which is used later to classify new records
-            pdp_contacts_df = pd.read_sql_table('pdp_contacts', connection)
-            pdp_contacts_df = pdp_contacts_df[pdp_contacts_df["archived_date"].isnull()]
-            pdp_contacts_df = pdp_contacts_df.drop(columns=['archived_date', 'created_date', '_id', 'matching_id'])
-
-            current_app.logger.info('Loaded {} records from pdp_contacts table'.format(pdp_contacts_df.shape[0]))
-
-            # Clean the input data and normalize/rename columns
-            # Populate new records in secondary tables (donations, volunteer shifts)
-            # input - existing files in path
-            # output - normalized object of all entries, as well as the input json rows for primary sources
-            log_db.log_exec_status(job_id, 'clean_and_load', 'executing', '')
-            normalized_data, source_json, manual_matches_df = clean_and_load_data.start(connection, pdp_contacts_df, file_path_list)
-
-            # Standardize column data types via postgres (e.g. reading a csv column as int vs. str)
-            # (If additional inconsistencies are encountered, may need to enforce the schema of
-            # the contacts loader by initializing it from pdp_contacts.)
-            normalized_data.to_sql('_temp_pdp_contacts_loader', connection, index=False, if_exists='replace')
-            normalized_data = pd.read_sql_table('_temp_pdp_contacts_loader', connection)
-
-            # Classifies rows to old rows that haven't changed, updated rows and new rows - compared to the existing state of the DB
-            log_db.log_exec_status(job_id, 'classify', 'executing', '')
-            rows_classified = calssify_new_data.start(pdp_contacts_df, normalized_data)
-
-            # Archives rows the were updated in the current state of the DB (changes their archived_date to now)
-            archive_rows.archive(connection, rows_classified["updated"])
-
-            # Match new+updated records against previous version of pdp_contacts database, and
-            # write these rows to the database.
-            match_data.start(connection, rows_classified, manual_matches_df, job_id)
-
-            # Copy raw input rows to json fields in pdp_contacts,
-            # using a temporary table to simplify the update code.
-            current_app.logger.info('Saving json of original rows to pdp_contacts')
-            source_json.to_sql('_temp_pdp_contacts_loader', connection, index=False, if_exists='replace')
-            # https://www.postgresql.org/docs/8.4/sql-update.html
-            connection.execute('''
-                UPDATE pdp_contacts pdp
-                SET json = to_json(temp.json)
-                FROM _temp_pdp_contacts_loader temp
-                WHERE
-                    pdp.source_type = temp.source_type AND
-                    pdp.source_id = temp.source_id AND
-                    pdp.archived_date IS NULL
-            ''')
-
-            current_app.logger.info('Finished flow script run')
-            job_outcome = 'completed'
-
-    else: # No files in list
-        current_app.logger.info('No files to process')
-        job_outcome = 'nothing to do'
-
-    log_db.log_exec_status(job_id, 'flow', 'complete', '' )
-
->>>>>>> 7304841623130ff834249e2e66458e7e7a47e15f
-
-    current_app.logger.info('Pipeline execution took {} seconds '.format(time.time() - start))
-    return job_outcome
\ No newline at end of file
diff --git a/src/server/pipeline/match_data.py b/src/server/pipeline/match_data.py
deleted file mode 100644
index 642d33d4..00000000
--- a/src/server/pipeline/match_data.py
+++ /dev/null
@@ -1,112 +0,0 @@
-import datetime, time
-import pandas as pd
-import numpy as np
-from sqlalchemy.sql import text
-import re
-
-from flask import current_app
-from pipeline import log_db
-
-
-def normalize_before_match(value):
-    result = None
-
-    if isinstance(value, str):
-        result = value.lower().replace('"', '')
-
-    return result
-
-
-def start(connection, added_or_updated_rows, manual_matches_df, job_id):
-    # Match new records to each other and existing pdp_contacts data.
-    # Assigns matching ID's to records, as well.
-    # WARNING: not thread-safe and could lead to concurrency issues if two users /execute simultaneously
-    current_app.logger.info('Start record matching')
-    # Will need to consider updating the existing row contents (filter by active), deactivate,
-    # try to match, and merge previous matching groups if applicable
-    # job_id = str(int(time.time()))
-    log_db.log_exec_status(job_id, 'matching', 'executing', '')
-
-    current_app.logger.info("***** Running execute job ID " + job_id + " *****")
-    items_to_update = pd.concat([added_or_updated_rows["new"], added_or_updated_rows["updated"]], ignore_index=True)
-
-    query = "select max(matching_id) as matching_id from pdp_contacts where archived_date is null"
-    max_matching_id = connection.execute(query).fetchone()[0]
-    if max_matching_id == None:
-        max_matching_id = 0
-
-    # Initialize column metadata we'll write to pdp_contacts
-    items_to_update["matching_id"] = 0 # initializing an int and overwrite in the loop
-    items_to_update["archived_date"] = np.nan
-    items_to_update["created_date"] = datetime.datetime.now()
-
-    # Create Normalized columns for matching
-    items_to_update["first_name_normalized"] = items_to_update["first_name"].apply(normalize_before_match)
-    items_to_update["last_name_normalized"] = items_to_update["last_name"].apply(normalize_before_match)
-    items_to_update["email_normalized"] = items_to_update["email"].apply(normalize_before_match)
-
-    rows = items_to_update.to_dict(orient="records")
-    row_print_freq = 1000
-    db_update_freq = 100 # update db after this many rows
-
-    for row_num, row in enumerate(rows):
-        if row_num % row_print_freq == 0: # Write to log
-            current_app.logger.info("- Matching rows {}-{} of {}".format(
-                row_num + 1, min(len(rows), row_num + row_print_freq), len(rows))
-            )
-
-        if row_num % db_update_freq == 0: # Update execution_status table
-            log_db.log_exec_status(job_id, 'matching', 'executing', str({'at_row': row_num + 1, 'of_rows': len(rows) }) )
-
-        # Exact matches based on specified columns
-        query = text("""select matching_id from pdp_contacts where archived_date is null and (
-            (
-                (((string_to_array(lower(first_name), ',') && :first_name) and (string_to_array(lower(last_name), ',') && :last_name))
-                or
-                ((string_to_array(lower(first_name), ',') && :last_name) and (string_to_array(lower(last_name), ',') && :first_name)))
-                and
-                (lower(email) = :email or mobile = :mobile)
-            ))""")
-
-        #TODO revist name tokenization
-        delimiters = ' and | & |, | '
-        first_name_tokenized = re.split(delimiters, row["first_name_normalized"]) if row["first_name_normalized"] is not None else []
-        last_name_tokenized = re.split(delimiters, row["last_name_normalized"]) if row["last_name_normalized"] is not None else []
-
-        results = connection.execute(query, first_name=first_name_tokenized, last_name=last_name_tokenized, email=row["email_normalized"], mobile=row["mobile"])
-        existing_ids = list(map(lambda x: x.matching_id, results.fetchall()))
-
-        #collect other linked ids from manual matches source
-        if not manual_matches_df.empty:
-            linked_ids = manual_matches_df[(manual_matches_df[row["source_type"]] == row["source_type"])]
-            ids = linked_ids.to_dict(orient="records")
-            for id_num, row_dict in enumerate(ids):
-                for column, value in row_dict.items():
-                    query = "select matching_id from pdp_contacts where source_type = :source_type and and source_id = :source_id and archived date is null"
-                    results = connection.execute(query, source_type=column, source_id=value)
-                    #TODO log ids provided by manual matches and not part of auto-matching
-                    existing_ids = existing_ids + list(map(lambda x: x.matching_id, results.fetchall()))
-
-        if len(existing_ids) == 0:
-            max_matching_id += 1
-            matching_id = max_matching_id
-        else:
-            matching_id = max(existing_ids)
-            same_id = all(id == existing_ids[0] for id in existing_ids)
-            if not same_id:
-                old_ids = [id for id in existing_ids if id != matching_id]
-                for old_id in old_ids:
-                    query = text("update pdp_contacts set matching_id = :matching_id where matching_id = :old_id and archived_date is null")
-                    connection.execute(query, matching_id=matching_id, old_id=old_id)
-                    current_app.logger.info("glue record found, changing id {} to {}".format(old_id, matching_id))
-
-        is_organization = False
-        if "account_name" in row.keys():
-            if row["account_name"] != None and not row["account_name"].lower().endswith("household"):
-                is_organization = True
-        insert = text("insert into pdp_contacts(matching_id, source_type, source_id, is_organization, first_name, last_name, email, mobile, street_and_number, apartment, city, state, zip) \
-            values(:matching_id, :source_type, :source_id, :is_organization, :first_name, :last_name, :email, :mobile, :street_and_number, :apartment, :city, :state, :zip)")
-        connection.execute(insert, matching_id=matching_id, source_type=row["source_type"], source_id=row["source_id"], is_organization=is_organization, first_name=row["first_name"], last_name=row["last_name"], email=row["email"], mobile=row["mobile"], street_and_number=row["street_and_number"], apartment=row["apartment"], city=row["city"], state=row["state"], zip=row["zip"])
-
-    current_app.logger.info("- Finished load to pdp_contacts table")
-    log_db.log_exec_status(job_id, 'matching', 'executing', str({'at_row': len(rows), 'of_rows': len(rows) }) )
diff --git a/src/server/requirements.txt b/src/server/requirements.txt
index f9b60f7f..1e529959 100644
--- a/src/server/requirements.txt
+++ b/src/server/requirements.txt
@@ -12,3 +12,5 @@ alembic
 flask-cors
 dropbox
 jellyfish
+networkx
+itsdangerous==2.0.1 # TODO: upgrade flask so we don't have to do this
\ No newline at end of file
diff --git a/src/server/shifts_importer.py b/src/server/shifts_importer.py
index e7cf3193..85a4eac9 100644
--- a/src/server/shifts_importer.py
+++ b/src/server/shifts_importer.py
@@ -6,7 +6,6 @@
 from config import engine
-from sqlalchemy.orm import Session, sessionmaker
 from sqlalchemy import insert, Table, Column, MetaData, exc
 from sqlalchemy.dialects.postgresql import Insert
 metadata = MetaData()
@@ -30,7 +29,7 @@
     'Volunteers' : None
     }
-def validate_import_vs(filename):
+def validate_import_vs(filename, conn):
     """ Validate that the XLSX column names int the file are close enough to expectations that
        we can trust the data. If so, insert the data into the volgisticsshifts table. """
@@ -66,8 +65,6 @@ def validate_import_vs(filename):
     if min_similarity >= MINIMUM_SIMILARITY : # Good enough to trust
-        Session = sessionmaker(engine)
-        session = Session()
         vs = Table("volgisticsshifts", metadata, autoload=True, autoload_with=engine)
         seen_header = False # Used to skip header row
@@ -106,7 +103,7 @@ def validate_import_vs(filename):
                     constraint='uq_shift'
                 )
             try:
-                result = session.execute(skip_dupes)
+                result = conn.execute(skip_dupes)
             except exc.IntegrityError as e: # Catch-all for several more specific exceptions
                 if re.match('duplicate key value', str(e.orig) ):
                     dupes += 1
@@ -125,11 +122,11 @@ def validate_import_vs(filename):
             else: # Haven't seen header, so this was first row.
                 seen_header = True
-        session.commit() # Commit all inserted rows
+        # NOTE: we now run this in a engine.begin() context manager, so our
+        # parent will commit. Don't commit here!
         current_app.logger.info("--------------------------------- Stats -------------------------------------------------")
         current_app.logger.info("Total rows: " + str(row_count) + " Dupes: " + str(dupes) + " Missing volgistics id: " + str(missing_volgistics_id) )
         current_app.logger.info("Other integrity exceptions: " + str(other_integrity) + " Other exceptions: " + str(other_exceptions) )
-        session.close()
         wb.close()
     return { True : "File imported" }
\ No newline at end of file
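
Context for the shifts_importer change above: validate_import_vs() no longer opens its own Session or commits; the new conn argument is expected to be a connection owned by the caller, opened with engine.begin(), which commits when the block exits cleanly and rolls back on an exception. The actual call site is not shown in this diff, so the snippet below is only a minimal illustrative sketch of that pattern; import_shifts is a hypothetical wrapper name and the shifts_importer import path is assumed.

    # Hypothetical caller sketch (not part of the patch): illustrates the
    # engine.begin() pattern the NOTE comment refers to.
    from config import engine
    from shifts_importer import validate_import_vs

    def import_shifts(xlsx_path):
        # engine.begin() commits on clean exit and rolls back on exception,
        # so validate_import_vs() must not call commit() itself.
        with engine.begin() as conn:
            return validate_import_vs(xlsx_path, conn)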