Skip to content

merge to dev from dev-test-pg #6

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Feb 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@ build_steps: &build_steps
./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-producer_dd-deployvar
source buildenvvar
./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer
echo "Running Masterscript - deploy postgres-ifx-processer reconsiler1"
if [ -e ${LOGICAL_ENV}-${APP_NAME}-reconsiler1-deployvar.json ]; then sudo rm -vf ${LOGICAL_ENV}-${APP_NAME}-reconsiler1-deployvar.json; fi
./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-reconsiler1-deployvar
source buildenvvar
./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer
jobs:
# Build & Deploy against development backend #
"build-dev":
Expand Down
5 changes: 3 additions & 2 deletions config/default.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,14 @@ module.exports = {
SLACKNOTIFY: process.env.SLACKNOTIFY || 'false'
},
RECONSILER:{
RECONSILER_START: process.env.RECONSILER_START || 10,
RECONSILER_END: process.env.RECONSILER_END || 5,
RECONSILER_START: process.env.RECONSILER_START || 5,
RECONSILER_END: process.env.RECONSILER_END || 1,
RECONSILER_DURATION_TYPE: process.env.RECONSILER_DURATION_TYPE || 'm'
},
DYNAMODB:
{
DYNAMODB_TABLE: process.env.DYNAMODB_TABLE || 'dev_pg_ifx_payload_sync'
DD_ElapsedTime: process.env.DD_ElapsedTime || 600000
},

AUTH0_URL: process.env.AUTH0_URL ,
Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
"producer": "node ./src/producer.js",
"consumer": "node ./src/consumer.js",
"producer_dd": "node ./src/producer.js failover",
"start": "npm run producer & npm run producer_dd & npm run consumer"
"reconsiler1": "node ./src/reconsiler-audit.js",
"start": "npm run producer & npm run producer_dd & npm run consumer & npm run reconsiler1"
},
"author": "Topcoder",
"license": "ISC",
Expand Down
44 changes: 41 additions & 3 deletions pg-identity-func-trig-seq.sql
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,30 @@ DECLARE
payload TEXT;
column_name TEXT;
column_value TEXT;
pguserval TEXT;
payload_items TEXT[];
uniquecolumn TEXT;
logtime TEXT;
payloadseqid INTEGER;
BEGIN


--pguserval := (SELECT 1 FROM pg_roles WHERE rolname = 'pgsyncuser');
pguserval := (SELECT current_user);
if pguserval = 'pgsyncuser' then
RAISE notice 'pgsyncuser name : %', pguserval;

CASE TG_OP
WHEN 'INSERT', 'UPDATE' THEN
rec := NEW;
WHEN 'DELETE' THEN
rec := OLD;
ELSE
RAISE EXCEPTION 'Unknown TG_OP: "%". Should not occur!', TG_OP;
END CASE;
return rec;
end if;

CASE TG_OP
WHEN 'INSERT', 'UPDATE' THEN
rec := NEW;
Expand Down Expand Up @@ -99,13 +118,14 @@ BEGIN
END LOOP;
--logtime := (select date_display_tz());
logtime := (SELECT to_char (now()::timestamptz at time zone 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS"Z"'));

payloadseqid := (select nextval('payloadsequence'::regclass));

uniquecolumn := (SELECT c.column_name
FROM information_schema.key_column_usage AS c
LEFT JOIN information_schema.table_constraints AS t
ON t.constraint_name = c.constraint_name
WHERE t.table_name = TG_TABLE_NAME AND t.constraint_type = 'PRIMARY KEY' LIMIT 1);
WHERE t.table_name = TG_TABLE_NAME AND t.constraint_type = 'PRIMARY KEY' limit 1);

if (uniquecolumn = '') IS NOT FALSE then
uniquecolumn := 'Not-Available';
Expand All @@ -131,9 +151,10 @@ BEGIN
PERFORM pg_notify('test_db_notifications', payload);

RETURN rec;
END;
$body$ LANGUAGE plpgsql;

END;
$body$ LANGUAGE plpgsql

CREATE TRIGGER "pg_email_trigger"
AFTER INSERT OR DELETE OR UPDATE ON email
FOR EACH ROW
Expand Down Expand Up @@ -217,10 +238,27 @@ DECLARE
column_name TEXT;
column_value TEXT;
payload_items TEXT[];
pguserval TEXT;
uniquecolumn TEXT;
logtime TEXT;
payloadseqid INTEGER;
BEGIN

pguserval := (SELECT current_user);
if pguserval = 'pgsyncuser' then
RAISE notice 'pgsyncuser name : %', pguserval;

CASE TG_OP
WHEN 'INSERT', 'UPDATE' THEN
rec := NEW;
WHEN 'DELETE' THEN
rec := OLD;
ELSE
RAISE EXCEPTION 'Unknown TG_OP: "%". Should not occur!', TG_OP;
END CASE;
return rec;
end if;

CASE TG_OP
WHEN 'INSERT', 'UPDATE' THEN
rec := NEW;
Expand Down
25 changes: 11 additions & 14 deletions src/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,26 +56,23 @@ let cs_payloadseqid
await consumer.commitOffset({ topic, partition, offset: m.offset }) // Commit offset only on success
if (message.payload['retryCount']) retryvar = message.payload.retryCount;
auditTrail([cs_payloadseqid,cs_processId,message.payload.table,message.payload.Uniquecolumn,
message.payload.operation,"Informix-updated",retryvar,"","",message.payload.data, message.timestamp,message.topic],'consumer')
message.payload.operation,"Informix-updated",retryvar,"","",JSON.stringify(message), message.timestamp,message.topic],'consumer')
} catch (err) {
const errmsg2 = `Could not process kafka message or informix DB error: "${err.message}"`
const errmsg2 = `error-sync: Could not process kafka message or informix DB error: "${err.message}"`
logger.error(errmsg2)
//await callposttoslack(errmsg2)
//logger.logFullError(err)
logger.debug(`error-sync: consumer "${err.message}"`)
if (!cs_payloadseqid){
cs_payloadseqid= 'err-'+(new Date()).getTime().toString(36) + Math.random().toString(36).slice(2);
}

await auditTrail([cs_payloadseqid,3333,'message.payload.table','message.payload.Uniquecolumn',
cs_payloadseqid= 'err-'+(new Date()).getTime().toString(36) + Math.random().toString(36).slice(2);}
/* await auditTrail([cs_payloadseqid,3333,'message.payload.table','message.payload.Uniquecolumn',
'message.payload.operation',"Error-Consumer","",err.message,"",'message.payload.data',new Date(),'message.topic'],'consumer')
}else{
auditTrail([cs_payloadseqid,4444,message.payload.table,message.payload.Uniquecolumn,
message.payload.operation,"Informix-updated",retryvar,"consumer2","",JSON.stringify(message), message.timestamp,message.topic],'consumer')
}*/

try {
//var retryvar
if (message.payload['retryCount']) retryvar = message.payload.retryCount;
await consumer.commitOffset({ topic, partition, offset: m.offset }) // Commit success as will re-publish
// await auditTrail([cs_payloadseqid,3333,'message.payload.table','message.payload.Uniquecolumn',
// 'message.payload.operation',"Informix-Updated1",retryvar,"","",'message.payload.data',new Date(),'message.topic'],'consumer')
//await callposttoslack(`Retry for Kafka push : retrycount : "${retryvar}"`)
logger.debug(`Trying to push same message after adding retryCounter`)
if (!message.payload.retryCount) {
message.payload.retryCount = 0
Expand All @@ -95,14 +92,14 @@ let cs_payloadseqid
}
message.payload['retryCount'] = message.payload.retryCount + 1;
await pushToKafka(message)
var errmsg9 = `Retry for Kafka push : retrycount : "${message.payload.retryCount}" : "${cs_payloadseqid}"`
var errmsg9 = `error-sync: Retry for Kafka push : retrycount : "${message.payload.retryCount}" : "${cs_payloadseqid}"`
logger.debug(errmsg9)
//await callposttoslack(errmsg9)
} catch (err) {

await auditTrail([cs_payloadseqid,cs_processId,message.payload.table,message.payload.Uniquecolumn,
message.payload.operation,"Error-republishing",message.payload['retryCount'],err.message,"",message.payload.data, message.timestamp,message.topic],'consumer')
const errmsg1 = `postgres-ifx-processor: consumer : Error-republishing: "${err.message}"`
const errmsg1 = `error-sync: postgres-ifx-processor: consumer : Error-republishing: "${err.message}"`
logger.error(errmsg1)
logger.debug(`error-sync: consumer re-publishing "${err.message}"`)
// push to slack - alertIt("slack message"
Expand Down
Loading