Skip to content

Commit 6135064

Browse files
committed
tweaked updated data query to return data types that match the platform message, modify updated data to include platform message meta data, created internal endpoint to run full flow
1 parent ad93a35 commit 6135064

File tree

5 files changed

+67
-45
lines changed

5 files changed

+67
-45
lines changed

src/server/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ ENV PYTHONUNBUFFERED 1
99

1010
RUN apt-get update
1111

12-
RUN apt-get install -y python3-dev uwsgi uwsgi-src libcap-dev uwsgi-plugin-python3
12+
RUN apt-get install -y python3-dev uwsgi uwsgi-src libcap-dev uwsgi-plugin-python3 libpcre3-dev
1313
RUN pip install --upgrade pip
1414

1515
COPY requirements.txt /

src/server/api/API_ingest/salesforce_contacts.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@
1111

1212
TEST_MODE = os.getenv("TEST_MODE") # if not present, has value None
1313

14+
DOMAIN = os.getenv("SALESFORCE_DOMAIN")
15+
CONSUMER_KEY = os.getenv('SALESFORCE_CONSUMER_KEY')
16+
USERNAME = os.getenv('SALESFORCE_USERNAME')
17+
1418
def store_contacts_all():
1519
Session = sessionmaker(engine)
1620
with Session() as session:
@@ -28,8 +32,8 @@ def store_contacts_all():
2832
logger.error("Missing salesforce jwt private key pem file, skipping data pull")
2933
return
3034

31-
sf = Salesforce(username=os.getenv('SALESFORCE_USERNAME'), consumer_key=os.getenv('SALESFORCE_CONSUMER_KEY'),
32-
privatekey_file=pem_file)
35+
sf = Salesforce(username=USERNAME, consumer_key=CONSUMER_KEY,
36+
privatekey_file=pem_file, domain=DOMAIN)
3337
results = sf.query("SELECT Contact_ID_18__c, FirstName, LastName, Contact.Account.Name, MailingCountry, MailingStreet, MailingCity, MailingState, MailingPostalCode, Phone, MobilePhone, Email FROM Contact")
3438
logger.debug("%d total Salesforce contact records", results['totalSize'])
3539
if TEST_MODE:
@@ -61,4 +65,4 @@ def store_contacts_all():
6165
results = sf.query_more(results['nextRecordsUrl'], True)
6266
logger.debug("Committing downloaded contact records")
6367
session.commit()
64-
logger.debug("finished downloading latest salesforce contacts data")
68+
logger.info("finished downloading latest salesforce contacts data")

src/server/api/API_ingest/updated_data.py

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -31,24 +31,24 @@ def get_updated_contact_data():
3131
3232
select json_agg (upd) as "cd" from (
3333
select
34-
slsf.source_id as "contactId" , -- long salesforce string
35-
slp.id as "personId" , -- short PAWS-local shelterluv id
34+
slsf.source_id as "Contact_Record_Id__c" , -- long salesforce string
35+
slp.id as "Person_Id__c" , -- short PAWS-local shelterluv id
3636
37-
case
38-
when
39-
(extract(epoch from now())::bigint - foster_out < 365*86400) -- foster out in last year
40-
or (extract(epoch from now())::bigint - foster_return < 365*86400) -- foster return
41-
then 'Active'
42-
else 'Inactive'
43-
end as "updatedFosterStatus" ,
37+
--case
38+
-- when
39+
-- (extract(epoch from now())::bigint - foster_out < 365*86400) -- foster out in last year
40+
-- or (extract(epoch from now())::bigint - foster_return < 365*86400) -- foster return
41+
-- then 'Active'
42+
-- else 'Inactive'
43+
--end as "Updated_Recent_Foster_Activity__c",
4444
45-
(to_timestamp(foster_out ) at time zone 'America/New_York')::date as "updatedFosterStartDate",
46-
(to_timestamp(foster_return ) at time zone 'America/New_York')::date as "updatedFosterEndDate",
47-
48-
min(vs.from_date) as "updatedFirstVolunteerDate",
49-
max(vs.from_date) as "updatedLastVolunteerDate",
50-
vc.source_id as "volgisticsId"
45+
foster_out as "Updated_Foster_Start_Date__c",
46+
foster_return as "Updated_Foster_End_Date__c",
5147
48+
extract(epoch from min(vs.from_date)) as "Updated_First_Volunteer_Date__c",
49+
extract(epoch from max(vs.from_date)) as "Updated_Last_Volunteer_Date__c",
50+
sum(vs.hours) as "Updated_Total_Volunteer_Hours__c",
51+
vc.source_id::integer as "Volgistics_Id__c"
5252
5353
from
5454
ev_dates
@@ -69,12 +69,11 @@ def get_updated_contact_data():
6969
foster_return
7070
7171
) upd ;
72-
73-
7472
"""
7573

7674
with Session() as session:
7775
result = session.execute(qry)
7876
sfdata = result.fetchone()[0]
77+
logger.debug(sfdata)
7978
logger.debug("Query for Salesforce update returned %d records", len(sfdata))
8079
return sfdata

src/server/api/internal_api.py

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from rfm_funcs.create_scores import create_scores
99
from api.API_ingest import updated_data
1010
from pub_sub import salesforce_message_publisher
11-
import json
11+
from pipeline import flow_script
1212

1313
logger = structlog.get_logger()
1414

@@ -50,17 +50,28 @@ def hit_create_scores():
5050
@internal_api.route("/api/internal/get_updated_data", methods=["GET"])
5151
def get_contact_data():
5252
logger.debug("Calling get_updated_contact_data()")
53-
contact_json = updated_data.get_updated_contact_data()
54-
logger.debug("Returning %d contact records", len(contact_json) )
53+
contact_json = updated_data.get_updated_contact_data()
54+
logger.debug("Returning %d contact records", len(contact_json))
5555
return jsonify(contact_json), 200
5656

5757

58-
@internal_api.route("/api/internal/salesforce_platform_message", methods=["POST"])
59-
def hit_salesforce_platform_message():
60-
try:
61-
post_dict = json.loads(request.data)
62-
salesforce_message_publisher.pipeline_update_message(post_dict)
63-
except Exception as e:
64-
logger.error(e)
58+
@internal_api.route("/api/internal/send_salesforce_platform_message", methods=["GET"])
59+
def send_salesforce_platform_message():
60+
contact_list = updated_data.get_updated_contact_data()
61+
logger.debug("Returning %d contact records", len(contact_list))
62+
salesforce_message_publisher.send_pipeline_update_messages(contact_list)
6563

6664
return jsonify({'outcome': 'OK'}), 200
65+
66+
@internal_api.route("/api/internal/full_flow", methods=["GET"])
67+
def start_flow():
68+
logger.info("Downloading data from APIs")
69+
ingest_sources_from_api.start()
70+
logger.info("Starting pipeline matching")
71+
flow_script.start_flow()
72+
logger.info("Building updated data payload")
73+
updated_contacts_list = updated_data.get_updated_contact_data()
74+
logger.info("Sending Salesforce platform messages")
75+
salesforce_message_publisher.send_pipeline_update_messages(updated_contacts_list)
76+
77+
return jsonify({'outcome': 'OK'}), 200

src/server/pub_sub/salesforce_message_publisher.py

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -9,21 +9,21 @@
99
import avro.io
1010
import io
1111
import structlog
12+
from datetime import datetime
1213

1314
logger = structlog.get_logger()
1415

15-
# todo: also read these from dict module
1616
ISSUER = os.getenv("SALESFORCE_CONSUMER_KEY")
1717
DOMAIN = os.getenv("SALESFORCE_DOMAIN")
1818
SUBJECT = os.getenv("SALESFORCE_USERNAME")
1919
INSTANCE_URL = os.getenv("INSTANCE_URL")
2020
TENANT_ID = os.getenv("TENANT_ID")
21+
PLATFORM_MESSAGE_AUTHOR = os.getenv("PLATFORM_MESSAGE_AUTHOR_RECORD_ID")
2122

2223
UPDATE_TOPIC = "/event/Updated_Contacts_From_Pipeline__e"
2324

2425

25-
def pipeline_update_message(message_dict):
26-
# todo: look for certificate using both local and container pathing
26+
def send_pipeline_update_messages(contacts_list):
2727
pem_file = 'bin/connected-app-secrets.pem'
2828
with open(pem_file) as fd:
2929
private_key = fd.read()
@@ -42,7 +42,6 @@ def pipeline_update_message(message_dict):
4242
'grant_type': 'urn:ietf:params:oauth:grant-type:jwt-bearer',
4343
'assertion': assertion,
4444
})
45-
logger.info(r.json())
4645
access_token = r.json()['access_token']
4746
logger.info('Made OAuth call to get access token')
4847

@@ -52,16 +51,25 @@ def pipeline_update_message(message_dict):
5251
auth_meta_data = (('accesstoken', access_token),
5352
('instanceurl', INSTANCE_URL),
5453
('tenantid', TENANT_ID))
54+
55+
5556
stub = pb2_grpc.PubSubStub(channel)
5657
schema_id = stub.GetTopic(pb2.TopicRequest(topic_name=UPDATE_TOPIC), metadata=auth_meta_data).schema_id
5758
schema = stub.GetSchema(pb2.SchemaRequest(schema_id=schema_id), metadata=auth_meta_data).schema_json
58-
buf = io.BytesIO()
59-
encoder = avro.io.BinaryEncoder(buf)
60-
writer = avro.io.DatumWriter(avro.schema.parse(schema))
61-
writer.write(message_dict, encoder)
62-
payload = {
63-
"schema_id": schema_id,
64-
"payload": buf.getvalue()
65-
}
66-
stub.Publish(pb2.PublishRequest(topic_name=UPDATE_TOPIC, events=[payload]), metadata=auth_meta_data)
67-
logger.info('Pipeline update message sent')
59+
60+
for contact_dict in contacts_list:
61+
contact_dict['CreatedDate'] = int(datetime.now().timestamp())
62+
contact_dict['CreatedById'] = PLATFORM_MESSAGE_AUTHOR
63+
64+
buf = io.BytesIO()
65+
encoder = avro.io.BinaryEncoder(buf)
66+
writer = avro.io.DatumWriter(avro.schema.parse(schema))
67+
writer.write(contact_dict, encoder)
68+
payload = {
69+
"schema_id": schema_id,
70+
"payload": buf.getvalue()
71+
}
72+
stub.Publish(pb2.PublishRequest(topic_name=UPDATE_TOPIC, events=[payload]), metadata=auth_meta_data)
73+
logger.info('Pipeline update message sent')
74+
75+
logger.info("%s total pipeline update messages sent", len(contacts_list))

0 commit comments

Comments
 (0)