
Commit 2e29d66

first commit
0 parents  commit 2e29d66

File tree

3 files changed  +311 -0 lines changed


.gitignore

Lines changed: 3 additions & 0 deletions
@@ -0,0 +1,3 @@
**/__pycache__/
**/*.pyc
*.log

datamultiproc.py

Lines changed: 99 additions & 0 deletions
@@ -0,0 +1,99 @@
#!/usr/bin/env python3
##
# Module to enable a data processing function to be run multiple times in parallel, each run
# working against a subset of a total data set.
##
import sys
import os
import contextlib
from multiprocessing import Process, Queue


##
# Spawns multiple processes, each running the same logic in parallel against a data set, and
# includes a shared inter-process file logger.
#
# The 'func_to_parallelise' argument should have the following signature:
#   myfunc(processor_count, dataset_id, number_to_insert, log_queue, *args)
##
def run_data_processors(processor_count, records_count, logfile_name, func_to_parallelise, *args):
    records_per_process = int(records_count / processor_count)

    with contextlib.suppress(FileNotFoundError):
        os.remove(logfile_name)

    # Create a separate process-safe shared queue for logging from the other processes
    log_queue = Queue()
    log_process = Process(target=logger_process_run, args=(log_queue, STOP_TOKEN, logfile_name))
    log_process.start()
    processes_list = []

    # Create a set of OS processes to perform the main work, to leverage multiple CPU cores
    for i in range(processor_count):
        process = Process(target=wrapper_process_with_keyboard_exception,
                          args=(func_to_parallelise, processor_count, i, records_per_process,
                                log_queue, *args))
        processes_list.append(process)

    try:
        # Start all processes
        for process in processes_list:
            process.start()

        # Wait for all processes to finish
        for process in processes_list:
            process.join()

        # Send queue message to end the logging process
        log_queue.put(STOP_TOKEN)
        log_process.join()
    except KeyboardInterrupt:
        print(f'\nInterrupted - view file "{logfile_name}"\n')
        shutdown()


##
# Dequeue messages and write them to a single log file
# (a message written from one process doesn't interleave with messages written from another
# process)
##
def logger_process_run(log_queue, stop_token, logfile):
    try:
        with open(logfile, 'w') as f:
            while True:
                line = log_queue.get()

                if line == stop_token:
                    break

                f.write(line)
                f.flush()
    except KeyboardInterrupt:
        shutdown()


##
# For a newly spawned process, wraps a business function with a catch of keyboard interrupts so
# that the process ends immediately when the exception occurs, without spitting out verbiage
##
def wrapper_process_with_keyboard_exception(*args):
    try:
        args[0](*(args[1:]))
    except KeyboardInterrupt:
        os._exit(0)


##
# Swallow the verbiage that is spat out when using 'Ctrl-C' to kill the script
# and instead just print a simple one-line message
##
def shutdown():
    try:
        sys.exit(0)
    except SystemExit:
        os._exit(0)

# Constants
STOP_TOKEN = 'EOF!!'
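
For context, a minimal usage sketch of the module above (illustrative only: the worker function
count_records, its 'hello' argument and the record counts are hypothetical examples and not part
of this commit; the worker simply follows the 'func_to_parallelise' signature documented above):

import datamultiproc as dmp

def count_records(processor_count, dataset_id, number_to_insert, log_queue, greeting):
    # Each spawned process receives the total process count, its own dataset id, its share of the
    # records and the shared log queue, plus any extra positional args from run_data_processors()
    for count in range(number_to_insert):
        log_queue.put(f'{greeting}: process {dataset_id} handled record {count}\n')

if __name__ == '__main__':
    # 4 worker processes share 1000 records (250 each), logging progress to 'example.log'
    dmp.run_data_processors(4, 1000, 'example.log', count_records, 'hello')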

payments-records-loader.py

Lines changed: 209 additions & 0 deletions
@@ -0,0 +1,209 @@
#!/usr/bin/env python3
##
# Processes simulated Payments records using MongoDB. Provides the ability to ingest randomly
# generated records into a collection with one of various Write Concerns, and the ability to read
# the ingested records back out using one of various Read Concerns.
#
# Note: If the number of client injection processes specified exceeds the total number of hardware
# threads (vCPUs) of the host machine/VM minus 6, the result data logged to file may not fully
# appear, due to a suspected Python multi-processing issue.
#
# For usage, first ensure the '.py' script is executable and then run:
#   $ ./payments-records-loader.py -h
#
# Example (connect to Atlas cluster and use 8 client OS processes for injection):
#   $ ./payments-records-loader.py -u "mongodb+srv://myuser:[email protected]" -p 8
##
import argparse
from random import randint, choice
import traceback
from datetime import datetime
from pymongo import MongoClient
import datamultiproc as dmp


# Seeds for randomly generated field values
PAYMENT_REASONS = ['pay bill', 'repayment', 'owed money', 'temp loan', 'buy goods']
FIRST_NAMES = ['Suzi', 'Bobby', 'Gertrude', 'Gordon', 'Mandy', 'Sandy', 'Randy', 'Candy', 'Bambi']
LAST_NAMES = ['Brown', 'Jones', 'Roberts', 'McDonald', 'Barrett', 'Saunders', 'Reid',
              'Whittington-Smythe', 'Parker-Tweed']


##
# Main function spawning multiple processes to each generate and ingest a set of randomly generated
# Payment records.
##
def main():
    # Parse all arguments passed to this script
    argparser = argparse.ArgumentParser(description='Ingest or query Payment records in a MongoDB '
                                        'database')
    argparser.add_argument('-u', '--url', default=DEFAULT_MONGODB_URL,
                           help=f'MongoDB Cluster URL (default: {DEFAULT_MONGODB_URL})')
    argparser.add_argument('-p', '--procs', type=int, default=DEFAULT_TOTAL_PROCESSES,
                           help=f'Number of processes to run (default: {DEFAULT_TOTAL_PROCESSES})')
    argparser.add_argument('-l', '--logfile', default=DEFAULT_LOG_FILENAME,
                           help=f'Name of file to log to (default: {DEFAULT_LOG_FILENAME})')
    argparser.add_argument('-m', '--mode', choices=['local', 'majority', 'linearizable'],
                           default=DEFAULT_MODE, help=f'Defines which Write & Read Concern mode to '
                           f'use for DB interaction: local (WC:1, RC:local), majority (WC:majority, '
                           f'RC:majority), linearizable (WC:majority, RC:linearizable) (default: '
                           f'{DEFAULT_MODE})')
    argparser.add_argument('-t', '--totalrec', type=int, default=DEF_INSERTS_TOTAL,
                           help=f'Number of records to insert (default: {DEF_INSERTS_TOTAL})')
    argparser.add_argument('-q', '--doqueries', default=DEFAULT_DO_QUERIES,
                           action='store_true', help=f'Perform querying of existing Payment data'
                           f' set rather than inserting new Payment data set (default:'
                           f' {DEFAULT_DO_QUERIES})')
    args = argparser.parse_args()

    # Run main data processing work split into multiple processes, tracking start & end times
    print()
    print(f"Using URL '{args.url}' (mode: {args.mode}), check file '{args.logfile}' for progress "
          f"information")
    start = datetime.now()

    if args.doqueries:
        dmp.run_data_processors(args.procs, args.totalrec, args.logfile, query_payment_records,
                                args.url, args.mode)
    else:
        dmp.run_data_processors(args.procs, args.totalrec, args.logfile, insert_payment_records,
                                args.url, args.mode)

    end = datetime.now()
    print(f'{args.totalrec} records processed by {args.procs} processes in '
          f'{int((end-start).total_seconds())} seconds')
    print()


##
# The function for a single OS process to run, looping and querying random records from the total
# set of Payment records.
##
def query_payment_records(processor_count, process_id, records_per_process, log_queue, uri, mode):
    (connection, database, collection) = getClientConnDBCollectionWithConcerns(uri, mode)

    # Loop querying one randomly chosen payment record from the collection per iteration
    for count in range(records_per_process):
        try:
            doc_id = f'{randint(0, processor_count-1)}_{randint(0, records_per_process-1)}'

            if count % (records_per_process/100) == 0:
                start = datetime.utcnow().timestamp()

            # FIND
            payment_records = collection.find_one({'_id': doc_id})

            if payment_records is None:
                print(f'Queried record not found for _id: {doc_id}')

            logSampleStatusOnEachPercent(log_queue, records_per_process, process_id, count, start)
            """
            # Log a response time & status for every 1% processed
            if count % (records_per_process/100) == 0:
                log_queue.put(f'{int(count / records_per_process * 100)}% - {count} '
                              f'documents inserted for data set id {process_id} - '
                              f'{datetime.now()} - sample response time for one request: '
                              f'{1000 * (datetime.utcnow().timestamp() - start)} ms\n')
            """
        except Exception:
            print('Terminating due to error whilst performing queries:\n')
            traceback.print_exc()
            break


##
# The function for a single OS process to run, looping and inserting a subset of Payment records.
##
def insert_payment_records(processor_count, process_id, records_per_process, log_queue, uri, mode):
    (connection, database, collection) = getClientConnDBCollectionWithConcerns(uri, mode)
    # Example index creation: collection.create_index([('fld1', ASCENDING), ('fld2', DESCENDING)])

    # Loop creating each random payment record and ingesting it into a collection
    for count in range(records_per_process):
        try:
            ingest_doc = {
                '_id': f'{process_id}_{count}',
                'timestamp': datetime.now(),
                'payment_ref': choice(PAYMENT_REASONS),
                'payer_sort_code': str(randint(111111, 999999)),
                'payer_acc_num': str(randint(111111111, 999999999)),
                'payer_name': f'{choice(FIRST_NAMES)} {choice(LAST_NAMES)}',
                'payee_sort_code': str(randint(111111, 999999)),
                'payee_acc_num': str(randint(111111111, 999999999)),
                'payee_name': f'{choice(FIRST_NAMES)} {choice(LAST_NAMES)}',
                'amount': str(randint(1, 99999)),
            }

            if count % (records_per_process/100) == 0:
                start = datetime.utcnow().timestamp()

            # INSERT
            collection.insert_one(ingest_doc)

            logSampleStatusOnEachPercent(log_queue, records_per_process, process_id, count, start)
            """
            # Log a response time & status for every 1% processed
            if count % (records_per_process/100) == 0:
                log_queue.put(f'{int(count / records_per_process * 100)}% - {count} '
                              f'documents inserted for data set id {process_id} - '
                              f'{datetime.now()} - sample response time for one request: '
                              f'{1000 * (datetime.utcnow().timestamp() - start)} ms\n')
            """
        except Exception:
            print('Terminating due to error whilst performing inserts:\n')
            traceback.print_exc()
            break


##
# Create a MongoDB client connection using specific Read & Write Concerns and with timeouts set
##
def getClientConnDBCollectionWithConcerns(uri, mode):
    if mode == 'local':
        wc = 1
        rc = 'local'
    elif mode == 'majority':
        wc = 'majority'
        rc = 'majority'
    else:
        wc = 'majority'
        rc = 'linearizable'

    connection = MongoClient(host=uri, w=wc, readConcernLevel=rc, readPreference='primary',
                             socketTimeoutMS=DEFAULT_TIMEOUT_MILLIS,
                             wtimeout=DEFAULT_TIMEOUT_MILLIS,
                             connectTimeoutMS=DEFAULT_TIMEOUT_MILLIS,
                             serverSelectionTimeoutMS=DEFAULT_TIMEOUT_MILLIS)
    database = connection[DB_NAME]
    collection = database[COLLECTION_NAME]
    return (connection, database, collection)


##
# Log a sample status and response time for every 1% of records processed
##
def logSampleStatusOnEachPercent(log_queue, records_per_process, process_id, count, start_utc):
    if count % (records_per_process/100) == 0:
        log_queue.put(f'{int(count / records_per_process * 100)}% - {count} '
                      f'documents processed for data set id {process_id} - '
                      f'{datetime.now()} - sample response time for one request: '
                      f'{1000 * (datetime.utcnow().timestamp() - start_utc)} ms\n')


# Constants
DEFAULT_TIMEOUT_MILLIS = 2000
DEFAULT_MONGODB_URL = 'mongodb://localhost:27017'
DEFAULT_TOTAL_PROCESSES = 2
DEFAULT_MODE = 'local'
DEF_INSERTS_TOTAL = 1000000
DEFAULT_DO_QUERIES = False
DEFAULT_LOG_FILENAME = 'processing-output.log'
DB_NAME = 'fs'
COLLECTION_NAME = 'payments'


##
# Main
##
if __name__ == '__main__':
    main()

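For reference, a sketch of the shape of a single generated Payment document, as assembled by
insert_payment_records() above (the field values shown are illustrative samples, not output from a
real run):

    {
        '_id': '3_1042',                       # '<process_id>_<count>'
        'timestamp': datetime(2020, 5, 1, 12, 30, 45, 123000),
        'payment_ref': 'pay bill',             # drawn from PAYMENT_REASONS
        'payer_sort_code': '402715',
        'payer_acc_num': '318406572',
        'payer_name': 'Mandy Roberts',         # '<first name> <last name>' from the seed lists
        'payee_sort_code': '560183',
        'payee_acc_num': '740291836',
        'payee_name': 'Gordon Parker-Tweed',
        'amount': '12045',                     # stored as a string by the script
    }

After an ingest run the documents land in the 'fs' database's 'payments' collection (see DB_NAME
and COLLECTION_NAME above), so one quick way to verify a load is a count from the mongo shell,
e.g. db.payments.countDocuments({}).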