diff --git a/.gitignore b/.gitignore
index 7df44560..b90b0ab5 100644
--- a/.gitignore
+++ b/.gitignore
@@ -24,3 +24,5 @@ start_env.sh
 *secrets*
 *kustomization*
 src/.venv/
+src/server/secrets_dict.py
+
diff --git a/src/server/api/.optic/.gitignore b/src/server/api/.optic/.gitignore
deleted file mode 100644
index 2eb71261..00000000
--- a/src/server/api/.optic/.gitignore
+++ /dev/null
@@ -1,2 +0,0 @@
-
-captures/
diff --git a/src/server/api/API_ingest/ingest_sources_from_api.py b/src/server/api/API_ingest/ingest_sources_from_api.py
index d9915a2c..a41ef6cb 100644
--- a/src/server/api/API_ingest/ingest_sources_from_api.py
+++ b/src/server/api/API_ingest/ingest_sources_from_api.py
@@ -1,22 +1,21 @@
+from api.API_ingest import shelterluv_people, salesforce_contacts, sl_animal_events
 import structlog
 logger = structlog.get_logger()
 
-from api.API_ingest import shelterluv_api_handler, sl_animal_events
+def start():
+    logger.debug("Start fetching raw data from different API sources")
 
-def start(conn):
-    logger.debug("Start fetching raw data from different API sources")
+    logger.debug(" Fetching Salesforce contacts")
+    salesforce_contacts.store_contacts_all()
+    logger.debug(" Finished fetching Salesforce contacts")
 
     logger.debug(" Fetching Shelterluv people")
-    #Run each source to store the output in dropbox and in the container as a CSV
-    slp_count = shelterluv_api_handler.store_shelterluv_people_all(conn)
+    slp_count = shelterluv_people.store_shelterluv_people_all()
     logger.debug(" Finished fetching Shelterluv people - %d records" , slp_count)
 
     logger.debug(" Fetching Shelterluv events")
-    #Run each source to store the output in dropbox and in the container as a CSV
-    sle_count = sl_animal_events.slae_test()
+    sle_count = sl_animal_events.store_all_animals_and_events()
     logger.debug(" Finished fetching Shelterluv events - %d records" , sle_count)
 
     logger.debug("Finished fetching raw data from different API sources")
-
-    #TODO: Return object with count for each data source?
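The TODO removed at the end of ingest_sources_from_api.py asks whether start() should return per-source record counts. A minimal sketch of that idea, assuming store_contacts_all() were also changed to return its row count (in this patch it returns nothing); the dict keys are illustrative only:

    # Hypothetical variant of start() that surfaces per-source counts.
    # Assumes store_contacts_all() is extended to return a count.
    def start():
        logger.debug("Start fetching raw data from different API sources")
        counts = {
            "salesforce_contacts": salesforce_contacts.store_contacts_all(),
            "shelterluv_people": shelterluv_people.store_shelterluv_people_all(),
            "shelterluv_events": sl_animal_events.store_all_animals_and_events(),
        }
        logger.debug("Finished fetching raw data from different API sources")
        return counts
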
diff --git a/src/server/api/API_ingest/salesforce_contacts.py b/src/server/api/API_ingest/salesforce_contacts.py
new file mode 100644
index 00000000..c1957f68
--- /dev/null
+++ b/src/server/api/API_ingest/salesforce_contacts.py
@@ -0,0 +1,44 @@
+import os
+
+from sqlalchemy.orm import sessionmaker
+from simple_salesforce import Salesforce
+from config import engine
+from models import SalesForceContacts
+
+import structlog
+logger = structlog.get_logger()
+
+def store_contacts_all():
+    Session = sessionmaker(engine)
+    with Session() as session:
+
+        logger.debug("truncating table salesforcecontacts")
+        session.execute("TRUNCATE TABLE salesforcecontacts")
+
+        logger.debug("retrieving the latest salesforce contacts data")
+        sf = Salesforce(domain=os.getenv('SALESFORCE_DOMAIN'), password=os.getenv('SALESFORCE_PW'), username=os.getenv('SALESFORCE_USERNAME'), security_token=os.getenv('SALESFORCE_SECURITY_TOKEN'))
+        results = sf.query("SELECT Contact_ID_18__c, FirstName, LastName, Contact.Account.Name, MailingCountry, MailingStreet, MailingCity, MailingState, MailingPostalCode, Phone, MobilePhone, Email FROM Contact")
+        logger.debug("Query returned %d Salesforce contact records", len(results['records']))
+
+        done = False
+        while not done:
+            for row in results['records']:
+                account_name = row['Account']['Name'] if row['Account'] is not None else None
+                contact = SalesForceContacts(contact_id=row['Contact_ID_18__c'],
+                                             first_name=row['FirstName'],
+                                             last_name=row['LastName'],
+                                             account_name=account_name,
+                                             mailing_country=row['MailingCountry'],
+                                             mailing_street=row['MailingStreet'],
+                                             mailing_city=row['MailingCity'],
+                                             mailing_state_province=row['MailingState'],
+                                             mailing_zip_postal_code=row['MailingPostalCode'],
+                                             phone=row['Phone'],
+                                             mobile=row['MobilePhone'],
+                                             email=row['Email'])
+                session.add(contact)
+            done = results['done']
+            if not done:
+                results = sf.query_more(results['nextRecordsUrl'], identifier_is_url=True)
+        session.commit()
+        logger.debug("finished downloading latest salesforce contacts data")
\ No newline at end of file
diff --git a/src/server/api/API_ingest/shelterluv_api_handler.py b/src/server/api/API_ingest/shelterluv_api_handler.py
deleted file mode 100644
index 84572efc..00000000
--- a/src/server/api/API_ingest/shelterluv_api_handler.py
+++ /dev/null
@@ -1,114 +0,0 @@
-import csv
-import os
-import time
-
-import requests
-import pandas as pd
-from api.API_ingest.dropbox_handler import upload_file_to_dropbox
-from constants import RAW_DATA_PATH
-from models import ShelterluvPeople
-import structlog
-logger = structlog.get_logger()
-
-
-TEST_MODE = os.getenv("TEST_MODE")
-
-try:
-    from secrets_dict import SHELTERLUV_SECRET_TOKEN
-except ImportError:
-    # Not running locally
-    logger.debug("Couldn't get SHELTERLUV_SECRET_TOKEN from file, trying environment **********")
-    from os import environ
-
-    try:
-        SHELTERLUV_SECRET_TOKEN = environ['SHELTERLUV_SECRET_TOKEN']
-    except KeyError:
-        # Not in environment
-        # You're SOL for now
-        logger.error("Couldn't get SHELTERLUV_SECRET_TOKEN from file or environment")
-
-
-def write_csv(json_data):
-    now = time.localtime()
-    now_date = time.strftime("%Y-%m-%d--%H-%M-%S", now)
-
-    path = RAW_DATA_PATH + "shelterluvpeople-" + now_date + ".csv"  # store file name to use for dropbox
-
-    file_handle = open(path, "w")
-
-    csv_writer = csv.writer(file_handle)
-
-    count = 0
-    for item in json_data:
-        if count == 0:
-            # Writing headers of CSV file
-            header = item.keys()
-            csv_writer.writerow(header)
-            count += 1
-
-        # Writing data of CSV file
-        csv_writer.writerow(item.values())
-
-    file_handle.close()
-
-    return path
-
-#################################
-# This script is used to fetch data from shelterluv API.
-# Please be mindful of your usage.
-# example: /people will fetch the data of all people. and send approximately 300 requests.
-# https://help.shelterluv.com/hc/en-us/articles/115000580127-Shelterluv-API-Overview
-#################################
-
-######## Insights ###############
-# Max result items is 100 - even though it's not specifically specified in the above reference
-# /people has all the data. it seems that /person/:id isn't used
-#################################
-
-''' Iterate over all shelterlove people and store in json file in the raw data folder
-We fetch 100 items in each request, since that is the limit based on our research '''
-def store_shelterluv_people_all(conn):
-    offset = 0
-    LIMIT = 100
-    has_more = True
-    shelterluv_people = []
-
-    logger.debug("Start getting shelterluv contacts from people table")
-
-    while has_more:
-        r = requests.get("http://shelterluv.com/api/v1/people?limit={}&offset={}".format(LIMIT, offset),
-                         headers={"x-api-key": SHELTERLUV_SECRET_TOKEN})
-        response = r.json()
-        shelterluv_people += response["people"]
-        has_more = response["has_more"]
-        offset += 100
-
-        if offset % 1000 == 0:
-            print("Reading offset ", str(offset))
-        if TEST_MODE and offset > 1000:
-            has_more=False  # Break out early
-
-
-
-    print("Finish getting shelterluv contacts from people table")
-
-    logger.debug("Start storing latest shelterluvpeople results to container")
-    if os.listdir(RAW_DATA_PATH):
-        for file_name in os.listdir(RAW_DATA_PATH):
-            file_path = os.path.join(RAW_DATA_PATH, file_name)
-            file_name_striped = file_path.split('-')[0].split('/')[-1]
-
-            if file_name_striped == "shelterluvpeople":
-                os.remove(file_path)
-
-    file_path = write_csv(shelterluv_people)
-    logger.debug("Finish storing latest shelterluvpeople results to container")
-
-    logger.debug("Start storing " + '/shelterluv/' + "results to dropbox")
-    upload_file_to_dropbox(file_path, '/shelterluv/' + file_path.split('/')[-1])
-    logger.debug("Finish storing " + '/shelterluv/' + "results to dropbox")
-
-    logger.debug("Uploading shelterluvpeople csv to database")
-    ShelterluvPeople.insert_from_df(pd.read_csv(file_path, dtype="string"), conn)
-
-    return offset
diff --git a/src/server/api/API_ingest/shelterluv_db.py b/src/server/api/API_ingest/shelterluv_db.py
index b3518cf5..dd952848 100644
--- a/src/server/api/API_ingest/shelterluv_db.py
+++ b/src/server/api/API_ingest/shelterluv_db.py
@@ -1,18 +1,10 @@
-from api.api import common_api
-from config import engine
-from flask import jsonify, current_app
-from sqlalchemy.sql import text
-import requests
-import time
-from datetime import datetime
-
-from sqlalchemy.dialects.postgresql import insert
 from sqlalchemy import Table, MetaData
-from pipeline import flow_script
+from sqlalchemy.orm import sessionmaker
+
 from config import engine
-from flask import request, redirect, jsonify, current_app
-from api.file_uploader import validate_and_arrange_upload
-from sqlalchemy.orm import Session, sessionmaker
+
+import structlog
+logger = structlog.get_logger()
 
 
 def insert_animals(animal_list):
@@ -53,11 +45,9 @@ def truncate_animals():
 
     Session = sessionmaker(engine)
     session = Session()
 
-    metadata = MetaData()
-    sla = Table("shelterluv_animals", metadata, autoload=True, autoload_with=engine)
     truncate = "TRUNCATE table shelterluv_animals;"
-    result = session.execute(truncate)
+    session.execute(truncate)
 
     session.commit()  # Commit all inserted rows
     session.close()
 
@@ -69,19 +59,13 @@ def truncate_events():
 
     """Truncate the shelterluv_events table"""
 
     Session = sessionmaker(engine)
-    session = Session()
-    metadata = MetaData()
-    sla = Table("sl_animal_events", metadata, autoload=True, autoload_with=engine)
-
-    truncate = "TRUNCATE table sl_animal_events;"
-    result = session.execute(truncate)
-
-    session.commit()  # Commit all inserted rows
-    session.close()
+    with Session() as session:
+        truncate = "TRUNCATE table sl_animal_events;"
+        session.execute(truncate)
+        session.commit()
 
     return 0
 
-
 def insert_events(event_list):
     """Insert event records into sl_animal_events table and return row count. """
@@ -89,61 +73,60 @@
     truncate_events()
 
     Session = sessionmaker(engine)
-    session = Session()
-    metadata = MetaData()
-    sla = Table("sl_animal_events", metadata, autoload=True, autoload_with=engine)
-
-    # TODO: Pull from DB - inserted in db_setup/base_users.py/populate_sl_event_types()
-    event_map = {
-        "Outcome.Adoption": 1,
-        "Outcome.Foster": 2,
-        "Outcome.ReturnToOwner": 3,
-        "Intake.AdoptionReturn": 4,
-        "Intake.FosterReturn":5
-    }
-
-    # """ INSERT INTO "sl_event_types" ("id","event_name") VALUES
-    #     ( 1,'Outcome.Adoption' ),
-    #     ( 2,'Outcome.Foster' ),
-    #     ( 3,'Outcome.ReturnToOwner' ),
-    #     ( 4,'Intake.AdoptionReturn' ),
-    #     ( 5,'Intake.FosterReturn' ) """
-
-
-
-
-    # Event record: [ AssociatedRecords[Type = Person]["Id"]',
-    #                 AssociatedRecords[Type = Animal]["Id"]',
-    #                 "Type",
-    #                 "Time"
-    #               ]
-    #
-    # In db: ['id',
-    #         'person_id',
-    #         'animal_id',
-    #         'event_type',
-    #         'time']
-
-    ins_list = []  # Create a list of per-row dicts
-    for rec in event_list:
-        ins_list.append(
-            {
-                "person_id": next(
-                    filter(lambda x: x["Type"] == "Person", rec["AssociatedRecords"])
-                )["Id"],
-                "animal_id": next(
-                    filter(lambda x: x["Type"] == "Animal", rec["AssociatedRecords"])
-                )["Id"],
-                "event_type": event_map[rec["Type"]],
-                "time": rec["Time"],
-            }
-        )
-
-    # TODO: Wrap with try/catch
-    ret = session.execute(sla.insert(ins_list))
-
-    session.commit()  # Commit all inserted rows
-    session.close()
+    with Session() as session:
+        metadata = MetaData()
+        sla = Table("sl_animal_events", metadata, autoload=True, autoload_with=engine)
+
+        # TODO: Pull from DB - inserted in db_setup/base_users.py/populate_sl_event_types()
+        event_map = {
+            "Outcome.Adoption": 1,
+            "Outcome.Foster": 2,
+            "Outcome.ReturnToOwner": 3,
+            "Intake.AdoptionReturn": 4,
+            "Intake.FosterReturn":5
+        }
+
+        # """ INSERT INTO "sl_event_types" ("id","event_name") VALUES
+        #     ( 1,'Outcome.Adoption' ),
+        #     ( 2,'Outcome.Foster' ),
+        #     ( 3,'Outcome.ReturnToOwner' ),
+        #     ( 4,'Intake.AdoptionReturn' ),
+        #     ( 5,'Intake.FosterReturn' ) """
+
+
+
+
+        # Event record: [ AssociatedRecords[Type = Person]["Id"]',
+        #                 AssociatedRecords[Type = Animal]["Id"]',
+        #                 "Type",
+        #                 "Time"
+        #               ]
+        #
+        # In db: ['id',
+        #         'person_id',
+        #         'animal_id',
+        #         'event_type',
+        #         'time']
+
+        ins_list = []  # Create a list of per-row dicts
+        for rec in event_list:
+            ins_list.append(
+                {
+                    "person_id": next(
+                        filter(lambda x: x["Type"] == "Person", rec["AssociatedRecords"])
+                    )["Id"],
+                    "animal_id": next(
+                        filter(lambda x: x["Type"] == "Animal", rec["AssociatedRecords"])
+                    )["Id"],
+                    "event_type": event_map[rec["Type"]],
+                    "time": rec["Time"],
+                }
+            )
+
+        # TODO: Wrap with try/catch
+        ret = session.execute(sla.insert(ins_list))
+        session.commit()
+    logger.debug("finished inserting events")
 
     return ret.rowcount
diff --git a/src/server/api/API_ingest/shelterluv_people.py b/src/server/api/API_ingest/shelterluv_people.py
new file mode 100644
index 00000000..76932a7e
--- /dev/null
+++ b/src/server/api/API_ingest/shelterluv_people.py
@@ -0,0 +1,79 @@
+import requests, os
+from models import ShelterluvPeople
+from config import engine
+from sqlalchemy.orm import sessionmaker
+import structlog
+logger = structlog.get_logger()
+
+try:
+    from secrets_dict import SHELTERLUV_SECRET_TOKEN
+except ImportError:
+    # Not running locally
+    logger.debug("Couldn't get SHELTERLUV_SECRET_TOKEN from file, trying environment **********")
+    from os import environ
+
+    try:
+        SHELTERLUV_SECRET_TOKEN = environ['SHELTERLUV_SECRET_TOKEN']
+    except KeyError:
+        # Not in environment
+        # You're SOL for now
+        logger.error("Couldn't get SHELTERLUV_SECRET_TOKEN from file or environment")
+
+
+
+TEST_MODE=os.getenv("TEST_MODE")  # if not present, has value None
+LIMIT = 100
+#################################
+# This script is used to fetch data from the Shelterluv API.
+# Please be mindful of your usage.
+# Example: /people will fetch the data of all people and send approximately 300 requests.
+# https://help.shelterluv.com/hc/en-us/articles/115000580127-Shelterluv-API-Overview
+#################################
+
+######## Insights ###############
+# Max result items is 100 - even though it's not specifically specified in the above reference
+# /people has all the data; it seems that /person/:id isn't used
+#################################
+
+''' Iterate over all Shelterluv people and store them in the shelterluvpeople table.
+We fetch 100 items in each request, since that is the API limit. '''
+def store_shelterluv_people_all():
+    offset = 0
+    has_more = True
+    Session = sessionmaker(engine)
+
+    with Session() as session:
+        logger.debug("Truncating table shelterluvpeople")
+
+        session.execute("TRUNCATE TABLE shelterluvpeople")
+
+        logger.debug("Start getting shelterluv contacts from people table")
+
+        while has_more:
+            r = requests.get("http://shelterluv.com/api/v1/people?limit={}&offset={}".format(LIMIT, offset),
+                             headers={"x-api-key": SHELTERLUV_SECRET_TOKEN})
+            response = r.json()
+            for person in response["people"]:
+                # TODO: Does this need more "null checks"?
+                session.add(ShelterluvPeople(firstname=person["Firstname"],
+                                             lastname=person["Lastname"],
+                                             id=person["ID"] if "ID" in person else None,
+                                             internal_id=person["Internal-ID"],
+                                             associated=person["Associated"],
+                                             street=person["Street"],
+                                             apartment=person["Apartment"],
+                                             city=person["City"],
+                                             state=person["State"],
+                                             zip=person["Zip"],
+                                             email=person["Email"],
+                                             phone=person["Phone"],
+                                             animal_ids=person["Animal_ids"]))
+            offset += LIMIT
+            has_more = response["has_more"] if not TEST_MODE else response["has_more"] and offset < 1000
+            if offset % 1000 == 0:
+                logger.debug("Reading offset %s", str(offset))
+        session.commit()
+
+    logger.debug("Finished getting shelterluv contacts from people table")
+    return offset
diff --git a/src/server/api/API_ingest/sl_animal_events.py b/src/server/api/API_ingest/sl_animal_events.py
index d7f6a472..43170184 100644
--- a/src/server/api/API_ingest/sl_animal_events.py
+++ b/src/server/api/API_ingest/sl_animal_events.py
@@ -1,13 +1,14 @@
-import os, time, json
+import json
+import os
 import posixpath as path
 
 import structlog
+
 logger = structlog.get_logger()
 
 import requests
 
 from api.API_ingest import shelterluv_db
-from server.api.API_ingest.shelterluv_db import insert_animals
 
 # There are a number of different record types. These are the ones we care about.
 keep_record_types = [
@@ -143,7 +144,7 @@
         more_records = decoded["has_more"]  # if so, we'll make another pass
         offset += limit
         if offset % 1000 == 0:
-            logger.debug("Reading offset ", str(offset))
+            logger.debug("Reading offset %s", str(offset))
         if TEST_MODE and offset > 1000:
             more_records=False  # Break out early
 
@@ -155,10 +156,23 @@
 
 def slae_test():
     total_count = get_event_count()
-    logger.debug("Total events:", total_count)
+    logger.debug("Total events: %d", total_count)
+
+    b = get_events_bulk()
+    logger.debug("Stored records: %d", len(b))
+
+    # f = filter_events(b)
+    # print(f)
+
+    count = shelterluv_db.insert_events(b)
+    return count
+
+def store_all_animals_and_events():
+    total_count = get_event_count()
+    logger.debug("Total events: %d", total_count)
 
     b = get_events_bulk()
-    logger.debug("Strored records:", len(b))
+    logger.debug("Stored records: %d", len(b))
 
     # f = filter_events(b)
     # print(f)
diff --git a/src/server/api/admin_api.py b/src/server/api/admin_api.py
index 80763f8b..ca20c549 100644
--- a/src/server/api/admin_api.py
+++ b/src/server/api/admin_api.py
@@ -9,9 +9,9 @@ from sqlalchemy import Table, MetaData
 from pipeline import flow_script
 from config import engine
-from flask import request, redirect, jsonify, current_app
+from flask import request, redirect, jsonify
 from api.file_uploader import validate_and_arrange_upload
-from sqlalchemy.orm import Session, sessionmaker
+from sqlalchemy.orm import sessionmaker
 from api import jwt_ops
 from config import RAW_DATA_PATH
@@ -224,10 +224,10 @@ def start_job():
 
     if running_job :
         # There was a running job already
-        logger.warn("Request to start job, but job_id " + str(running_job) + " already executing")
+        logger.warn("Request to start job, but job_id %s already executing", str(running_job))
         return None
     else:
-        logger.info("Assigned job_id %s" + str(job_id ) )
+        logger.info("Assigned job_id %s" , str(job_id ) )
         return job_id
@@ -392,7 +392,7 @@ def generate_dummy_rfm_scores():
 
     return count
 
-# ########### Test API endpoints 
+# ########### Test API endpoints
 # TODO: Remove for production
 
 # trigger rfm scoring process
diff --git a/src/server/api/internal_api.py b/src/server/api/internal_api.py
index bcae6704..8c21236e 100644
--- a/src/server/api/internal_api.py
+++ b/src/server/api/internal_api.py
@@ -1,11 +1,12 @@
-from api.api import internal_api
-from config import engine
-from flask import jsonify, current_app
 from datetime import datetime
+
+import structlog
+from flask import jsonify
+
 from api.API_ingest import ingest_sources_from_api
+from api.api import internal_api
 from rfm_funcs.create_scores import create_scores
-import structlog
 logger = structlog.get_logger()
 
 ### Internal API endpoints can only be accessed from inside the cluster;
@@ -28,8 +29,7 @@ def user_test2():
 
 @internal_api.route("/api/internal/ingestRawData", methods=["GET"])
 def ingest_raw_data():
     try:
-        with engine.begin() as conn:
-            ingest_sources_from_api.start(conn)
+        ingest_sources_from_api.start()
     except Exception as e:
         logger.error(e)
diff --git a/src/server/bin/export_secrets.sh b/src/server/bin/export_secrets.sh
index 081b4ce1..e505e271 100644
--- a/src/server/bin/export_secrets.sh
+++ b/src/server/bin/export_secrets.sh
@@ -1,3 +1,3 @@
 set -o allexport
-source secrets_dict.py
+source bin/secrets_dict.py
 set +o allexport
diff --git a/src/server/models.py b/src/server/models.py
index 8bffa419..f5b2cccb 100644
--- a/src/server/models.py
+++ b/src/server/models.py
@@ -379,3 +379,17 @@ def insert_from_df(cls, df, conn):
         )
 
     conn.execute(insert(cls).values(matched_pairs).on_conflict_do_nothing())
+
+class SalesforceDonations(Base):
+    __tablename__ = "salesforcedonations"
+
+    _id = sa.Column(sa.Integer, primary_key=True)
+    opp_id = sa.Column(sa.String)
+    recurring_donor = sa.Column(sa.Boolean)
+    primary_contact = sa.Column(sa.String)
+    contact_id = sa.Column(sa.String)
+    amount = sa.Column(sa.Numeric)
+    close_date = sa.Column(sa.Date)
+    donation_type = sa.Column(sa.String)
+    primary_campaign_source = sa.Column(sa.String)
+
diff --git a/src/server/pipeline/flow_script.py b/src/server/pipeline/flow_script.py
index e55c466a..9a4a4766 100644
--- a/src/server/pipeline/flow_script.py
+++ b/src/server/pipeline/flow_script.py
@@ -67,22 +67,26 @@ def start_flow():
     # 6. Update each row in pdp_contacts to give it a match id
     #    corresponding to its connected componenet.
 
-    logger.info("Clearing pdp_contacts to prepare for match")
+    logger.debug("Clearing pdp_contacts to prepare for match")
     reset_pdp_contacts_with_unmatched(conn)
 
-    logger.info("Computing matches")
+    logger.debug("Computing automatic matches")
     automatic_matches = get_automatic_matches(conn)
+    logger.debug("Computing manual matches")
     manual_matches = get_manual_matches(conn)
 
     match_graph = Graph()
+    logger.debug("Adding automatic matches to graph")
     match_graph.add_edges_from(automatic_matches)
+    logger.debug("Adding manual matches to graph")
     match_graph.add_edges_from(manual_matches)
+    logger.debug("Processing graph")
     match_groups = connected_components(match_graph)
 
-    logger.info("Updating pdp_contacts with match ids")
+    logger.debug("Updating pdp_contacts with match ids")
     update_matching_ids(match_groups, conn)
 
-    logger.info("Finished flow script run")
+    logger.debug("Finished flow script run")
     job_outcome = "completed"
     log_db.log_exec_status(job_id, "flow", "complete", "")
diff --git a/src/server/requirements.txt b/src/server/requirements.txt
index d70b1115..dab58e43 100644
--- a/src/server/requirements.txt
+++ b/src/server/requirements.txt
@@ -16,5 +16,6 @@ jellyfish
 networkx
 jinja2<3.1.0
 itsdangerous==2.0.1  # TODO: upgrade flask so we don't have to do this
+simple-salesforce
 werkzeug==2.0.3
 structlog
\ No newline at end of file
diff --git a/src/server/secrets_dict.py b/src/server/secrets_dict.py
index e0d3ce6f..dc16857e 100644
--- a/src/server/secrets_dict.py
+++ b/src/server/secrets_dict.py
@@ -1,5 +1,4 @@
 SD_COMMENT="This is for local development"
-SHELTERLUV_SECRET_TOKEN=""
 APP_SECRET_KEY="ASKASK"
 JWT_SECRET="JWTSECRET"
 POSTGRES_PASSWORD="thispasswordisverysecure"
diff --git a/src/server/shifts_importer.py b/src/server/shifts_importer.py
index 972ad556..9299a0ef 100644
--- a/src/server/shifts_importer.py
+++ b/src/server/shifts_importer.py
@@ -130,7 +130,7 @@ def validate_import_vs(filename, conn):
 
     # parent will commit. Don't commit here!
 
-    logger.info("Total rows: %s Dupes: %s Missing volgistics id: ", str(row_count), str(dupes), str(missing_volgistics_id) )
+    logger.info("Total rows: %s Dupes: %s Missing volgistics id: %s", str(row_count), str(dupes), str(missing_volgistics_id) )
     logger.info("Other integrity exceptions: %s Other exceptions: %s", str(other_exceptions), str(other_integrity) )
     wb.close()
     return { True : "File imported" }
\ No newline at end of file
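Both new Shelterluv ingest modules page through the API with the same limit/offset/has_more contract (at most 100 items per response). A standalone sketch of that loop, assuming the response shape {"people": [...], "has_more": bool} used throughout this patch; the function name is hypothetical:

    # Minimal sketch of the Shelterluv limit/offset pagination used by
    # shelterluv_people.py and sl_animal_events.py. Assumes the
    # SHELTERLUV_SECRET_TOKEN environment variable is set.
    import os
    import requests

    def fetch_all_people(limit=100):
        token = os.environ["SHELTERLUV_SECRET_TOKEN"]
        offset = 0
        people = []
        has_more = True
        while has_more:
            r = requests.get(
                "http://shelterluv.com/api/v1/people?limit={}&offset={}".format(limit, offset),
                headers={"x-api-key": token},
            )
            response = r.json()
            people += response["people"]
            has_more = response["has_more"]
            offset += limit
        return people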
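insert_events() in shelterluv_db.py flattens each event's AssociatedRecords into person_id/animal_id with next(filter(...)), which raises StopIteration if an event lacks a Person or Animal record. A hedged variant that defaults to None instead; flatten_event is a hypothetical helper, not part of this patch:

    # Tolerant version of the AssociatedRecords flattening in insert_events().
    # Returns None for a missing record type instead of raising StopIteration.
    def flatten_event(rec, event_map):
        def assoc_id(rec_type):
            return next(
                (a["Id"] for a in rec["AssociatedRecords"] if a["Type"] == rec_type),
                None,  # absent record type -> None rather than an exception
            )
        return {
            "person_id": assoc_id("Person"),
            "animal_id": assoc_id("Animal"),
            "event_type": event_map[rec["Type"]],
            "time": rec["Time"],
        }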
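The new SalesforceDonations model is declared in models.py but nothing in this patch writes to it yet. A minimal sketch of how a row could be inserted, following the sessionmaker(engine) pattern the other new modules use; all field values are made up:

    # Sketch: inserting one SalesforceDonations row. The ids are
    # illustrative placeholders, not real Salesforce identifiers.
    import datetime
    from sqlalchemy.orm import sessionmaker
    from config import engine
    from models import SalesforceDonations

    Session = sessionmaker(engine)
    with Session() as session:
        session.add(SalesforceDonations(
            opp_id="006000000000001",
            recurring_donor=False,
            contact_id="003000000000001",
            amount=50,
            close_date=datetime.date(2022, 1, 15),
            donation_type="One-Time",
        ))
        session.commit()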