Skip to content

Commit fec5683

Browse files
Merge pull request #6 from topcoder-platform/dev-test-pg
merge to dev from dev-test-pg
2 parents f4356e1 + 6389f71 commit fec5683

File tree

7 files changed

+239
-78
lines changed

7 files changed

+239
-78
lines changed

.circleci/config.yml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,13 @@ build_steps: &build_steps
5757
./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-producer_dd-deployvar
5858
source buildenvvar
5959
./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer
60+
61+
echo "Running Masterscript - deploy postgres-ifx-processer reconsiler1"
62+
if [ -e ${LOGICAL_ENV}-${APP_NAME}-reconsiler1-deployvar.json ]; then sudo rm -vf ${LOGICAL_ENV}-${APP_NAME}-reconsiler1-deployvar.json; fi
63+
./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-reconsiler1-deployvar
64+
source buildenvvar
65+
./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer
66+
6067
jobs:
6168
# Build & Deploy against development backend #
6269
"build-dev":

config/default.js

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,14 @@ module.exports = {
4444
SLACKNOTIFY: process.env.SLACKNOTIFY || 'false'
4545
},
4646
RECONSILER:{
47-
RECONSILER_START: process.env.RECONSILER_START || 10,
48-
RECONSILER_END: process.env.RECONSILER_END || 5,
47+
RECONSILER_START: process.env.RECONSILER_START || 5,
48+
RECONSILER_END: process.env.RECONSILER_END || 1,
4949
RECONSILER_DURATION_TYPE: process.env.RECONSILER_DURATION_TYPE || 'm'
5050
},
5151
DYNAMODB:
5252
{
5353
DYNAMODB_TABLE: process.env.DYNAMODB_TABLE || 'dev_pg_ifx_payload_sync'
54+
DD_ElapsedTime: process.env.DD_ElapsedTime || 600000
5455
},
5556

5657
AUTH0_URL: process.env.AUTH0_URL ,

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@
99
"producer": "node ./src/producer.js",
1010
"consumer": "node ./src/consumer.js",
1111
"producer_dd": "node ./src/producer.js failover",
12-
"start": "npm run producer & npm run producer_dd & npm run consumer"
12+
"reconsiler1": "node ./src/reconsiler-audit.js",
13+
"start": "npm run producer & npm run producer_dd & npm run consumer & npm run reconsiler1"
1314
},
1415
"author": "Topcoder",
1516
"license": "ISC",

pg-identity-func-trig-seq.sql

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,30 @@ DECLARE
2727
payload TEXT;
2828
column_name TEXT;
2929
column_value TEXT;
30+
pguserval TEXT;
3031
payload_items TEXT[];
3132
uniquecolumn TEXT;
3233
logtime TEXT;
3334
payloadseqid INTEGER;
3435
BEGIN
36+
37+
38+
--pguserval := (SELECT 1 FROM pg_roles WHERE rolname = 'pgsyncuser');
39+
pguserval := (SELECT current_user);
40+
if pguserval = 'pgsyncuser' then
41+
RAISE notice 'pgsyncuser name : %', pguserval;
42+
43+
CASE TG_OP
44+
WHEN 'INSERT', 'UPDATE' THEN
45+
rec := NEW;
46+
WHEN 'DELETE' THEN
47+
rec := OLD;
48+
ELSE
49+
RAISE EXCEPTION 'Unknown TG_OP: "%". Should not occur!', TG_OP;
50+
END CASE;
51+
return rec;
52+
end if;
53+
3554
CASE TG_OP
3655
WHEN 'INSERT', 'UPDATE' THEN
3756
rec := NEW;
@@ -99,13 +118,14 @@ BEGIN
99118
END LOOP;
100119
--logtime := (select date_display_tz());
101120
logtime := (SELECT to_char (now()::timestamptz at time zone 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS"Z"'));
121+
102122
payloadseqid := (select nextval('payloadsequence'::regclass));
103123

104124
uniquecolumn := (SELECT c.column_name
105125
FROM information_schema.key_column_usage AS c
106126
LEFT JOIN information_schema.table_constraints AS t
107127
ON t.constraint_name = c.constraint_name
108-
WHERE t.table_name = TG_TABLE_NAME AND t.constraint_type = 'PRIMARY KEY' LIMIT 1);
128+
WHERE t.table_name = TG_TABLE_NAME AND t.constraint_type = 'PRIMARY KEY' limit 1);
109129

110130
if (uniquecolumn = '') IS NOT FALSE then
111131
uniquecolumn := 'Not-Available';
@@ -131,9 +151,10 @@ BEGIN
131151
PERFORM pg_notify('test_db_notifications', payload);
132152

133153
RETURN rec;
134-
END;
135-
$body$ LANGUAGE plpgsql;
136154

155+
END;
156+
$body$ LANGUAGE plpgsql
157+
137158
CREATE TRIGGER "pg_email_trigger"
138159
AFTER INSERT OR DELETE OR UPDATE ON email
139160
FOR EACH ROW
@@ -217,10 +238,27 @@ DECLARE
217238
column_name TEXT;
218239
column_value TEXT;
219240
payload_items TEXT[];
241+
pguserval TEXT;
220242
uniquecolumn TEXT;
221243
logtime TEXT;
222244
payloadseqid INTEGER;
223245
BEGIN
246+
247+
pguserval := (SELECT current_user);
248+
if pguserval = 'pgsyncuser' then
249+
RAISE notice 'pgsyncuser name : %', pguserval;
250+
251+
CASE TG_OP
252+
WHEN 'INSERT', 'UPDATE' THEN
253+
rec := NEW;
254+
WHEN 'DELETE' THEN
255+
rec := OLD;
256+
ELSE
257+
RAISE EXCEPTION 'Unknown TG_OP: "%". Should not occur!', TG_OP;
258+
END CASE;
259+
return rec;
260+
end if;
261+
224262
CASE TG_OP
225263
WHEN 'INSERT', 'UPDATE' THEN
226264
rec := NEW;

src/consumer.js

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -56,26 +56,23 @@ let cs_payloadseqid
5656
await consumer.commitOffset({ topic, partition, offset: m.offset }) // Commit offset only on success
5757
if (message.payload['retryCount']) retryvar = message.payload.retryCount;
5858
auditTrail([cs_payloadseqid,cs_processId,message.payload.table,message.payload.Uniquecolumn,
59-
message.payload.operation,"Informix-updated",retryvar,"","",message.payload.data, message.timestamp,message.topic],'consumer')
59+
message.payload.operation,"Informix-updated",retryvar,"","",JSON.stringify(message), message.timestamp,message.topic],'consumer')
6060
} catch (err) {
61-
const errmsg2 = `Could not process kafka message or informix DB error: "${err.message}"`
61+
const errmsg2 = `error-sync: Could not process kafka message or informix DB error: "${err.message}"`
6262
logger.error(errmsg2)
63-
//await callposttoslack(errmsg2)
64-
//logger.logFullError(err)
6563
logger.debug(`error-sync: consumer "${err.message}"`)
6664
if (!cs_payloadseqid){
67-
cs_payloadseqid= 'err-'+(new Date()).getTime().toString(36) + Math.random().toString(36).slice(2);
68-
}
69-
70-
await auditTrail([cs_payloadseqid,3333,'message.payload.table','message.payload.Uniquecolumn',
65+
cs_payloadseqid= 'err-'+(new Date()).getTime().toString(36) + Math.random().toString(36).slice(2);}
66+
/* await auditTrail([cs_payloadseqid,3333,'message.payload.table','message.payload.Uniquecolumn',
7167
'message.payload.operation',"Error-Consumer","",err.message,"",'message.payload.data',new Date(),'message.topic'],'consumer')
68+
}else{
69+
auditTrail([cs_payloadseqid,4444,message.payload.table,message.payload.Uniquecolumn,
70+
message.payload.operation,"Informix-updated",retryvar,"consumer2","",JSON.stringify(message), message.timestamp,message.topic],'consumer')
71+
}*/
72+
7273
try {
73-
//var retryvar
7474
if (message.payload['retryCount']) retryvar = message.payload.retryCount;
7575
await consumer.commitOffset({ topic, partition, offset: m.offset }) // Commit success as will re-publish
76-
// await auditTrail([cs_payloadseqid,3333,'message.payload.table','message.payload.Uniquecolumn',
77-
// 'message.payload.operation',"Informix-Updated1",retryvar,"","",'message.payload.data',new Date(),'message.topic'],'consumer')
78-
//await callposttoslack(`Retry for Kafka push : retrycount : "${retryvar}"`)
7976
logger.debug(`Trying to push same message after adding retryCounter`)
8077
if (!message.payload.retryCount) {
8178
message.payload.retryCount = 0
@@ -95,14 +92,14 @@ let cs_payloadseqid
9592
}
9693
message.payload['retryCount'] = message.payload.retryCount + 1;
9794
await pushToKafka(message)
98-
var errmsg9 = `Retry for Kafka push : retrycount : "${message.payload.retryCount}" : "${cs_payloadseqid}"`
95+
var errmsg9 = `error-sync: Retry for Kafka push : retrycount : "${message.payload.retryCount}" : "${cs_payloadseqid}"`
9996
logger.debug(errmsg9)
10097
//await callposttoslack(errmsg9)
10198
} catch (err) {
10299

103100
await auditTrail([cs_payloadseqid,cs_processId,message.payload.table,message.payload.Uniquecolumn,
104101
message.payload.operation,"Error-republishing",message.payload['retryCount'],err.message,"",message.payload.data, message.timestamp,message.topic],'consumer')
105-
const errmsg1 = `postgres-ifx-processor: consumer : Error-republishing: "${err.message}"`
102+
const errmsg1 = `error-sync: postgres-ifx-processor: consumer : Error-republishing: "${err.message}"`
106103
logger.error(errmsg1)
107104
logger.debug(`error-sync: consumer re-publishing "${err.message}"`)
108105
// push to slack - alertIt("slack message"

0 commit comments

Comments
 (0)