diff --git a/.circleci/config.yml b/.circleci/config.yml
index 2d2db79..da48833 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -81,6 +81,7 @@ workflows:
       branches:
         only:
           - dev
+          - dev-retryfeature
       - "build-prod":
           context : org-global
           filters:
diff --git a/.gitignore b/.gitignore
index d8557e3..c13cec1 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,3 +3,6 @@ node_modules
 *.log
 env_producer.sh
 env_consumer.sh
+*.env
+*.sh
+*.list
diff --git a/config/default.js b/config/default.js
index 4ab5319..54a0793 100644
--- a/config/default.js
+++ b/config/default.js
@@ -7,9 +7,9 @@ module.exports = {
     host: process.env.INFORMIX_HOST || 'localhost',
     port: parseInt(process.env.INFORMIX_PORT, 10) || 2021,
     user: process.env.INFORMIX_USER || 'informix',
-    password: process.env.INFORMIX_PASSWORD || '1nf0rm1x',
-    database: process.env.INFORMIX_DATABASE || 'tcs_catalog',
-    server: process.env.INFORMIX_SERVER || 'informixoltp_tcp',
+    password: process.env.INFORMIX_PASSWORD || 'password',
+    database: process.env.INFORMIX_DATABASE || 'db',
+    server: process.env.INFORMIX_SERVER || 'informixserver',
     minpool: parseInt(process.env.MINPOOL, 10) || 1,
     maxpool: parseInt(process.env.MAXPOOL, 10) || 60,
     maxsize: parseInt(process.env.MAXSIZE, 10) || 0,
@@ -32,8 +32,11 @@ module.exports = {
       cert: process.env.KAFKA_CLIENT_CERT || null, // SSL client certificate file path
       key: process.env.KAFKA_CLIENT_CERT_KEY || null // SSL client key file path
     },
-    topic: process.env.KAFKA_TOPIC || 'db.postgres.sync', // Kafka topic to push and receive messages
-    partition: process.env.partition || [0] // Kafka partitions to use
+    topic: process.env.KAFKA_TOPIC || 'db.topic.sync', // Kafka topic to push and receive messages
+    partition: process.env.partition || [0], // Kafka partitions to use
+    maxRetry: process.env.MAX_RETRY || 3,
+    errorTopic: process.env.ERROR_TOPIC || 'db.scorecardtable.error',
+    recipients: ['admin@abc.com'] // Email recipients for error notifications
   },
   AUTH0_URL: process.env.AUTH0_URL ,
diff --git a/scorecard_trigger_function.sql b/scorecard_trigger_function.sql
index 523a872..62f8f46 100644
--- a/scorecard_trigger_function.sql
+++ b/scorecard_trigger_function.sql
@@ -11,6 +11,7 @@ DECLARE
   payload_items TEXT[];
   uniquecolumn TEXT;
   logtime TEXT;
+  payloadseqid INTEGER;
 BEGIN
   -- Set record row depending on operation
   CASE TG_OP
@@ -51,6 +52,7 @@ BEGIN
   END LOOP;
   --logtime := (select date_display_tz());
   logtime := (SELECT to_char (now()::timestamptz at time zone 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS"Z"'));
+  payloadseqid := (select nextval('payloadsequence'::regclass));
   uniquecolumn := (SELECT c.column_name
     FROM information_schema.key_column_usage AS c
@@ -63,11 +65,10 @@ BEGIN
     || '{'
     || '"topic":"' || 'db.postgres.sync' || '",'
     || '"originator":"' || 'tc-postgres-delta-processor' || '",'
-    -- || '"timestamp":"' || '2019-08-19T08:39:48.959Z' || '",'
-    || '"timestamp":"' || logtime || '",'
+    || '"timestamp":"' || logtime || '",'
     || '"mime-type":"' || 'application/json' || '",'
     || '"payload": {'
-
+    || '"payloadseqid":"' || payloadseqid || '",'
     || '"Uniquecolumn":"' || uniquecolumn || '",'
     || '"operation":"' || TG_OP || '",'
     || '"schema":"' || TG_TABLE_SCHEMA || '",'
@@ -125,3 +126,22 @@ CREATE TRIGGER "scorecard_type_lu_trigger"
 AFTER INSERT OR DELETE OR UPDATE ON scorecard_type_lu
 FOR EACH ROW
 EXECUTE PROCEDURE notify_trigger('scorecard_type_id', 'name', 'description', 'create_user', 'create_date', 'modify_user', 'modify_date','version');
+
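+-- Audit tables: producer_scorecard_audit and consumer_scorecard_audit record every replicated
+-- payload on the producer and consumer side respectively, keyed by payloadseqid.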
+ CREATE TABLE producer_scorecard_audit
+(seq_id SERIAL NOT NULL, payloadseqid INTEGER NOT NULL,
+origin_source CHARACTER VARYING(64) NOT NULL, kafka_post_status BOOLEAN,
+topic_name CHARACTER VARYING(64), table_name CHARACTER VARYING(64) NOT NULL,
+uniquecolumn CHARACTER VARYING(64), operationtype CHARACTER VARYING NOT NULL,
+errormessage CHARACTER VARYING, payloadtime TIMESTAMP(6) WITHOUT TIME ZONE,
+ auditdatetime DATE NOT NULL, payload CHARACTER VARYING NOT NULL);
+
+ CREATE TABLE consumer_scorecard_audit (seq_id SERIAL NOT NULL, payloadseqid INTEGER NOT NULL,
+origin_source CHARACTER VARYING(64) NOT NULL, table_name CHARACTER VARYING(64) NOT NULL,
+uniquecolumn CHARACTER VARYING(64), operationtype CHARACTER VARYING NOT NULL,
+dest_db_status BOOLEAN, dest_retry_count INTEGER, errormessage CHARACTER VARYING,
+payloadtime TIMESTAMP(6) WITHOUT TIME ZONE, auditdatetime DATE NOT NULL,
+dest_operationquery CHARACTER VARYING);
+
+CREATE SEQUENCE payloadsequence INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807
+START WITH 1 NO CYCLE;
+
diff --git a/src/consumer.js b/src/consumer.js
index 0d32fc5..bdcfb6d 100644
--- a/src/consumer.js
+++ b/src/consumer.js
@@ -4,10 +4,11 @@
 const config = require('config')
 const Kafka = require('no-kafka')
 const logger = require('./common/logger')
-const informix = require('./common/informixWrapper.js')
+const updateInformix = require('./services/updateInformix')
+const pushToKafka = require('./services/pushToKafka')
 const healthcheck = require('topcoder-healthcheck-dropin');
+const auditTrail = require('./services/auditTrail');
 const kafkaOptions = config.get('KAFKA')
-const sleep = require('sleep');
 const isSslEnabled = kafkaOptions.SSL && kafkaOptions.SSL.cert && kafkaOptions.SSL.key
 const consumer = new Kafka.SimpleConsumer({
   connectionString: kafkaOptions.brokers_url,
@@ -20,90 +21,74 @@ const consumer = new Kafka.SimpleConsumer({
 })
 
-  const check = function () {
-    if (!consumer.client.initialBrokers && !consumer.client.initialBrokers.length) {
-      return false;
-    }
-    let connected = true;
-    consumer.client.initialBrokers.forEach(conn => {
-      logger.debug(`url ${conn.server()} - connected=${conn.connected}`);
-      connected = conn.connected & connected;
-    });
-    return connected;
-  };
-
-
-const terminate = () => process.exit()
-
-/**
- * Updates informix database with insert/update/delete operation
- * @param {Object} payload The DML trigger data
- */
-async function updateInformix (payload) {
-  logger.debug('Starting to update informix with data:')
-  logger.debug(payload)
-  if (payload.payload.table === 'scorecard_question'){
-    logger.debug('inside scorecard_question')
-    sleep.sleep(2);
+const check = function () {
+  if (!consumer.client.initialBrokers && !consumer.client.initialBrokers.length) {
+    return false;
   }
-  //const operation = payload.operation.toLowerCase()
-  const operation = payload.payload.operation.toLowerCase()
-  console.log("level producer1 ",operation)
-  let sql = null
+  let connected = true;
+  consumer.client.initialBrokers.forEach(conn => {
+    logger.debug(`url ${conn.server()} - connected=${conn.connected}`);
+    connected = conn.connected & connected;
+  });
+  return connected;
+};
 
-  const columns = payload.payload.data
-  const primaryKey = payload.payload.Uniquecolumn
-  // Build SQL query
-  switch (operation) {
-    case 'insert':
-      {
-        const columnNames = Object.keys(columns)
-        sql = `insert into ${payload.payload.schema}:${payload.payload.table} (${columnNames.join(', ')}) values (${columnNames.map((k) => `'${columns[k]}'`).join(', ')});` // "insert into <schema>:<table> (col_1, col_2, ...) values (val_1, val_2, ...)"
-      }
-      break
-    case 'update':
-      {
-        sql = `update ${payload.payload.schema}:${payload.payload.table} set ${Object.keys(columns).map((key) => `${key}='${columns[key]}'`).join(', ')} where ${primaryKey}=${columns[primaryKey]};` // "update <schema>:<table> set col_1=val_1, col_2=val_2, ... where primary_key_col=primary_key_val"
-      }
-      break
-    case 'delete':
-      {
-        sql = `delete from ${payload.payload.schema}:${payload.payload.table} where ${primaryKey}=${columns[primaryKey]};` // "delete from <schema>:<table> where primary_key_col=primary_key_val"
-      }
-      break
-    default:
-      throw new Error(`Operation ${operation} is not supported`)
-  }
-
-  const result = await informix.executeQuery(payload.payload.schema, sql, null)
-  return result
-}
+const terminate = () => process.exit()
 
 /**
  *
  * @param {Array} messageSet List of messages from kafka
  * @param {String} topic The name of the message topic
  * @param {Number} partition The kafka partition to which messages are written
  */
-async function dataHandler (messageSet, topic, partition) {
+async function dataHandler(messageSet, topic, partition) {
   for (const m of messageSet) { // Process messages sequentially
+    let message
     try {
-      const payload = JSON.parse(m.message.value)
-      logger.debug('Received payload from kafka:')
-      // logger.debug(payload)
-      await updateInformix(payload)
+      message = JSON.parse(m.message.value)
+      logger.debug('Received message from kafka:')
+      logger.debug(JSON.stringify(message))
+      await updateInformix(message)
       await consumer.commitOffset({ topic, partition, offset: m.offset }) // Commit offset only on success
+      await auditTrail([message.payload.payloadseqid,'scorecard_consumer',message.payload.table,message.payload.Uniquecolumn,
+        message.payload.operation,1,0,"",message.timestamp,new Date(),message.payload],'consumer')
     } catch (err) {
       logger.error('Could not process kafka message')
-      logger.logFullError(err)
+      //logger.logFullError(err)
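+      // Retry handling: commit the offset so the message is not redelivered, then re-publish the
+      // same message with an incremented retryCount; once retryCount reaches KAFKA.maxRetry the
+      // message is routed to the configured error topic instead.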
+      try {
+        await consumer.commitOffset({ topic, partition, offset: m.offset }) // Commit success as will re-publish
+        logger.debug('Trying to push same message after adding retryCounter')
+        if (!message.payload.retryCount) {
+          message.payload.retryCount = 0
+          logger.debug('setting retry counter to 0; max retry count is: ', config.KAFKA.maxRetry);
+        }
+        if (message.payload.retryCount >= config.KAFKA.maxRetry) {
+          logger.debug('Reached max retry count, sending message to error topic: ', config.KAFKA.errorTopic);
+
+          let notifyMessage = Object.assign({}, message, { topic: config.KAFKA.errorTopic })
+          notifyMessage.payload['recipients'] = config.KAFKA.recipients
+          logger.debug('pushing following message on kafka error alert queue:')
+          logger.debug(notifyMessage)
+          await pushToKafka(notifyMessage)
+          return
+        }
+        message.payload['retryCount'] = message.payload.retryCount + 1;
+        await pushToKafka(message)
+        logger.debug('pushed same message after adding retryCount')
+      } catch (err) {
+        //await auditTrail([payload.payload.payloadseqid,'scorecard_consumer',payload.payload.table,payload.payload.Uniquecolumn,
+        //  payload.payload.operation,0,message.payload.retryCount,"re-publish kafka err",payload.timestamp,new Date(),""],'consumer')
+        logger.error("Error occurred in re-publishing kafka message", err)
+      }
     }
   }
 }
+
 /**
  * Initialize kafka consumer
  */
-async function setupKafkaConsumer () {
+async function setupKafkaConsumer() {
   try {
     await consumer.init()
     await consumer.subscribe(kafkaOptions.topic, kafkaOptions.partition, { time: Kafka.LATEST_OFFSET }, dataHandler)
diff --git a/src/producer.js b/src/producer.js
index f028cc3..ce59982 100644
--- a/src/producer.js
+++ b/src/producer.js
@@ -4,60 +4,39 @@
 const config = require('config')
 const pg = require('pg')
 const logger = require('./common/logger')
-
-const busApi = require('topcoder-bus-api-wrapper')
-const _ = require('lodash')
-
+const pushToKafka = require('./services/pushToKafka')
 const pgOptions = config.get('POSTGRES')
 const pgConnectionString = `postgresql://${pgOptions.user}:${pgOptions.password}@${pgOptions.host}:${pgOptions.port}/${pgOptions.database}`
 const pgClient = new pg.Client(pgConnectionString)
-
+const auditTrail = require('./services/auditTrail');
 const express = require('express')
 const app = express()
 const port = 3000
 
-const busApiClient = busApi(_.pick(config,
-  ['AUTH0_URL', 'AUTH0_AUDIENCE', 'TOKEN_CACHE_TIME',
-    'AUTH0_CLIENT_ID', 'AUTH0_CLIENT_SECRET', 'BUSAPI_URL',
-    'KAFKA_ERROR_TOPIC', 'AUTH0_PROXY_SERVER_URL']))
-busApiClient
-  .getHealth()
-  .then(result => console.log(result.body, result.status))
-  .catch(err => console.log(err))
-
-async function postTopic(payload) {
-  try {
-    await busApiClient.postEvent(payload)
-  } catch (e) {
-    console.log(e)
-  }
-}
 
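+// Connects to postgres, LISTENs on the configured trigger functions and forwards each valid
+// notification to Kafka, recording the attempt in the producer audit table.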
 async function setupPgClient () {
   try {
     await pgClient.connect()
-    // Listen to each of the trigger functions
     for (const triggerFunction of pgOptions.triggerFunctions) {
       await pgClient.query(`LISTEN ${triggerFunction}`)
     }
     pgClient.on('notification', async (message) => {
-      console.log('Received trigger payload:')
-      logger.debug(`Received trigger payload:`)
-      logger.debug(message)
-      //console.log(message)
+      let payload
       try {
-        const payload = JSON.parse(message.payload)
-        console.log("level 0",payload);
+        payload = JSON.parse(message.payload)
         const validTopicAndOriginator = (pgOptions.triggerTopics.includes(payload.topic)) && (pgOptions.triggerOriginators.includes(payload.originator)) // Check if valid topic and originator
         if (validTopicAndOriginator) {
-          // await pushToKafka(payload)
-          await postTopic(payload)
-        } else {
+          console.log(`${payload.topic} ${payload.payload.table} ${payload.payload.operation} ${payload.timestamp}`);
+          await pushToKafka(payload)
+        } else {
           logger.debug('Ignoring message with incorrect topic or originator')
         }
-      } catch (err) {
+        await auditTrail([payload.payload.payloadseqid,'scorecard_producer',1,payload.topic,payload.payload.table,payload.payload.Uniquecolumn,
+          payload.payload.operation,"",payload.timestamp,new Date(),JSON.stringify(payload.payload)],'producer')
+      } catch (error) {
         logger.error('Could not parse message payload')
-        logger.logFullError(err)
+        await auditTrail([payload.payload.payloadseqid,'scorecard_producer',0,payload.topic,payload.payload.table,payload.payload.Uniquecolumn,
+          payload.payload.operation,"error",payload.timestamp,new Date(),JSON.stringify(payload.payload)],'producer')
+        logger.logFullError(error)
       }
     })
     logger.info('Listening to notifications')
@@ -75,7 +54,7 @@ async function run () {
 run()
 
 app.get('/health', (req, res) => {
-  //console.log('pgClient', pgClient)
   res.send('health ok')
 })
-app.listen(port, () => console.log(`Example app listening on port ${port}!`))
+app.listen(port, () => console.log(`app listening on port ${port}!`))
+
diff --git a/src/services/auditTrail.js b/src/services/auditTrail.js
new file mode 100644
index 0000000..fcd26a5
--- /dev/null
+++ b/src/services/auditTrail.js
@@ -0,0 +1,45 @@
+const config = require('config')
+const pg = require('pg')
+const logger = require('../common/logger')
+
+const pgOptions = config.get('POSTGRES')
+const pgConnectionString = `postgresql://${pgOptions.user}:${pgOptions.password}@${pgOptions.host}:${pgOptions.port}/${pgOptions.database}`
+let pgClient2
+
+async function setupPgClient2 () {
+  pgClient2 = new pg.Client(pgConnectionString)
+  try {
+    await pgClient2.connect()
+    logger.debug('Connected to Pg Client2 Audit:')
+  } catch (err) {
+    logger.error('Could not setup postgres client2')
+    logger.logFullError(err)
+    process.exit()
+  }
+}
+
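+/**
+ * Writes one row to the producer or consumer audit table.
+ * @param {Array} data Column values in the order expected by the INSERT statement
+ * @param {String} sourcetype 'producer' or 'consumer', selects the target audit table
+ */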
+async function auditTrail (data, sourcetype) {
+  if (!pgClient2) {
+    await setupPgClient2()
+  }
+  let sql
+  if (sourcetype === 'producer') {
+    sql = 'INSERT INTO tcs_catalog.producer_scorecard_audit(payloadseqid,origin_source,kafka_post_status,topic_name,table_name,Uniquecolumn,operationtype,errormessage,payloadtime,auditdatetime,payload) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11)'
+    logger.debug(`--Audit Trail update producer--`)
+  } else {
+    sql = 'INSERT INTO tcs_catalog.consumer_scorecard_audit(payloadseqid,origin_source,table_name,Uniquecolumn,operationtype,dest_db_status, dest_retry_count,errormessage,payloadtime,auditdatetime,dest_operationquery) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11)'
+    logger.debug(`--Audit Trail update consumer--`)
+  }
+  return pgClient2.query(sql, data, (err, res) => {
+    if (err) {
+      logger.debug(`--Audit Trail update error-- ${err.stack}`)
+      //pgClient2.end()
+    } else {
+      logger.debug(`--Audit Trail update success-- `)
+    }
+  })
+}
+
+module.exports = auditTrail
+
diff --git a/src/services/pushToKafka.js b/src/services/pushToKafka.js
new file mode 100644
index 0000000..1ed5634
--- /dev/null
+++ b/src/services/pushToKafka.js
@@ -0,0 +1,34 @@
+/*
+ * Kafka producer that sends messages to Kafka server.
+ */
+const config = require('config')
+const Kafka = require('no-kafka')
+const logger = require('../common/logger')
+const busApi = require('topcoder-bus-api-wrapper')
+const _ = require('lodash')
+
+const kafkaOptions = config.get('KAFKA')
+const isSslEnabled = kafkaOptions.SSL && kafkaOptions.SSL.cert && kafkaOptions.SSL.key
+let producer
+
+const busApiClient = busApi(_.pick(config,
+  ['AUTH0_URL', 'AUTH0_AUDIENCE', 'TOKEN_CACHE_TIME',
+    'AUTH0_CLIENT_ID', 'AUTH0_CLIENT_SECRET', 'BUSAPI_URL',
+    'KAFKA_ERROR_TOPIC', 'AUTH0_PROXY_SERVER_URL']))
+busApiClient
+  .getHealth()
+  .then(result => console.log(result.body, result.status))
+  .catch(err => console.log(err))
+
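+/**
+ * Posts an event payload to Kafka through the Topcoder bus API.
+ * @param {Object} payload The event to publish
+ */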
+async function pushToKafka(payload) {
+  try {
+    await busApiClient.postEvent(payload)
+  } catch (e) {
+    console.log(e)
+  }
+}
+
+module.exports = pushToKafka
+
diff --git a/src/services/updateInformix.js b/src/services/updateInformix.js
new file mode 100644
index 0000000..70c868d
--- /dev/null
+++ b/src/services/updateInformix.js
@@ -0,0 +1,44 @@
+
+const informix = require('../common/informixWrapper')
+const logger = require('../common/logger')
+
+/**
+ * Updates informix database with insert/update/delete operation
+ * @param {Object} payload The DML trigger data
+ */
+async function updateInformix (payload) {
+  logger.debug('=====Starting to update informix with data:====')
+  //const operation = payload.operation.toLowerCase()
+  const operation = payload.payload.operation.toLowerCase()
+  console.log("level 1 informix ",operation)
+  let sql = null
+
+  const columns = payload.payload.data
+  const primaryKey = payload.payload.Uniquecolumn
+  // Build SQL query
+  switch (operation) {
+    case 'insert':
+      {
+        const columnNames = Object.keys(columns)
+        sql = `insert into ${payload.payload.schema}:${payload.payload.table} (${columnNames.join(', ')}) values (${columnNames.map((k) => `'${columns[k]}'`).join(', ')});` // "insert into <schema>:<table> (col_1, col_2, ...) values (val_1, val_2, ...)"
+      }
+      break
+    case 'update':
+      {
+        sql = `update ${payload.payload.schema}:${payload.payload.table} set ${Object.keys(columns).map((key) => `${key}='${columns[key]}'`).join(', ')} where ${primaryKey}=${columns[primaryKey]};` // "update <schema>:<table> set col_1=val_1, col_2=val_2, ... where primary_key_col=primary_key_val"
+      }
+      break
+    case 'delete':
+      {
+        sql = `delete from ${payload.payload.schema}:${payload.payload.table} where ${primaryKey}=${columns[primaryKey]};` // "delete from <schema>:<table> where primary_key_col=primary_key_val"
+      }
+      break
+    default:
+      throw new Error(`Operation ${operation} is not supported`)
+  }
+
+  const result = await informix.executeQuery(payload.payload.schema, sql, null)
+  return result
+}
+
+module.exports = updateInformix