Dev retryfeature #1

Merged: 8 commits, Sep 25, 2019

1 change: 1 addition & 0 deletions .circleci/config.yml
@@ -81,6 +81,7 @@ workflows:
branches:
only:
- dev
- dev-retryfeature
- "build-prod":
context : org-global
filters:
3 changes: 3 additions & 0 deletions .gitignore
@@ -3,3 +3,6 @@ node_modules
*.log
env_producer.sh
env_consumer.sh
*.env
*.sh
*.list
13 changes: 8 additions & 5 deletions config/default.js
@@ -7,9 +7,9 @@ module.exports = {
host: process.env.INFORMIX_HOST || 'localhost',
port: parseInt(process.env.INFORMIX_PORT, 10) || 2021,
user: process.env.INFORMIX_USER || 'informix',
password: process.env.INFORMIX_PASSWORD || '1nf0rm1x',
database: process.env.INFORMIX_DATABASE || 'tcs_catalog',
server: process.env.INFORMIX_SERVER || 'informixoltp_tcp',
password: process.env.INFORMIX_PASSWORD || 'password',
database: process.env.INFORMIX_DATABASE || 'db',
server: process.env.INFORMIX_SERVER || 'informixserver',
minpool: parseInt(process.env.MINPOOL, 10) || 1,
maxpool: parseInt(process.env.MAXPOOL, 10) || 60,
maxsize: parseInt(process.env.MAXSIZE, 10) || 0,
@@ -32,8 +32,11 @@ module.exports = {
cert: process.env.KAFKA_CLIENT_CERT || null, // SSL client certificate file path
key: process.env.KAFKA_CLIENT_CERT_KEY || null // SSL client key file path
},
topic: process.env.KAFKA_TOPIC || 'db.postgres.sync', // Kafka topic to push and receive messages
partition: process.env.partition || [0] // Kafka partitions to use
topic: process.env.KAFKA_TOPIC || 'db.topic.sync', // Kafka topic to push and receive messages
partition: process.env.partition || [0], // Kafka partitions to use
maxRetry: process.env.MAX_RETRY || 3,
errorTopic: process.env.ERROR_TOPIC || 'db.scorecardtable.error',
recipients: ['[email protected]'] // Email recipients for error alert notifications
},

AUTH0_URL: process.env.AUTH0_URL ,
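For context, the retry settings added above (maxRetry, errorTopic, recipients) are consumed by src/consumer.js later in this diff. A minimal sketch of how they resolve at runtime with node-config, assuming the defaults are left in place (values shown are illustrative):

// Sketch only: resolving the new KAFKA retry settings.
// Values supplied via environment variables arrive as strings, since
// config/default.js does not parseInt MAX_RETRY.
const config = require('config')

const maxRetry = Number(config.KAFKA.maxRetry)   // 3 unless MAX_RETRY is set
const errorTopic = config.KAFKA.errorTopic       // 'db.scorecardtable.error' by default
const recipients = config.KAFKA.recipients       // addresses attached to error alert messages

console.log({ maxRetry, errorTopic, recipients })
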
26 changes: 23 additions & 3 deletions scorecard_trigger_function.sql
@@ -11,6 +11,7 @@ DECLARE
payload_items TEXT[];
uniquecolumn TEXT;
logtime TEXT;
payloadseqid INTEGER;
BEGIN
-- Set record row depending on operation
CASE TG_OP
@@ -51,6 +52,7 @@ BEGIN
END LOOP;
--logtime := (select date_display_tz());
logtime := (SELECT to_char (now()::timestamptz at time zone 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS"Z"'));
payloadseqid := (select nextval('payloadsequence'::regclass));

uniquecolumn := (SELECT c.column_name
FROM information_schema.key_column_usage AS c
@@ -63,11 +65,10 @@ BEGIN
|| '{'
|| '"topic":"' || 'db.postgres.sync' || '",'
|| '"originator":"' || 'tc-postgres-delta-processor' || '",'
-- || '"timestamp":"' || '2019-08-19T08:39:48.959Z' || '",'
|| '"timestamp":"' || logtime || '",'
|| '"timestamp":"' || logtime || '",'
|| '"mime-type":"' || 'application/json' || '",'
|| '"payload": {'

|| '"payloadseqid":"' || payloadseqid || '",'
|| '"Uniquecolumn":"' || uniquecolumn || '",'
|| '"operation":"' || TG_OP || '",'
|| '"schema":"' || TG_TABLE_SCHEMA || '",'
@@ -125,3 +126,22 @@ CREATE TRIGGER "scorecard_type_lu_trigger"
AFTER INSERT OR DELETE OR UPDATE ON scorecard_type_lu
FOR EACH ROW
EXECUTE PROCEDURE notify_trigger('scorecard_type_id', 'name', 'description', 'create_user', 'create_date', 'modify_user', 'modify_date','version');

CREATE TABLE producer_scorecard_audit
(seq_id SERIAL NOT NULL, payloadseqid INTEGER NOT NULL,
origin_source CHARACTER VARYING(64) NOT NULL, kafka_post_status BOOLEAN,
topic_name CHARACTER VARYING(64), table_name CHARACTER VARYING(64) NOT NULL,
uniquecolumn CHARACTER VARYING(64), operationtype CHARACTER VARYING NOT NULL,
errormessage CHARACTER VARYING, payloadtime TIMESTAMP(6) WITHOUT TIME ZONE,
auditdatetime DATE NOT NULL, payload CHARACTER VARYING NOT NULL);

CREATE TABLE consumer_scorecard_audit (seq_id SERIAL NOT NULL, payloadseqid INTEGER NOT NULL,
origin_source CHARACTER VARYING(64) NOT NULL, table_name CHARACTER VARYING(64) NOT NULL,
uniquecolumn CHARACTER VARYING(64), operationtype CHARACTER VARYING NOT NULL,
dest_db_status BOOLEAN, dest_retry_count INTEGER, errormessage CHARACTER VARYING,
payloadtime TIMESTAMP(6) WITHOUT TIME ZONE, auditdatetime DATE NOT NULL,
dest_operationquery CHARACTER VARYING);

CREATE SEQUENCE payloadsequence INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807
START WITH 1 NO CYCLE;
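
Pieced together from the string concatenation above, the message the trigger hands to the producer looks roughly like the following (all values are illustrative, and fields elided from this diff are omitted):

{
  "topic": "db.postgres.sync",
  "originator": "tc-postgres-delta-processor",
  "timestamp": "2019-09-25T08:39:48Z",
  "mime-type": "application/json",
  "payload": {
    "payloadseqid": "42",
    "Uniquecolumn": "scorecard_type_id",
    "operation": "INSERT",
    "schema": "tcs_catalog",
    "table": "scorecard_type_lu",
    "data": { "scorecard_type_id": "1", "name": "Screening" }
  }
}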

115 changes: 50 additions & 65 deletions src/consumer.js
@@ -4,10 +4,11 @@
const config = require('config')
const Kafka = require('no-kafka')
const logger = require('./common/logger')
const informix = require('./common/informixWrapper.js')
const updateInformix = require('./services/updateInformix')
const pushToKafka = require('./services/pushToKafka')
const healthcheck = require('topcoder-healthcheck-dropin');
const auditTrail = require('./services/auditTrail');
const kafkaOptions = config.get('KAFKA')
const sleep = require('sleep');
const isSslEnabled = kafkaOptions.SSL && kafkaOptions.SSL.cert && kafkaOptions.SSL.key
const consumer = new Kafka.SimpleConsumer({
connectionString: kafkaOptions.brokers_url,
@@ -20,90 +21,74 @@ const consumer = new Kafka.SimpleConsumer({
})


const check = function () {
if (!consumer.client.initialBrokers && !consumer.client.initialBrokers.length) {
return false;
}
let connected = true;
consumer.client.initialBrokers.forEach(conn => {
logger.debug(`url ${conn.server()} - connected=${conn.connected}`);
connected = conn.connected & connected;
});
return connected;
};


const terminate = () => process.exit()

/**
* Updates informix database with insert/update/delete operation
* @param {Object} payload The DML trigger data
*/
async function updateInformix (payload) {
logger.debug('Starting to update informix with data:')
logger.debug(payload)
if (payload.payload.table === 'scorecard_question'){
logger.debug('inside scorecard_question')
sleep.sleep(2);
const check = function () {
if (!consumer.client.initialBrokers && !consumer.client.initialBrokers.length) {
return false;
}
//const operation = payload.operation.toLowerCase()
const operation = payload.payload.operation.toLowerCase()
console.log("level producer1 ",operation)
let sql = null
let connected = true;
consumer.client.initialBrokers.forEach(conn => {
logger.debug(`url ${conn.server()} - connected=${conn.connected}`);
connected = conn.connected & connected;
});
return connected;
};

const columns = payload.payload.data
const primaryKey = payload.payload.Uniquecolumn
// Build SQL query
switch (operation) {
case 'insert':
{
const columnNames = Object.keys(columns)
sql = `insert into ${payload.payload.schema}:${payload.payload.table} (${columnNames.join(', ')}) values (${columnNames.map((k) => `'${columns[k]}'`).join(', ')});` // "insert into <schema>:<table> (col_1, col_2, ...) values (val_1, val_2, ...)"
}
break
case 'update':
{
sql = `update ${payload.payload.schema}:${payload.payload.table} set ${Object.keys(columns).map((key) => `${key}='${columns[key]}'`).join(', ')} where ${primaryKey}=${columns[primaryKey]};` // "update <schema>:<table> set col_1=val_1, col_2=val_2, ... where primary_key_col=primary_key_val"
}
break
case 'delete':
{
sql = `delete from ${payload.payload.schema}:${payload.payload.table} where ${primaryKey}=${columns[primaryKey]};` // ""delete from <schema>:<table> where primary_key_col=primary_key_val"
}
break
default:
throw new Error(`Operation ${operation} is not supported`)
}

const result = await informix.executeQuery(payload.payload.schema, sql, null)
return result
}

const terminate = () => process.exit()
/**
*
* @param {Array} messageSet List of messages from kafka
* @param {String} topic The name of the message topic
* @param {Number} partition The kafka partition to which messages are written
*/
async function dataHandler (messageSet, topic, partition) {
async function dataHandler(messageSet, topic, partition) {
for (const m of messageSet) { // Process messages sequentially
let message
try {
const payload = JSON.parse(m.message.value)
logger.debug('Received payload from kafka:')
// logger.debug(payload)
await updateInformix(payload)
message = JSON.parse(m.message.value)
logger.debug('Received message from kafka:')
logger.debug(JSON.stringify(message))
await updateInformix(message)
await consumer.commitOffset({ topic, partition, offset: m.offset }) // Commit offset only on success
await auditTrail([message.payload.payloadseqid,'scorecard_consumer',message.payload.table,message.payload.Uniquecolumn,
message.payload.operation,1,0,"",message.timestamp,new Date(),message.payload],'consumer')
} catch (err) {
logger.error('Could not process kafka message')
logger.logFullError(err)
//logger.logFullError(err)
try {
await consumer.commitOffset({ topic, partition, offset: m.offset }) // Commit success as will re-publish
logger.debug('Trying to push same message after adding retryCounter')
if (!message.payload.retryCount) {
message.payload.retryCount = 0
logger.debug('setting retry counter to 0 and max try count is : ', config.KAFKA.maxRetry);
}
if (message.payload.retryCount >= config.KAFKA.maxRetry) {
logger.debug('Reached max retry count, sending message to error queue: ', config.KAFKA.errorTopic);

let notifyMessage = Object.assign({}, message, { topic: config.KAFKA.errorTopic })
notifyMessage.payload['recipients'] = config.KAFKA.recipients
logger.debug('pushing following message on kafka error alert queue:')
logger.debug(notifyMessage)
await pushToKafka(notifyMessage)
return
}
message.payload['retryCount'] = message.payload.retryCount + 1;
await pushToKafka(message)
logger.debug('pushed same message after adding retryCount')
} catch (err) {
//await auditTrail([payload.payload.payloadseqid,'scorecard_consumer',payload.payload.table,payload.payload.Uniquecolumn,
// payload.payload.operation,0,message.payload.retryCount,"re-publish kafka err",payload.timestamp,new Date(),""],'consumer')
logger.error("Error occurred in re-publishing kafka message", err)
}
}
}
}


/**
* Initialize kafka consumer
*/
async function setupKafkaConsumer () {
async function setupKafkaConsumer() {
try {
await consumer.init()
await consumer.subscribe(kafkaOptions.topic, kafkaOptions.partition, { time: Kafka.LATEST_OFFSET }, dataHandler)
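src/services/pushToKafka and src/services/updateInformix are required above, but their bodies are not part of this excerpt. Under that caveat, a minimal no-kafka based sketch of what pushToKafka could look like (module internals are an assumption, not taken from this PR):

// Hypothetical sketch of src/services/pushToKafka.js; the real file is not shown in this diff.
const config = require('config')
const Kafka = require('no-kafka')

const kafkaOptions = config.get('KAFKA')
const producer = new Kafka.Producer({ connectionString: kafkaOptions.brokers_url })
let ready = false

async function pushToKafka (payload) {
  if (!ready) { // connect lazily on the first publish
    await producer.init()
    ready = true
  }
  // payload.topic (when set by the consumer) redirects the message to the error topic
  await producer.send({
    topic: payload.topic || kafkaOptions.topic,
    partition: 0,
    message: { value: JSON.stringify(payload) }
  })
}

module.exports = pushToKafka
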
47 changes: 13 additions & 34 deletions src/producer.js
@@ -4,60 +4,39 @@
const config = require('config')
const pg = require('pg')
const logger = require('./common/logger')

const busApi = require('topcoder-bus-api-wrapper')
const _ = require('lodash')

const pushToKafka = require('./services/pushToKafka')
const pgOptions = config.get('POSTGRES')
const pgConnectionString = `postgresql://${pgOptions.user}:${pgOptions.password}@${pgOptions.host}:${pgOptions.port}/${pgOptions.database}`
const pgClient = new pg.Client(pgConnectionString)

const auditTrail = require('./services/auditTrail');
const express = require('express')
const app = express()
const port = 3000

const busApiClient = busApi(_.pick(config,
['AUTH0_URL', 'AUTH0_AUDIENCE', 'TOKEN_CACHE_TIME',
'AUTH0_CLIENT_ID', 'AUTH0_CLIENT_SECRET', 'BUSAPI_URL',
'KAFKA_ERROR_TOPIC', 'AUTH0_PROXY_SERVER_URL']))
busApiClient
.getHealth()
.then(result => console.log(result.body, result.status))
.catch(err => console.log(err))

async function postTopic(payload) {
try {
await busApiClient.postEvent(payload)
} catch (e) {
console.log(e)
}
}

async function setupPgClient () {
try {
await pgClient.connect()
// Listen to each of the trigger functions
for (const triggerFunction of pgOptions.triggerFunctions) {
await pgClient.query(`LISTEN ${triggerFunction}`)
}
pgClient.on('notification', async (message) => {
console.log('Received trigger payload:')
logger.debug(`Received trigger payload:`)
logger.debug(message)
//console.log(message)
try {
const payload = JSON.parse(message.payload)
console.log("level 0",payload);
const validTopicAndOriginator = (pgOptions.triggerTopics.includes(payload.topic)) && (pgOptions.triggerOriginators.includes(payload.originator)) // Check if valid topic and originator
if (validTopicAndOriginator) {
// await pushToKafka(payload)
await postTopic(payload)
} else {
console.log(`${payload.topic} ${payload.payload.table} ${payload.payload.operation} ${payload.timestamp}`);
await pushToKafka(payload)
} else {
logger.debug('Ignoring message with incorrect topic or originator')
}
} catch (err) {
await auditTrail([payload.payload.payloadseqid,'scorecard_producer',1,payload.topic,payload.payload.table,payload.payload.Uniquecolumn,
payload.payload.operation,"",payload.timestamp,new Date(),JSON.stringify(payload.payload)],'producer')
} catch (error) {
logger.error('Could not parse message payload')
logger.logFullError(err)
await auditTrail([payload.payload.payloadseqid,'scorecard_producer',0,payload.topic,payload.payload.table,payload.payload.Uniquecolumn,
payload.payload.operation,"error",payload.timestamp,new Date(),JSON.stringify(payload.payload)],'producer')
logger.logFullError(error)
}
})
logger.info('Listening to notifications')
@@ -75,7 +54,7 @@ async function run () {
run()

app.get('/health', (req, res) => {
//console.log('pgClient', pgClient)
res.send('health ok')
})
app.listen(port, () => console.log(`Example app listening on port ${port}!`))
app.listen(port, () => console.log(`app listening on port ${port}!`))
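
The producer relies on pgOptions.triggerFunctions, triggerTopics and triggerOriginators from the POSTGRES config block, which is not included in this diff. Based on how it is consumed above, it would be shaped roughly like this (channel name and connection defaults are assumptions; topic and originator values come from the trigger SQL in this PR):

// Illustrative POSTGRES section of config/default.js; actual values are not part of this diff.
POSTGRES: {
  user: process.env.PG_USER || 'postgres',
  password: process.env.PG_PASSWORD || 'password',
  host: process.env.PG_HOST || 'localhost',
  port: parseInt(process.env.PG_PORT, 10) || 5432,
  database: process.env.PG_DATABASE || 'postgres',
  triggerFunctions: ['db_notifications'],              // channels passed to LISTEN
  triggerTopics: ['db.postgres.sync'],                 // topics accepted from trigger payloads
  triggerOriginators: ['tc-postgres-delta-processor']  // originators accepted from trigger payloads
}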

45 changes: 45 additions & 0 deletions src/services/auditTrail.js
@@ -0,0 +1,45 @@
const config = require('config')
const pg = require('pg')
const logger = require('../common/logger')

const pgOptions = config.get('POSTGRES')
const pgConnectionString = `postgresql://${pgOptions.user}:${pgOptions.password}@${pgOptions.host}:${pgOptions.port}/${pgOptions.database}`
let pgClient2

async function setupPgClient2 () {
pgClient2 = new pg.Client(pgConnectionString)
try {
await pgClient2.connect()
logger.debug('Connected to Pg Client2 Audit:')
}
catch (err) {
logger.error('Could not setup postgres client2')
logger.logFullError(err)
process.exit()
}
}

async function auditTrail (data,sourcetype) {
if (!pgClient2) {
await setupPgClient2()
}
if (sourcetype === 'producer'){
sql = 'INSERT INTO tcs_catalog.producer_scorecard_audit(payloadseqid,origin_source,kafka_post_status,topic_name,table_name,Uniquecolumn,operationtype,errormessage,payloadtime,auditdatetime,payload) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11)'
logger.debug(`--Audit Trail update producer--`)
} else {
sql = 'INSERT INTO tcs_catalog.consumer_scorecard_audit(payloadseqid,origin_source,table_name,Uniquecolumn,operationtype,dest_db_status, dest_retry_count,errormessage,payloadtime,auditdatetime,dest_operationquery) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11)'
logger.debug(`--Audit Trail update consumer--`)
}
return pgClient2.query(sql, data, (err, res) => {
if (err) {
logger.debug(`--Audit Trail update error-- ${err.stack}`)
//pgClient2.end()
} else {
logger.debug(`--Audit Trail update success-- `)
}
})
}


module.exports = auditTrail
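
For reference, a consumer-side call passes values in the same order as the consumer_scorecard_audit column list in the INSERT above (all values below are illustrative):

// Illustrative call; the argument array must line up with the consumer INSERT column order.
await auditTrail([
  42,                      // payloadseqid
  'scorecard_consumer',    // origin_source
  'scorecard_type_lu',     // table_name
  'scorecard_type_id',     // Uniquecolumn
  'INSERT',                // operationtype
  true,                    // dest_db_status
  0,                       // dest_retry_count
  '',                      // errormessage
  '2019-09-25T08:39:48Z',  // payloadtime
  new Date(),              // auditdatetime
  'insert into ...'        // dest_operationquery
], 'consumer')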
