Skip to content

Commit 97c3b6c

Browse files
authored
Merge pull request #560 from CodeForPhilly/506-salesforce-message-publisher
506 salesforce message publisher
2 parents 8c592b0 + cbb26fe commit 97c3b6c

File tree

10 files changed

+500
-34
lines changed

10 files changed

+500
-34
lines changed

src/server/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ ENV PYTHONUNBUFFERED 1
99

1010
RUN apt-get update
1111

12-
RUN apt-get install -y python3-dev uwsgi uwsgi-src libcap-dev uwsgi-plugin-python3
12+
RUN apt-get install -y python3-dev uwsgi uwsgi-src libcap-dev uwsgi-plugin-python3 libpcre3-dev
1313
RUN pip install --upgrade pip
1414

1515
COPY requirements.txt /

src/server/api/API_ingest/salesforce_contacts.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@
1111

1212
TEST_MODE = os.getenv("TEST_MODE") # if not present, has value None
1313

14+
DOMAIN = os.getenv("SALESFORCE_DOMAIN")
15+
CONSUMER_KEY = os.getenv('SALESFORCE_CONSUMER_KEY')
16+
USERNAME = os.getenv('SALESFORCE_USERNAME')
17+
1418
def store_contacts_all():
1519
Session = sessionmaker(engine)
1620
with Session() as session:
@@ -28,8 +32,8 @@ def store_contacts_all():
2832
logger.error("Missing salesforce jwt private key pem file, skipping data pull")
2933
return
3034

31-
sf = Salesforce(username=os.getenv('SALESFORCE_USERNAME'), consumer_key=os.getenv('SALESFORCE_CONSUMER_KEY'),
32-
privatekey_file=pem_file)
35+
sf = Salesforce(username=USERNAME, consumer_key=CONSUMER_KEY,
36+
privatekey_file=pem_file, domain=DOMAIN)
3337
results = sf.query("SELECT Contact_ID_18__c, FirstName, LastName, Contact.Account.Name, MailingCountry, MailingStreet, MailingCity, MailingState, MailingPostalCode, Phone, MobilePhone, Email FROM Contact")
3438
logger.debug("%d total Salesforce contact records", results['totalSize'])
3539
if TEST_MODE:
@@ -61,4 +65,4 @@ def store_contacts_all():
6165
results = sf.query_more(results['nextRecordsUrl'], True)
6266
logger.debug("Committing downloaded contact records")
6367
session.commit()
64-
logger.debug("finished downloading latest salesforce contacts data")
68+
logger.info("finished downloading latest salesforce contacts data")
Lines changed: 23 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11

2-
from sqlalchemy.orm import sessionmaker
3-
from simple_salesforce import Salesforce
2+
import structlog
3+
from sqlalchemy.orm import sessionmaker
4+
45
from config import engine
56

6-
import structlog
77
logger = structlog.get_logger()
88

99

@@ -14,10 +14,10 @@ def get_updated_contact_data():
1414
with ev_dates as
1515
(select
1616
person_id,
17-
max(case when event_type=1 then time else null end) adopt,
18-
max(case when event_type=2 then time else null end) foster_out,
17+
max(case when event_type=1 then time else null end) * 1000 adopt,
18+
max(case when event_type=2 then time else null end) * 1000 foster_out,
1919
-- max(case when event_type=3 then time else null end) rto,
20-
max(case when event_type=5 then time else null end) foster_return
20+
max(case when event_type=5 then time else null end) * 1000 foster_return
2121
2222
from
2323
sl_animal_events sla
@@ -31,24 +31,24 @@ def get_updated_contact_data():
3131
3232
select json_agg (upd) as "cd" from (
3333
select
34-
slsf.source_id as "contactId" , -- long salesforce string
35-
slp.id as "personId" , -- short PAWS-local shelterluv id
34+
slsf.source_id as "Contact_Record_Id__c" , -- long salesforce string
35+
slp.internal_id as "Person_Id__c" , -- short PAWS-local shelterluv id
3636
37-
case
38-
when
39-
(extract(epoch from now())::bigint - foster_out < 365*86400) -- foster out in last year
40-
or (extract(epoch from now())::bigint - foster_return < 365*86400) -- foster return
41-
then 'Active'
42-
else 'Inactive'
43-
end as "updatedFosterStatus" ,
37+
--case
38+
-- when
39+
-- (extract(epoch from now())::bigint - foster_out < 365*86400) -- foster out in last year
40+
-- or (extract(epoch from now())::bigint - foster_return < 365*86400) -- foster return
41+
-- then 'Active'
42+
-- else 'Inactive'
43+
--end as "Updated_Recent_Foster_Activity__c",
4444
45-
(to_timestamp(foster_out ) at time zone 'America/New_York')::date as "updatedFosterStartDate",
46-
(to_timestamp(foster_return ) at time zone 'America/New_York')::date as "updatedFosterEndDate",
47-
48-
min(vs.from_date) as "updatedFirstVolunteerDate",
49-
max(vs.from_date) as "updatedLastVolunteerDate",
50-
vc.source_id as "volgisticsId"
45+
foster_out as "Updated_Foster_Start_Date__c",
46+
foster_return as "Updated_Foster_End_Date__c",
5147
48+
extract(epoch from min(vs.from_date)) * 1000 as "Updated_First_Volunteer_Date__c",
49+
extract(epoch from max(vs.from_date)) * 1000 as "Updated_Last_Volunteer_Date__c",
50+
sum(vs.hours) as "Updated_Total_Volunteer_Hours__c",
51+
vc.source_id::integer as "Volgistics_Id__c"
5252
5353
from
5454
ev_dates
@@ -63,18 +63,17 @@ def get_updated_contact_data():
6363
6464
group by
6565
slsf.source_id,
66-
slp.id,
66+
slp.internal_id,
6767
vc.source_id,
6868
foster_out ,
6969
foster_return
7070
7171
) upd ;
72-
73-
7472
"""
7573

7674
with Session() as session:
7775
result = session.execute(qry)
7876
sfdata = result.fetchone()[0]
77+
logger.debug(sfdata)
7978
logger.debug("Query for Salesforce update returned %d records", len(sfdata))
8079
return sfdata

src/server/api/internal_api.py

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,14 @@
33
import structlog
44
from flask import jsonify
55

6-
from api.API_ingest import ingest_sources_from_api, salesforce_contacts
7-
from api.api import internal_api
8-
# from rfm_funcs.create_scores import create_scores
6+
from api.API_ingest import ingest_sources_from_api
97
from api.API_ingest import updated_data
8+
from api.api import internal_api
9+
10+
from pipeline import flow_script
11+
from pub_sub import salesforce_message_publisher
12+
from rfm_funcs.create_scores import create_scores
13+
1014

1115
logger = structlog.get_logger()
1216

@@ -48,6 +52,28 @@ def ingest_raw_data():
4852
@internal_api.route("/api/internal/get_updated_data", methods=["GET"])
4953
def get_contact_data():
5054
logger.debug("Calling get_updated_contact_data()")
51-
contact_json = updated_data.get_updated_contact_data()
52-
logger.debug("Returning %d contact records", len(contact_json) )
55+
contact_json = updated_data.get_updated_contact_data()
56+
logger.debug("Returning %d contact records", len(contact_json))
5357
return jsonify(contact_json), 200
58+
59+
60+
@internal_api.route("/api/internal/send_salesforce_platform_message", methods=["GET"])
61+
def send_salesforce_platform_message():
62+
contact_list = updated_data.get_updated_contact_data()
63+
logger.debug("Returning %d contact records", len(contact_list))
64+
salesforce_message_publisher.send_pipeline_update_messages(contact_list)
65+
66+
return jsonify({'outcome': 'OK'}), 200
67+
68+
@internal_api.route("/api/internal/full_flow", methods=["GET"])
69+
def start_flow():
70+
logger.info("Downloading data from APIs")
71+
ingest_sources_from_api.start()
72+
logger.info("Starting pipeline matching")
73+
flow_script.start_flow()
74+
logger.info("Building updated data payload")
75+
updated_contacts_list = updated_data.get_updated_contact_data()
76+
logger.info("Sending Salesforce platform messages")
77+
salesforce_message_publisher.send_pipeline_update_messages(updated_contacts_list)
78+
79+
return jsonify({'outcome': 'OK'}), 200

src/server/pub_sub/__init__.py

Whitespace-only changes.
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
import time
2+
import jwt
3+
import os
4+
import requests
5+
import certifi
6+
import grpc
7+
import pub_sub.stubs.pubsub_api_pb2_grpc as pb2_grpc
8+
import pub_sub.stubs.pubsub_api_pb2 as pb2
9+
import avro.io
10+
import io
11+
import structlog
12+
from datetime import datetime
13+
14+
logger = structlog.get_logger()
15+
16+
ISSUER = os.getenv("SALESFORCE_CONSUMER_KEY")
17+
DOMAIN = os.getenv("SALESFORCE_DOMAIN")
18+
SUBJECT = os.getenv("SALESFORCE_USERNAME")
19+
INSTANCE_URL = os.getenv("INSTANCE_URL")
20+
TENANT_ID = os.getenv("TENANT_ID")
21+
PLATFORM_MESSAGE_AUTHOR = os.getenv("PLATFORM_MESSAGE_AUTHOR_RECORD_ID")
22+
23+
UPDATE_TOPIC = "/event/Updated_Contacts_From_Pipeline__e"
24+
25+
26+
def send_pipeline_update_messages(contacts_list):
27+
pem_file = 'bin/connected-app-secrets.pem'
28+
with open(pem_file) as fd:
29+
private_key = fd.read()
30+
logger.info('Loaded PEM certificate')
31+
32+
claim = {
33+
'iss': ISSUER,
34+
'exp': int(time.time()) + 300,
35+
'aud': 'https://{}.salesforce.com'.format(DOMAIN),
36+
'sub': SUBJECT,
37+
}
38+
assertion = jwt.encode(claim, private_key, algorithm='RS256', headers={'alg': 'RS256'})
39+
logger.info('Generated JWT')
40+
41+
r = requests.post('https://{}.salesforce.com/services/oauth2/token'.format(DOMAIN), data={
42+
'grant_type': 'urn:ietf:params:oauth:grant-type:jwt-bearer',
43+
'assertion': assertion,
44+
})
45+
access_token = r.json()['access_token']
46+
logger.info('Made OAuth call to get access token')
47+
48+
with open(certifi.where(), 'rb') as f:
49+
creds = grpc.ssl_channel_credentials(f.read())
50+
with grpc.secure_channel('api.pubsub.salesforce.com:7443', creds) as channel:
51+
auth_meta_data = (('accesstoken', access_token),
52+
('instanceurl', INSTANCE_URL),
53+
('tenantid', TENANT_ID))
54+
55+
56+
stub = pb2_grpc.PubSubStub(channel)
57+
schema_id = stub.GetTopic(pb2.TopicRequest(topic_name=UPDATE_TOPIC), metadata=auth_meta_data).schema_id
58+
schema = stub.GetSchema(pb2.SchemaRequest(schema_id=schema_id), metadata=auth_meta_data).schema_json
59+
60+
for contact_dict in contacts_list:
61+
contact_dict['CreatedDate'] = int(datetime.now().timestamp())
62+
contact_dict['CreatedById'] = PLATFORM_MESSAGE_AUTHOR
63+
64+
buf = io.BytesIO()
65+
encoder = avro.io.BinaryEncoder(buf)
66+
writer = avro.io.DatumWriter(avro.schema.parse(schema))
67+
writer.write(contact_dict, encoder)
68+
payload = {
69+
"schema_id": schema_id,
70+
"payload": buf.getvalue()
71+
}
72+
stub.Publish(pb2.PublishRequest(topic_name=UPDATE_TOPIC, events=[payload]), metadata=auth_meta_data)
73+
logger.info('Pipeline update message sent')
74+
75+
logger.info("%s total pipeline update messages sent", len(contacts_list))

src/server/pub_sub/stubs/__init__.py

Whitespace-only changes.

src/server/pub_sub/stubs/pubsub_api_pb2.py

Lines changed: 56 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)