From ef21e778972180dd505168f97f41c90df9a43fda Mon Sep 17 00:00:00 2001 From: nkumar-topcoder <33625707+nkumar-topcoder@users.noreply.github.com> Date: Thu, 19 Sep 2019 15:26:44 +0530 Subject: [PATCH 1/7] [skip ci] [skip ci] --- .circleci/config.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.circleci/config.yml b/.circleci/config.yml index 2d2db79..da48833 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -81,6 +81,7 @@ workflows: branches: only: - dev + - dev-retryfeature - "build-prod": context : org-global filters: From 9106279974c38f5c4c12fb6ca6005da39fc08e40 Mon Sep 17 00:00:00 2001 From: root Date: Thu, 19 Sep 2019 10:00:29 +0000 Subject: [PATCH 2/7] kafka retry feature --- .gitignore | 2 + config/default.js | 13 ++++--- src/consumer.js | 69 ++++++++++------------------------ src/producer.js | 29 +++----------- src/services/pushToKafka.js | 34 +++++++++++++++++ src/services/updateInformix.js | 49 ++++++++++++++++++++++++ 6 files changed, 118 insertions(+), 78 deletions(-) create mode 100644 src/services/pushToKafka.js create mode 100644 src/services/updateInformix.js diff --git a/.gitignore b/.gitignore index d8557e3..e037c00 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,5 @@ node_modules *.log env_producer.sh env_consumer.sh +*.env +*.sh diff --git a/config/default.js b/config/default.js index 4ab5319..54a0793 100644 --- a/config/default.js +++ b/config/default.js @@ -7,9 +7,9 @@ module.exports = { host: process.env.INFORMIX_HOST || 'localhost', port: parseInt(process.env.INFORMIX_PORT, 10) || 2021, user: process.env.INFORMIX_USER || 'informix', - password: process.env.INFORMIX_PASSWORD || '1nf0rm1x', - database: process.env.INFORMIX_DATABASE || 'tcs_catalog', - server: process.env.INFORMIX_SERVER || 'informixoltp_tcp', + password: process.env.INFORMIX_PASSWORD || 'password', + database: process.env.INFORMIX_DATABASE || 'db', + server: process.env.INFORMIX_SERVER || 'informixserver', minpool: parseInt(process.env.MINPOOL, 10) || 1, maxpool: parseInt(process.env.MAXPOOL, 10) || 60, maxsize: parseInt(process.env.MAXSIZE, 10) || 0, @@ -32,8 +32,11 @@ module.exports = { cert: process.env.KAFKA_CLIENT_CERT || null, // SSL client certificate file path key: process.env.KAFKA_CLIENT_CERT_KEY || null // SSL client key file path }, - topic: process.env.KAFKA_TOPIC || 'db.postgres.sync', // Kafka topic to push and receive messages - partition: process.env.partition || [0] // Kafka partitions to use + topic: process.env.KAFKA_TOPIC || 'db.topic.sync', // Kafka topic to push and receive messages + partition: process.env.partition || [0], // Kafka partitions to use + maxRetry: process.env.MAX_RETRY || 3, + errorTopic: process.env.ERROR_TOPIC || 'db.scorecardtable.error', + recipients: ['admin@abc.com'] // Kafka partitions to use }, AUTH0_URL: process.env.AUTH0_URL , diff --git a/src/consumer.js b/src/consumer.js index 0d32fc5..5a7b065 100644 --- a/src/consumer.js +++ b/src/consumer.js @@ -4,10 +4,11 @@ const config = require('config') const Kafka = require('no-kafka') const logger = require('./common/logger') -const informix = require('./common/informixWrapper.js') +const updateInformix = require('./services/updateInformix') +const pushToKafka = require('./services/pushToKafka') const healthcheck = require('topcoder-healthcheck-dropin'); const kafkaOptions = config.get('KAFKA') -const sleep = require('sleep'); +//const sleep = require('sleep'); const isSslEnabled = kafkaOptions.SSL && kafkaOptions.SSL.cert && kafkaOptions.SSL.key const consumer = new 
Kafka.SimpleConsumer({ connectionString: kafkaOptions.brokers_url, @@ -34,51 +35,6 @@ const consumer = new Kafka.SimpleConsumer({ const terminate = () => process.exit() - -/** - * Updates informix database with insert/update/delete operation - * @param {Object} payload The DML trigger data - */ -async function updateInformix (payload) { - logger.debug('Starting to update informix with data:') - logger.debug(payload) - if (payload.payload.table === 'scorecard_question'){ - logger.debug('inside scorecard_question') - sleep.sleep(2); - } - //const operation = payload.operation.toLowerCase() - const operation = payload.payload.operation.toLowerCase() - console.log("level producer1 ",operation) - let sql = null - - const columns = payload.payload.data - const primaryKey = payload.payload.Uniquecolumn - // Build SQL query - switch (operation) { - case 'insert': - { - const columnNames = Object.keys(columns) - sql = `insert into ${payload.payload.schema}:${payload.payload.table} (${columnNames.join(', ')}) values (${columnNames.map((k) => `'${columns[k]}'`).join(', ')});` // "insert into : (col_1, col_2, ...) values (val_1, val_2, ...)" - } - break - case 'update': - { - sql = `update ${payload.payload.schema}:${payload.payload.table} set ${Object.keys(columns).map((key) => `${key}='${columns[key]}'`).join(', ')} where ${primaryKey}=${columns[primaryKey]};` // "update :
set col_1=val_1, col_2=val_2, ... where primary_key_col=primary_key_val" - } - break - case 'delete': - { - sql = `delete from ${payload.payload.schema}:${payload.payload.table} where ${primaryKey}=${columns[primaryKey]};` // "delete from <schema>:<table>
where primary_key_col=primary_key_val" - } - break - default: - throw new Error(`Operation ${operation} is not supported`) - } - - const result = await informix.executeQuery(payload.payload.schema, sql, null) - return result -} - /** * * @param {Array} messageSet List of messages from kafka @@ -90,15 +46,29 @@ async function dataHandler (messageSet, topic, partition) { try { const payload = JSON.parse(m.message.value) logger.debug('Received payload from kafka:') - // logger.debug(payload) + logger.debug(payload) await updateInformix(payload) + // await insertConsumerAudit(payload, true, undefined, false) await consumer.commitOffset({ topic, partition, offset: m.offset }) // Commit offset only on success } catch (err) { logger.error('Could not process kafka message') logger.logFullError(err) + if (!payload.retryCount) { + payload.retryCount = 0 + } + if (payload.retryCount >= config.KAFKA.maxRetry) { + await pushToKafka( + Object.assign({}, payload, { topic: config.KAFKA.errorTopic, recipients: config.KAFKA.recipients }) + ) + return + } + await pushToKafka( + Object.assign({}, payload, { retryCount: payload.retryCount + 1 }) + ) + } } } -} + /** * Initialize kafka consumer @@ -117,3 +87,4 @@ async function setupKafkaConsumer () { } setupKafkaConsumer() + diff --git a/src/producer.js b/src/producer.js index f028cc3..2354fd5 100644 --- a/src/producer.js +++ b/src/producer.js @@ -4,34 +4,15 @@ const config = require('config') const pg = require('pg') const logger = require('./common/logger') - -const busApi = require('topcoder-bus-api-wrapper') -const _ = require('lodash') - +const pushToKafka = require('./services/pushToKafka') const pgOptions = config.get('POSTGRES') const pgConnectionString = `postgresql://${pgOptions.user}:${pgOptions.password}@${pgOptions.host}:${pgOptions.port}/${pgOptions.database}` const pgClient = new pg.Client(pgConnectionString) const express = require('express') const app = express() -const port = 3000 - -const busApiClient = busApi(_.pick(config, - ['AUTH0_URL', 'AUTH0_AUDIENCE', 'TOKEN_CACHE_TIME', - 'AUTH0_CLIENT_ID', 'AUTH0_CLIENT_SECRET', 'BUSAPI_URL', - 'KAFKA_ERROR_TOPIC', 'AUTH0_PROXY_SERVER_URL'])) -busApiClient - .getHealth() - .then(result => console.log(result.body, result.status)) - .catch(err => console.log(err)) +const port = 4000 -async function postTopic(payload) { - try { - await busApiClient.postEvent(payload) - } catch (e) { - console.log(e) - } -} async function setupPgClient () { try { @@ -51,7 +32,7 @@ async function setupPgClient () { const validTopicAndOriginator = (pgOptions.triggerTopics.includes(payload.topic)) && (pgOptions.triggerOriginators.includes(payload.originator)) // Check if valid topic and originator if (validTopicAndOriginator) { // await pushToKafka(payload) - await postTopic(payload) + await pushToKafka(payload) } else { logger.debug('Ignoring message with incorrect topic or originator') } @@ -75,7 +56,7 @@ async function run () { run() app.get('/health', (req, res) => { - //console.log('pgClient', pgClient) res.send('health ok') }) -app.listen(port, () => console.log(`Example app listening on port ${port}!`)) +app.listen(port, () => console.log(`app listening on port ${port}!`)) + diff --git a/src/services/pushToKafka.js b/src/services/pushToKafka.js new file mode 100644 index 0000000..1ed5634 --- /dev/null +++ b/src/services/pushToKafka.js @@ -0,0 +1,34 @@ +/* + * Kafka producer that sends messages to Kafka server. 
+ */ +const config = require('config') +const Kafka = require('no-kafka') +const logger = require('../common/logger') +const busApi = require('topcoder-bus-api-wrapper') +const _ = require('lodash') + +const kafkaOptions = config.get('KAFKA') +const isSslEnabled = kafkaOptions.SSL && kafkaOptions.SSL.cert && kafkaOptions.SSL.key +let producer + + +const busApiClient = busApi(_.pick(config, + ['AUTH0_URL', 'AUTH0_AUDIENCE', 'TOKEN_CACHE_TIME', + 'AUTH0_CLIENT_ID', 'AUTH0_CLIENT_SECRET', 'BUSAPI_URL', + 'KAFKA_ERROR_TOPIC', 'AUTH0_PROXY_SERVER_URL'])) +busApiClient + .getHealth() + .then(result => console.log(result.body, result.status)) + .catch(err => console.log(err)) + + +async function pushToKafka(payload) { + try { + await busApiClient.postEvent(payload) + } catch (e) { + console.log(e) + } +} + +module.exports = pushToKafka + diff --git a/src/services/updateInformix.js b/src/services/updateInformix.js new file mode 100644 index 0000000..749a282 --- /dev/null +++ b/src/services/updateInformix.js @@ -0,0 +1,49 @@ + +const informix = require('../common/informixWrapper') +const logger = require('../common/logger') + +/** + * Updates informix database with insert/update/delete operation + * @param {Object} payload The DML trigger data + */ +async function updateInformix (payload) { + logger.debug('Starting to update informix with data:') + logger.debug(payload) + if (payload.payload.table === 'scorecard_question'){ + logger.debug('inside scorecard_question') + sleep.sleep(2); + } + //const operation = payload.operation.toLowerCase() + const operation = payload.payload.operation.toLowerCase() + console.log("level producer1 ",operation) + let sql = null + + const columns = payload.payload.data + const primaryKey = payload.payload.Uniquecolumn + // Build SQL query + switch (operation) { + case 'insert': + { + const columnNames = Object.keys(columns) + sql = `insert into ${payload.payload.schema}:${payload.payload.table} (${columnNames.join(', ')}) values (${columnNames.map((k) => `'${columns[k]}'`).join(', ')});` // "insert into :
(col_1, col_2, ...) values (val_1, val_2, ...)" + } + break + case 'update': + { + sql = `update ${payload.payload.schema}:${payload.payload.table} set ${Object.keys(columns).map((key) => `${key}='${columns[key]}'`).join(', ')} where ${primaryKey}=${columns[primaryKey]};` // "update <schema>:<table>
set col_1=val_1, col_2=val_2, ... where primary_key_col=primary_key_val" + } + break + case 'delete': + { + sql = `delete from ${payload.payload.schema}:${payload.payload.table} where ${primaryKey}=${columns[primaryKey]};` // "delete from <schema>:<table>
where primary_key_col=primary_key_val" + } + break + default: + throw new Error(`Operation ${operation} is not supported`) + } + + const result = await informix.executeQuery(payload.payload.schema, sql, null) + return result +} + +module.exports = updateInformix From 43fd410d92efb96b3f4ffa72eb00b40c9a014262 Mon Sep 17 00:00:00 2001 From: nkumar-topcoder <33625707+nkumar-topcoder@users.noreply.github.com> Date: Thu, 19 Sep 2019 15:56:11 +0530 Subject: [PATCH 3/7] Update producer.js --- src/producer.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/producer.js b/src/producer.js index 2354fd5..57e2ced 100644 --- a/src/producer.js +++ b/src/producer.js @@ -11,7 +11,7 @@ const pgClient = new pg.Client(pgConnectionString) const express = require('express') const app = express() -const port = 4000 +const port = 3000 async function setupPgClient () { From c5b829b7c20842ef19873bbdee77f4d4ae4b4968 Mon Sep 17 00:00:00 2001 From: root Date: Sun, 22 Sep 2019 13:20:34 +0000 Subject: [PATCH 4/7] kafka audit update --- .gitignore | 1 + src/consumer.js | 80 ++++++++++++++++++++-------------- src/producer.js | 20 ++++----- src/services/auditTrail.js | 45 +++++++++++++++++++ src/services/updateInformix.js | 9 +--- 5 files changed, 104 insertions(+), 51 deletions(-) create mode 100644 src/services/auditTrail.js diff --git a/.gitignore b/.gitignore index e037c00..c13cec1 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ env_producer.sh env_consumer.sh *.env *.sh +*.list diff --git a/src/consumer.js b/src/consumer.js index 5a7b065..3718811 100644 --- a/src/consumer.js +++ b/src/consumer.js @@ -7,8 +7,8 @@ const logger = require('./common/logger') const updateInformix = require('./services/updateInformix') const pushToKafka = require('./services/pushToKafka') const healthcheck = require('topcoder-healthcheck-dropin'); +const auditTrail = require('./services/auditTrail'); const kafkaOptions = config.get('KAFKA') -//const sleep = require('sleep'); const isSslEnabled = kafkaOptions.SSL && kafkaOptions.SSL.cert && kafkaOptions.SSL.key const consumer = new Kafka.SimpleConsumer({ connectionString: kafkaOptions.brokers_url, @@ -21,17 +21,17 @@ const consumer = new Kafka.SimpleConsumer({ }) - const check = function () { - if (!consumer.client.initialBrokers && !consumer.client.initialBrokers.length) { - return false; - } - let connected = true; - consumer.client.initialBrokers.forEach(conn => { - logger.debug(`url ${conn.server()} - connected=${conn.connected}`); - connected = conn.connected & connected; - }); - return connected; - }; +const check = function () { + if (!consumer.client.initialBrokers && !consumer.client.initialBrokers.length) { + return false; + } + let connected = true; + consumer.client.initialBrokers.forEach(conn => { + logger.debug(`url ${conn.server()} - connected=${conn.connected}`); + connected = conn.connected & connected; + }); + return connected; +}; const terminate = () => process.exit() @@ -41,39 +41,54 @@ const terminate = () => process.exit() * @param {String} topic The name of the message topic * @param {Number} partition The kafka partition to which messages are written */ -async function dataHandler (messageSet, topic, partition) { +async function dataHandler(messageSet, topic, partition) { for (const m of messageSet) { // Process messages sequentially + let message try { - const payload = JSON.parse(m.message.value) - logger.debug('Received payload from kafka:') - logger.debug(payload) - await updateInformix(payload) - // await insertConsumerAudit(payload, 
true, undefined, false) + message = JSON.parse(m.message.value) + logger.debug('Received message from kafka:') + logger.debug(JSON.stringify(message)) + await updateInformix(message) await consumer.commitOffset({ topic, partition, offset: m.offset }) // Commit offset only on success + await auditTrail([message.payload.payloadseqid,'scorecard_consumer',message.payload.table,message.payload.Uniquecolumn, + message.payload.operation,1,0,"",message.timestamp,new Date(),""],'consumer') } catch (err) { logger.error('Could not process kafka message') - logger.logFullError(err) - if (!payload.retryCount) { - payload.retryCount = 0 - } - if (payload.retryCount >= config.KAFKA.maxRetry) { - await pushToKafka( - Object.assign({}, payload, { topic: config.KAFKA.errorTopic, recipients: config.KAFKA.recipients }) - ) - return - } - await pushToKafka( - Object.assign({}, payload, { retryCount: payload.retryCount + 1 }) - ) + //logger.logFullError(err) + try { + await consumer.commitOffset({ topic, partition, offset: m.offset }) // Commit success as will re-publish + logger.debug('Trying to push same message after adding retryCounter') + if (!message.payload.retryCount) { + message.payload.retryCount = 0 + logger.debug('setting retry counter to 0 and max try count is : ', config.KAFKA.maxRetry); + } + if (message.payload.retryCount >= config.KAFKA.maxRetry) { + logger.debug('Recached at max retry counter, sending it to error queue: ', config.KAFKA.errorTopic); + + let notifiyMessage = Object.assign({}, message, { topic: config.KAFKA.errorTopic }) + notifiyMessage.payload['recipients'] = config.KAFKA.recipients + logger.debug('pushing following message on kafka error alert queue:') + logger.debug(notifiyMessage) + await pushToKafka(notifiyMessage) + return + } + message.payload['retryCount'] = message.payload.retryCount + 1; + await pushToKafka(message) + logger.debug('pushed same message after adding retryCount') + } catch (err) { + //await auditTrail([payload.payload.payloadseqid,'scorecard_consumer',payload.payload.table,payload.payload.Uniquecolumn, + // payload.payload.operation,0,message.payload.retryCount,"re-publish kafka err",payload.timestamp,new Date(),""],'consumer') + logger.error("Error occured in re-publishing kafka message", err) } } } +} /** * Initialize kafka consumer */ -async function setupKafkaConsumer () { +async function setupKafkaConsumer() { try { await consumer.init() await consumer.subscribe(kafkaOptions.topic, kafkaOptions.partition, { time: Kafka.LATEST_OFFSET }, dataHandler) @@ -87,4 +102,3 @@ async function setupKafkaConsumer () { } setupKafkaConsumer() - diff --git a/src/producer.js b/src/producer.js index 57e2ced..ce59982 100644 --- a/src/producer.js +++ b/src/producer.js @@ -8,7 +8,7 @@ const pushToKafka = require('./services/pushToKafka') const pgOptions = config.get('POSTGRES') const pgConnectionString = `postgresql://${pgOptions.user}:${pgOptions.password}@${pgOptions.host}:${pgOptions.port}/${pgOptions.database}` const pgClient = new pg.Client(pgConnectionString) - +const auditTrail = require('./services/auditTrail'); const express = require('express') const app = express() const port = 3000 @@ -17,28 +17,26 @@ const port = 3000 async function setupPgClient () { try { await pgClient.connect() - // Listen to each of the trigger functions for (const triggerFunction of pgOptions.triggerFunctions) { await pgClient.query(`LISTEN ${triggerFunction}`) } pgClient.on('notification', async (message) => { - console.log('Received trigger payload:') - logger.debug(`Received trigger 
payload:`) - logger.debug(message) - //console.log(message) try { const payload = JSON.parse(message.payload) - console.log("level 0",payload); const validTopicAndOriginator = (pgOptions.triggerTopics.includes(payload.topic)) && (pgOptions.triggerOriginators.includes(payload.originator)) // Check if valid topic and originator if (validTopicAndOriginator) { - // await pushToKafka(payload) + console.log(`${payload.topic} ${payload.payload.table} ${payload.payload.operation} ${payload.timestamp}`); await pushToKafka(payload) - } else { + } else { logger.debug('Ignoring message with incorrect topic or originator') } - } catch (err) { + await auditTrail([payload.payload.payloadseqid,'scorecard_producer',1,payload.topic,payload.payload.table,payload.payload.Uniquecolumn, + payload.payload.operation,"",payload.timestamp,new Date(),JSON.stringify(payload.payload)],'producer') + } catch (error) { logger.error('Could not parse message payload') - logger.logFullError(err) + await auditTrail([payload.payload.payloadseqid,'scorecard_producer',0,payload.topic,payload.payload.table,payload.payload.Uniquecolumn, + payload.payload.operation,"error",payload.timestamp,new Date(),JSON.stringify(payload.payload)],'producer') + logger.logFullError(error) } }) logger.info('Listening to notifications') diff --git a/src/services/auditTrail.js b/src/services/auditTrail.js new file mode 100644 index 0000000..fcd26a5 --- /dev/null +++ b/src/services/auditTrail.js @@ -0,0 +1,45 @@ +const config = require('config') +const pg = require('pg') +const logger = require('../common/logger') + +const pgOptions = config.get('POSTGRES') +const pgConnectionString = `postgresql://${pgOptions.user}:${pgOptions.password}@${pgOptions.host}:${pgOptions.port}/${pgOptions.database}` +let pgClient2 + +async function setupPgClient2 () { + pgClient2 = new pg.Client(pgConnectionString) + try { + await pgClient2.connect() + logger.debug('Connected to Pg Client2 Audit:') + } + catch (err) { + logger.error('Could not setup postgres client2') + logger.logFullError(err) + process.exit() + } +} + +async function auditTrail (data,sourcetype) { +if (!pgClient2) { + await setupPgClient2() +} +if (sourcetype === 'producer'){ + sql = 'INSERT INTO tcs_catalog.producer_scorecard_audit(payloadseqid,origin_source,kafka_post_status,topic_name,table_name,Uniquecolumn,operationtype,errormessage,payloadtime,auditdatetime,payload) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11)' +logger.debug(`--Audit Trail update producer--`) +} else { +sql = 'INSERT INTO tcs_catalog.consumer_scorecard_audit(payloadseqid,origin_source,table_name,Uniquecolumn,operationtype,dest_db_status, dest_retry_count,errormessage,payloadtime,auditdatetime,dest_operationquery) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11)' +logger.debug(`--Audit Trail update consumer--`) +} + return pgClient2.query(sql, data, (err, res) => { + if (err) { + logger.debug(`--Audit Trail update error-- ${err.stack}`) + //pgClient2.end() + } else { + logger.debug(`--Audit Trail update success-- `) + } +}) +} + + +module.exports = auditTrail + diff --git a/src/services/updateInformix.js b/src/services/updateInformix.js index 749a282..70c868d 100644 --- a/src/services/updateInformix.js +++ b/src/services/updateInformix.js @@ -7,15 +7,10 @@ const logger = require('../common/logger') * @param {Object} payload The DML trigger data */ async function updateInformix (payload) { - logger.debug('Starting to update informix with data:') - logger.debug(payload) - if (payload.payload.table === 'scorecard_question'){ - 
logger.debug('inside scorecard_question') - sleep.sleep(2); - } + logger.debug('=====Starting to update informix with data:====') //const operation = payload.operation.toLowerCase() const operation = payload.payload.operation.toLowerCase() - console.log("level producer1 ",operation) + console.log("level 1 informix ",operation) let sql = null const columns = payload.payload.data From 15843989a4a14e74ed0c1f25ac0e607c06ec5cd2 Mon Sep 17 00:00:00 2001 From: nkumar-topcoder <33625707+nkumar-topcoder@users.noreply.github.com> Date: Mon, 23 Sep 2019 12:06:56 +0530 Subject: [PATCH 5/7] Update scorecard_trigger_function.sql [skip ci] --- scorecard_trigger_function.sql | 25 ++++++++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/scorecard_trigger_function.sql b/scorecard_trigger_function.sql index 523a872..bafbb41 100644 --- a/scorecard_trigger_function.sql +++ b/scorecard_trigger_function.sql @@ -51,6 +51,7 @@ BEGIN END LOOP; --logtime := (select date_display_tz()); logtime := (SELECT to_char (now()::timestamptz at time zone 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS"Z"')); + payloadseqid := (select nextval('payloadsequence'::regclass)); uniquecolumn := (SELECT c.column_name FROM information_schema.key_column_usage AS c @@ -63,11 +64,10 @@ BEGIN || '{' || '"topic":"' || 'db.postgres.sync' || '",' || '"originator":"' || 'tc-postgres-delta-processor' || '",' - -- || '"timestamp":"' || '2019-08-19T08:39:48.959Z' || '",' - || '"timestamp":"' || logtime || '",' + || '"timestamp":"' || logtime || '",' || '"mime-type":"' || 'application/json' || '",' || '"payload": {' - + || '"payloadseqid":"' || payloadseqid || '",' || '"Uniquecolumn":"' || uniquecolumn || '",' || '"operation":"' || TG_OP || '",' || '"schema":"' || TG_TABLE_SCHEMA || '",' @@ -125,3 +125,22 @@ CREATE TRIGGER "scorecard_type_lu_trigger" AFTER INSERT OR DELETE OR UPDATE ON scorecard_type_lu FOR EACH ROW EXECUTE PROCEDURE notify_trigger('scorecard_type_id', 'name', 'description', 'create_user', 'create_date', 'modify_user', 'modify_date','version'); + + CREATE TABLE producer_scorecard_audit +(seq_id SERIAL NOT NULL, payloadseqid INTEGER NOT NULL, +origin_source CHARACTER VARYING(64) NOT NULL, kafka_post_status BOOLEAN, +topic_name CHARACTER VARYING(64), table_name CHARACTER VARYING(64) NOT NULL, +uniquecolumn CHARACTER VARYING(64), operationtype CHARACTER VARYING NOT NULL, +errormessage CHARACTER VARYING, payloadtime TIMESTAMP(6) WITHOUT TIME ZONE, + auditdatetime DATE NOT NULL, payload CHARACTER VARYING NOT NULL); + + CREATE TABLE consumer_scorecard_audit (seq_id SERIAL NOT NULL, payloadseqid INTEGER NOT NULL, +origin_source CHARACTER VARYING(64) NOT NULL, table_name CHARACTER VARYING(64) NOT NULL, +uniquecolumn CHARACTER VARYING(64), operationtype CHARACTER VARYING NOT NULL, +dest_db_status BOOLEAN, dest_retry_count INTEGER, errormessage CHARACTER VARYING, +payloadtime TIMESTAMP(6) WITHOUT TIME ZONE, auditdatetime DATE NOT NULL, +dest_operationquery CHARACTER VARYING); + +CREATE SEQUENCE payloadsequence INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 +START WITH 1 NO CYCLE; + From 4ec6c60bec193798ff2d2f73ddf78f9b03badde0 Mon Sep 17 00:00:00 2001 From: nkumar-topcoder <33625707+nkumar-topcoder@users.noreply.github.com> Date: Mon, 23 Sep 2019 14:31:51 +0530 Subject: [PATCH 6/7] Update consumer.js --- src/consumer.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/consumer.js b/src/consumer.js index 3718811..bdcfb6d 100644 --- a/src/consumer.js +++ b/src/consumer.js @@ -51,7 +51,7 @@ async 
function dataHandler(messageSet, topic, partition) { await updateInformix(message) await consumer.commitOffset({ topic, partition, offset: m.offset }) // Commit offset only on success await auditTrail([message.payload.payloadseqid,'scorecard_consumer',message.payload.table,message.payload.Uniquecolumn, - message.payload.operation,1,0,"",message.timestamp,new Date(),""],'consumer') + message.payload.operation,1,0,"",message.timestamp,new Date(),message.payload],'consumer') } catch (err) { logger.error('Could not process kafka message') //logger.logFullError(err) From eba433f62427b420d6de37806c266ba9afff475e Mon Sep 17 00:00:00 2001 From: nkumar-topcoder <33625707+nkumar-topcoder@users.noreply.github.com> Date: Mon, 23 Sep 2019 17:31:15 +0530 Subject: [PATCH 7/7] Update scorecard_trigger_function.sql [skip ci] --- scorecard_trigger_function.sql | 1 + 1 file changed, 1 insertion(+) diff --git a/scorecard_trigger_function.sql b/scorecard_trigger_function.sql index bafbb41..62f8f46 100644 --- a/scorecard_trigger_function.sql +++ b/scorecard_trigger_function.sql @@ -11,6 +11,7 @@ DECLARE payload_items TEXT[]; uniquecolumn TEXT; logtime TEXT; + payloadseqid INTEGER; BEGIN -- Set record row depending on operation CASE TG_OP
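
Notes on the payload format: the notify_trigger() function above emits a JSON event which the producer parses and forwards to Kafka unchanged. A representative parsed event is sketched below as a plain JavaScript object; the concrete values, the schema name, and the column set under data are illustrative only and depend on which table the trigger fires for (the scorecard_type_lu trigger is used as the example).

// Representative parsed notification (illustrative values only).
const examplePayload = {
  topic: 'db.postgres.sync',                 // hard-coded in notify_trigger()
  originator: 'tc-postgres-delta-processor', // hard-coded in notify_trigger()
  timestamp: '2019-09-23T10:15:00Z',         // logtime, UTC
  'mime-type': 'application/json',
  payload: {
    payloadseqid: '42',                      // nextval('payloadsequence')
    Uniquecolumn: 'scorecard_type_id',       // primary key column looked up from key_column_usage
    operation: 'UPDATE',                     // TG_OP; the consumer lower-cases it
    schema: 'tcs_catalog',                   // TG_TABLE_SCHEMA (assumed value)
    table: 'scorecard_type_lu',              // TG_TABLE_NAME
    data: {                                  // column/value pairs captured from the row
      scorecard_type_id: '1',
      name: 'Screening',
      version: '1.0'
    }
  }
}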
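
Notes on the retry flow: stripped of offset commits, logging and auditing, the consumer error handling added in patches 2 and 4 amounts to the decision below. This is a condensed sketch of the logic in src/consumer.js, not a drop-in replacement; it assumes the same KAFKA.maxRetry, KAFKA.errorTopic and KAFKA.recipients config keys and the pushToKafka/updateInformix services introduced in patch 2.

// Condensed sketch of the consumer's retry decision (see dataHandler in src/consumer.js).
const config = require('config')
const updateInformix = require('./services/updateInformix')
const pushToKafka = require('./services/pushToKafka')

async function handleMessage (message) {
  try {
    await updateInformix(message)            // apply the DML to Informix
  } catch (err) {
    const retryCount = message.payload.retryCount || 0
    if (retryCount >= config.KAFKA.maxRetry) {
      // Exhausted retries: divert the event to the error topic with the alert recipients.
      await pushToKafka(Object.assign({}, message, {
        topic: config.KAFKA.errorTopic,
        payload: Object.assign({}, message.payload, { recipients: config.KAFKA.recipients })
      }))
      return
    }
    // Otherwise re-publish the same event with an incremented retry counter.
    await pushToKafka(Object.assign({}, message, {
      payload: Object.assign({}, message.payload, { retryCount: retryCount + 1 })
    }))
  }
}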
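
Notes on the audit rows: auditTrail() takes a positional array whose order must match the column list of the audit tables created in patch 5. The hypothetical helper below spells out that mapping for consumer_scorecard_audit; the producer call is analogous but targets producer_scorecard_audit (kafka_post_status and topic_name instead of the dest_* columns).

// Hypothetical helper showing how the consumer's positional audit array lines up
// with the consumer_scorecard_audit columns (the real call sites build the array inline).
function consumerAuditRow (message, destDbStatus, destRetryCount, errorMessage) {
  return [
    message.payload.payloadseqid, // payloadseqid
    'scorecard_consumer',         // origin_source
    message.payload.table,        // table_name
    message.payload.Uniquecolumn, // uniquecolumn
    message.payload.operation,    // operationtype
    destDbStatus,                 // dest_db_status (1/true = applied to Informix)
    destRetryCount,               // dest_retry_count
    errorMessage,                 // errormessage
    message.timestamp,            // payloadtime
    new Date(),                   // auditdatetime
    ''                            // dest_operationquery (patch 6 passes the raw payload here)
  ]
}

// Usage, mirroring the success path in dataHandler:
//   await auditTrail(consumerAuditRow(message, 1, 0, ''), 'consumer')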