Commit fd14132

Merge pull request #1 from topcoder-platform/dev-retryfeature
[skip ci] merge into DEV from Dev retryfeature
2 parents af042f3 + eba433f commit fd14132

9 files changed, +221 -107 lines changed


.circleci/config.yml

Lines changed: 1 addition & 0 deletions
@@ -81,6 +81,7 @@ workflows:
           branches:
             only:
               - dev
+              - dev-retryfeature
       - "build-prod":
           context : org-global
           filters:

.gitignore

Lines changed: 3 additions & 0 deletions
@@ -3,3 +3,6 @@ node_modules
 *.log
 env_producer.sh
 env_consumer.sh
+*.env
+*.sh
+*.list

config/default.js

Lines changed: 8 additions & 5 deletions
@@ -7,9 +7,9 @@ module.exports = {
     host: process.env.INFORMIX_HOST || 'localhost',
     port: parseInt(process.env.INFORMIX_PORT, 10) || 2021,
     user: process.env.INFORMIX_USER || 'informix',
-    password: process.env.INFORMIX_PASSWORD || '1nf0rm1x',
-    database: process.env.INFORMIX_DATABASE || 'tcs_catalog',
-    server: process.env.INFORMIX_SERVER || 'informixoltp_tcp',
+    password: process.env.INFORMIX_PASSWORD || 'password',
+    database: process.env.INFORMIX_DATABASE || 'db',
+    server: process.env.INFORMIX_SERVER || 'informixserver',
     minpool: parseInt(process.env.MINPOOL, 10) || 1,
     maxpool: parseInt(process.env.MAXPOOL, 10) || 60,
     maxsize: parseInt(process.env.MAXSIZE, 10) || 0,
@@ -32,8 +32,11 @@ module.exports = {
       cert: process.env.KAFKA_CLIENT_CERT || null, // SSL client certificate file path
       key: process.env.KAFKA_CLIENT_CERT_KEY || null // SSL client key file path
     },
-    topic: process.env.KAFKA_TOPIC || 'db.postgres.sync', // Kafka topic to push and receive messages
-    partition: process.env.partition || [0] // Kafka partitions to use
+    topic: process.env.KAFKA_TOPIC || 'db.topic.sync', // Kafka topic to push and receive messages
+    partition: process.env.partition || [0], // Kafka partitions to use
+    maxRetry: process.env.MAX_RETRY || 3, // Max re-publish attempts before diverting to the error topic
+    errorTopic: process.env.ERROR_TOPIC || 'db.scorecardtable.error', // Kafka topic for messages that exhausted retries
+    recipients: ['[email protected]'] // Recipients of error alert notifications
   },

   AUTH0_URL: process.env.AUTH0_URL,
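
The three new keys drive the retry flow in src/consumer.js below. A minimal sketch of reading them at runtime; note that MAX_RETRY is left uncoerced in this commit, so the explicit parseInt here is an assumption, not part of the diff:

const config = require('config')

// Sketch: consuming the new retry settings (parseInt is an assumption;
// process.env.MAX_RETRY is a string whenever it is set via the environment).
const maxRetry = parseInt(config.KAFKA.maxRetry, 10)
const errorTopic = config.KAFKA.errorTopic
console.log(`re-publish up to ${maxRetry} times, then divert to ${errorTopic}`)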

scorecard_trigger_function.sql

Lines changed: 23 additions & 3 deletions
@@ -11,6 +11,7 @@ DECLARE
   payload_items TEXT[];
   uniquecolumn TEXT;
   logtime TEXT;
+  payloadseqid INTEGER;
 BEGIN
   -- Set record row depending on operation
   CASE TG_OP
@@ -51,6 +52,7 @@ BEGIN
   END LOOP;
   --logtime := (select date_display_tz());
   logtime := (SELECT to_char (now()::timestamptz at time zone 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS"Z"'));
+  payloadseqid := (select nextval('payloadsequence'::regclass));

   uniquecolumn := (SELECT c.column_name
   FROM information_schema.key_column_usage AS c
@@ -63,11 +65,10 @@ BEGIN
   || '{'
   || '"topic":"' || 'db.postgres.sync' || '",'
   || '"originator":"' || 'tc-postgres-delta-processor' || '",'
-  -- || '"timestamp":"' || '2019-08-19T08:39:48.959Z' || '",'
-  || '"timestamp":"' || logtime || '",'
+  || '"timestamp":"' || logtime || '",'
   || '"mime-type":"' || 'application/json' || '",'
   || '"payload": {'
-
+  || '"payloadseqid":"' || payloadseqid || '",'
   || '"Uniquecolumn":"' || uniquecolumn || '",'
   || '"operation":"' || TG_OP || '",'
   || '"schema":"' || TG_TABLE_SCHEMA || '",'
@@ -125,3 +126,22 @@ CREATE TRIGGER "scorecard_type_lu_trigger"
 AFTER INSERT OR DELETE OR UPDATE ON scorecard_type_lu
 FOR EACH ROW
 EXECUTE PROCEDURE notify_trigger('scorecard_type_id', 'name', 'description', 'create_user', 'create_date', 'modify_user', 'modify_date','version');
+
+CREATE TABLE producer_scorecard_audit
+(seq_id SERIAL NOT NULL, payloadseqid INTEGER NOT NULL,
+origin_source CHARACTER VARYING(64) NOT NULL, kafka_post_status BOOLEAN,
+topic_name CHARACTER VARYING(64), table_name CHARACTER VARYING(64) NOT NULL,
+uniquecolumn CHARACTER VARYING(64), operationtype CHARACTER VARYING NOT NULL,
+errormessage CHARACTER VARYING, payloadtime TIMESTAMP(6) WITHOUT TIME ZONE,
+auditdatetime DATE NOT NULL, payload CHARACTER VARYING NOT NULL);
+
+CREATE TABLE consumer_scorecard_audit (seq_id SERIAL NOT NULL, payloadseqid INTEGER NOT NULL,
+origin_source CHARACTER VARYING(64) NOT NULL, table_name CHARACTER VARYING(64) NOT NULL,
+uniquecolumn CHARACTER VARYING(64), operationtype CHARACTER VARYING NOT NULL,
+dest_db_status BOOLEAN, dest_retry_count INTEGER, errormessage CHARACTER VARYING,
+payloadtime TIMESTAMP(6) WITHOUT TIME ZONE, auditdatetime DATE NOT NULL,
+dest_operationquery CHARACTER VARYING);
+
+CREATE SEQUENCE payloadsequence INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807
+START WITH 1 NO CYCLE;
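
Assembled, the trigger now emits a notification payload of roughly this shape; the values are illustrative and fields outside this hunk are omitted:

{
  "topic": "db.postgres.sync",
  "originator": "tc-postgres-delta-processor",
  "timestamp": "2019-08-19T08:39:48Z",
  "mime-type": "application/json",
  "payload": {
    "payloadseqid": "42",
    "Uniquecolumn": "scorecard_type_id",
    "operation": "INSERT",
    "schema": "tcs_catalog"
  }
}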

src/consumer.js

Lines changed: 50 additions & 65 deletions
@@ -4,10 +4,11 @@
 const config = require('config')
 const Kafka = require('no-kafka')
 const logger = require('./common/logger')
-const informix = require('./common/informixWrapper.js')
+const updateInformix = require('./services/updateInformix')
+const pushToKafka = require('./services/pushToKafka')
 const healthcheck = require('topcoder-healthcheck-dropin');
+const auditTrail = require('./services/auditTrail');
 const kafkaOptions = config.get('KAFKA')
-const sleep = require('sleep');
 const isSslEnabled = kafkaOptions.SSL && kafkaOptions.SSL.cert && kafkaOptions.SSL.key
 const consumer = new Kafka.SimpleConsumer({
   connectionString: kafkaOptions.brokers_url,
@@ -20,90 +21,74 @@ const consumer = new Kafka.SimpleConsumer({
 })


-const check = function () {
-  if (!consumer.client.initialBrokers && !consumer.client.initialBrokers.length) {
-    return false;
-  }
-  let connected = true;
-  consumer.client.initialBrokers.forEach(conn => {
-    logger.debug(`url ${conn.server()} - connected=${conn.connected}`);
-    connected = conn.connected & connected;
-  });
-  return connected;
-};
-
-
-const terminate = () => process.exit()
-
-/**
- * Updates informix database with insert/update/delete operation
- * @param {Object} payload The DML trigger data
- */
-async function updateInformix (payload) {
-  logger.debug('Starting to update informix with data:')
-  logger.debug(payload)
-  if (payload.payload.table === 'scorecard_question'){
-    logger.debug('inside scorecard_question')
-    sleep.sleep(2);
+const check = function () {
+  if (!consumer.client.initialBrokers && !consumer.client.initialBrokers.length) {
+    return false;
   }
-  //const operation = payload.operation.toLowerCase()
-  const operation = payload.payload.operation.toLowerCase()
-  console.log("level producer1 ",operation)
-  let sql = null
+  let connected = true;
+  consumer.client.initialBrokers.forEach(conn => {
+    logger.debug(`url ${conn.server()} - connected=${conn.connected}`);
+    connected = conn.connected & connected;
+  });
+  return connected;
+};

-  const columns = payload.payload.data
-  const primaryKey = payload.payload.Uniquecolumn
-  // Build SQL query
-  switch (operation) {
-    case 'insert':
-      {
-        const columnNames = Object.keys(columns)
-        sql = `insert into ${payload.payload.schema}:${payload.payload.table} (${columnNames.join(', ')}) values (${columnNames.map((k) => `'${columns[k]}'`).join(', ')});` // "insert into <schema>:<table> (col_1, col_2, ...) values (val_1, val_2, ...)"
-      }
-      break
-    case 'update':
-      {
-        sql = `update ${payload.payload.schema}:${payload.payload.table} set ${Object.keys(columns).map((key) => `${key}='${columns[key]}'`).join(', ')} where ${primaryKey}=${columns[primaryKey]};` // "update <schema>:<table> set col_1=val_1, col_2=val_2, ... where primary_key_col=primary_key_val"
-      }
-      break
-    case 'delete':
-      {
-        sql = `delete from ${payload.payload.schema}:${payload.payload.table} where ${primaryKey}=${columns[primaryKey]};` // "delete from <schema>:<table> where primary_key_col=primary_key_val"
-      }
-      break
-    default:
-      throw new Error(`Operation ${operation} is not supported`)
-  }
-
-  const result = await informix.executeQuery(payload.payload.schema, sql, null)
-  return result
-}

+const terminate = () => process.exit()
 /**
  *
  * @param {Array} messageSet List of messages from kafka
  * @param {String} topic The name of the message topic
  * @param {Number} partition The kafka partition to which messages are written
  */
-async function dataHandler (messageSet, topic, partition) {
+async function dataHandler(messageSet, topic, partition) {
   for (const m of messageSet) { // Process messages sequentially
+    let message
     try {
-      const payload = JSON.parse(m.message.value)
-      logger.debug('Received payload from kafka:')
-      // logger.debug(payload)
-      await updateInformix(payload)
+      message = JSON.parse(m.message.value)
+      logger.debug('Received message from kafka:')
+      logger.debug(JSON.stringify(message))
+      await updateInformix(message)
       await consumer.commitOffset({ topic, partition, offset: m.offset }) // Commit offset only on success
+      await auditTrail([message.payload.payloadseqid, 'scorecard_consumer', message.payload.table, message.payload.Uniquecolumn,
+        message.payload.operation, 1, 0, "", message.timestamp, new Date(), message.payload], 'consumer')
     } catch (err) {
       logger.error('Could not process kafka message')
-      logger.logFullError(err)
+      //logger.logFullError(err)
+      try {
+        await consumer.commitOffset({ topic, partition, offset: m.offset }) // Commit offset; the message will be re-published
+        logger.debug('Trying to push same message after adding retryCounter')
+        if (!message.payload.retryCount) {
+          message.payload.retryCount = 0
+          logger.debug('setting retry counter to 0 and max try count is : ', config.KAFKA.maxRetry);
+        }
+        if (message.payload.retryCount >= config.KAFKA.maxRetry) {
+          logger.debug('Reached max retry counter, sending it to error queue: ', config.KAFKA.errorTopic);

+          let notifyMessage = Object.assign({}, message, { topic: config.KAFKA.errorTopic })
+          notifyMessage.payload['recipients'] = config.KAFKA.recipients
+          logger.debug('pushing following message on kafka error alert queue:')
+          logger.debug(notifyMessage)
+          await pushToKafka(notifyMessage)
+          return
+        }
+        message.payload['retryCount'] = message.payload.retryCount + 1;
+        await pushToKafka(message)
+        logger.debug('pushed same message after adding retryCount')
+      } catch (err) {
+        //await auditTrail([message.payload.payloadseqid, 'scorecard_consumer', message.payload.table, message.payload.Uniquecolumn,
+        //  message.payload.operation, 0, message.payload.retryCount, "re-publish kafka err", message.timestamp, new Date(), ""], 'consumer')
+        logger.error("Error occurred in re-publishing kafka message", err)
+      }
     }
   }
 }

+
 /**
  * Initialize kafka consumer
  */
-async function setupKafkaConsumer () {
+async function setupKafkaConsumer() {
   try {
     await consumer.init()
     await consumer.subscribe(kafkaOptions.topic, kafkaOptions.partition, { time: Kafka.LATEST_OFFSET }, dataHandler)
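
Both retry branches call src/services/pushToKafka, which is imported above but not shown in this commit. A minimal sketch of what such a helper could look like with no-kafka's Producer and the same KAFKA config block; the body is an assumption, not the committed implementation:

const config = require('config')
const Kafka = require('no-kafka')

const kafkaOptions = config.get('KAFKA')
const producer = new Kafka.Producer({ connectionString: kafkaOptions.brokers_url })
const ready = producer.init() // init once; reuse the promise on every send

async function pushToKafka (message) {
  await ready
  // Honour a topic override (e.g. the error topic set by the consumer),
  // otherwise publish to the configured sync topic.
  return producer.send({
    topic: message.topic || kafkaOptions.topic,
    partition: kafkaOptions.partition[0],
    message: { value: JSON.stringify(message) }
  })
}

module.exports = pushToKafka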

src/producer.js

Lines changed: 13 additions & 34 deletions
@@ -4,60 +4,39 @@
 const config = require('config')
 const pg = require('pg')
 const logger = require('./common/logger')
-
-const busApi = require('topcoder-bus-api-wrapper')
-const _ = require('lodash')
-
+const pushToKafka = require('./services/pushToKafka')
 const pgOptions = config.get('POSTGRES')
 const pgConnectionString = `postgresql://${pgOptions.user}:${pgOptions.password}@${pgOptions.host}:${pgOptions.port}/${pgOptions.database}`
 const pgClient = new pg.Client(pgConnectionString)
-
+const auditTrail = require('./services/auditTrail');
 const express = require('express')
 const app = express()
 const port = 3000

-const busApiClient = busApi(_.pick(config,
-  ['AUTH0_URL', 'AUTH0_AUDIENCE', 'TOKEN_CACHE_TIME',
-    'AUTH0_CLIENT_ID', 'AUTH0_CLIENT_SECRET', 'BUSAPI_URL',
-    'KAFKA_ERROR_TOPIC', 'AUTH0_PROXY_SERVER_URL']))
-busApiClient
-  .getHealth()
-  .then(result => console.log(result.body, result.status))
-  .catch(err => console.log(err))
-
-async function postTopic(payload) {
-  try {
-    await busApiClient.postEvent(payload)
-  } catch (e) {
-    console.log(e)
-  }
-}

 async function setupPgClient () {
   try {
     await pgClient.connect()
-    // Listen to each of the trigger functions
     for (const triggerFunction of pgOptions.triggerFunctions) {
       await pgClient.query(`LISTEN ${triggerFunction}`)
     }
     pgClient.on('notification', async (message) => {
-      console.log('Received trigger payload:')
-      logger.debug(`Received trigger payload:`)
-      logger.debug(message)
-      //console.log(message)
       try {
         const payload = JSON.parse(message.payload)
-        console.log("level 0",payload);
         const validTopicAndOriginator = (pgOptions.triggerTopics.includes(payload.topic)) && (pgOptions.triggerOriginators.includes(payload.originator)) // Check if valid topic and originator
         if (validTopicAndOriginator) {
-          // await pushToKafka(payload)
-          await postTopic(payload)
-        } else {
+          console.log(`${payload.topic} ${payload.payload.table} ${payload.payload.operation} ${payload.timestamp}`);
+          await pushToKafka(payload)
+        } else {
           logger.debug('Ignoring message with incorrect topic or originator')
         }
-      } catch (err) {
+        await auditTrail([payload.payload.payloadseqid, 'scorecard_producer', 1, payload.topic, payload.payload.table, payload.payload.Uniquecolumn,
+          payload.payload.operation, "", payload.timestamp, new Date(), JSON.stringify(payload.payload)], 'producer')
+      } catch (error) {
         logger.error('Could not parse message payload')
-        logger.logFullError(err)
+        await auditTrail([payload.payload.payloadseqid, 'scorecard_producer', 0, payload.topic, payload.payload.table, payload.payload.Uniquecolumn,
+          payload.payload.operation, "error", payload.timestamp, new Date(), JSON.stringify(payload.payload)], 'producer')
+        logger.logFullError(error)
       }
     })
     logger.info('Listening to notifications')
@@ -75,7 +54,7 @@ async function run () {
 run()

 app.get('/health', (req, res) => {
-  //console.log('pgClient', pgClient)
   res.send('health ok')
 })
-app.listen(port, () => console.log(`Example app listening on port ${port}!`))
+app.listen(port, () => console.log(`app listening on port ${port}!`))
+

src/services/auditTrail.js

Lines changed: 45 additions & 0 deletions
@@ -0,0 +1,45 @@
+const config = require('config')
+const pg = require('pg')
+const logger = require('../common/logger')
+
+const pgOptions = config.get('POSTGRES')
+const pgConnectionString = `postgresql://${pgOptions.user}:${pgOptions.password}@${pgOptions.host}:${pgOptions.port}/${pgOptions.database}`
+let pgClient2
+
+async function setupPgClient2 () {
+  pgClient2 = new pg.Client(pgConnectionString)
+  try {
+    await pgClient2.connect()
+    logger.debug('Connected to Pg Client2 Audit:')
+  }
+  catch (err) {
+    logger.error('Could not setup postgres client2')
+    logger.logFullError(err)
+    process.exit()
+  }
+}
+
+async function auditTrail (data, sourcetype) {
+  if (!pgClient2) {
+    await setupPgClient2()
+  }
+  if (sourcetype === 'producer') {
+    sql = 'INSERT INTO tcs_catalog.producer_scorecard_audit(payloadseqid,origin_source,kafka_post_status,topic_name,table_name,Uniquecolumn,operationtype,errormessage,payloadtime,auditdatetime,payload) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11)'
+    logger.debug(`--Audit Trail update producer--`)
+  } else {
+    sql = 'INSERT INTO tcs_catalog.consumer_scorecard_audit(payloadseqid,origin_source,table_name,Uniquecolumn,operationtype,dest_db_status,dest_retry_count,errormessage,payloadtime,auditdatetime,dest_operationquery) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11)'
+    logger.debug(`--Audit Trail update consumer--`)
+  }
+  return pgClient2.query(sql, data, (err, res) => {
+    if (err) {
+      logger.debug(`--Audit Trail update error-- ${err.stack}`)
+      //pgClient2.end()
+    } else {
+      logger.debug(`--Audit Trail update success--`)
+    }
+  })
+}
+
+module.exports = auditTrail
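
Callers pass a positional array that must line up with the INSERT column order for the chosen sourcetype; the consumer's success path above maps to the consumer_scorecard_audit columns like this (values illustrative):

// payloadseqid, origin_source, table_name, Uniquecolumn, operationtype,
// dest_db_status, dest_retry_count, errormessage, payloadtime,
// auditdatetime, dest_operationquery
await auditTrail([42, 'scorecard_consumer', 'scorecard', 'scorecard_id', 'update',
  1, 0, '', '2019-08-19T08:39:48Z', new Date(), ''], 'consumer')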
