
Commit 9106279

Author: root
Commit message: kafka retry feature
Parent: af042f3

File tree

6 files changed: +118 −78 lines changed

.gitignore
config/default.js
src/consumer.js
src/producer.js
src/services/pushToKafka.js
src/services/updateInformix.js


.gitignore

Lines changed: 2 additions & 0 deletions
@@ -3,3 +3,5 @@ node_modules
 *.log
 env_producer.sh
 env_consumer.sh
+*.env
+*.sh

config/default.js

Lines changed: 8 additions & 5 deletions
@@ -7,9 +7,9 @@ module.exports = {
     host: process.env.INFORMIX_HOST || 'localhost',
     port: parseInt(process.env.INFORMIX_PORT, 10) || 2021,
     user: process.env.INFORMIX_USER || 'informix',
-    password: process.env.INFORMIX_PASSWORD || '1nf0rm1x',
-    database: process.env.INFORMIX_DATABASE || 'tcs_catalog',
-    server: process.env.INFORMIX_SERVER || 'informixoltp_tcp',
+    password: process.env.INFORMIX_PASSWORD || 'password',
+    database: process.env.INFORMIX_DATABASE || 'db',
+    server: process.env.INFORMIX_SERVER || 'informixserver',
     minpool: parseInt(process.env.MINPOOL, 10) || 1,
     maxpool: parseInt(process.env.MAXPOOL, 10) || 60,
     maxsize: parseInt(process.env.MAXSIZE, 10) || 0,
@@ -32,8 +32,11 @@ module.exports = {
     cert: process.env.KAFKA_CLIENT_CERT || null, // SSL client certificate file path
     key: process.env.KAFKA_CLIENT_CERT_KEY || null // SSL client key file path
   },
-  topic: process.env.KAFKA_TOPIC || 'db.postgres.sync', // Kafka topic to push and receive messages
-  partition: process.env.partition || [0] // Kafka partitions to use
+  topic: process.env.KAFKA_TOPIC || 'db.topic.sync', // Kafka topic to push and receive messages
+  partition: process.env.partition || [0], // Kafka partitions to use
+  maxRetry: process.env.MAX_RETRY || 3, // Max retry attempts before a message is routed to the error topic
+  errorTopic: process.env.ERROR_TOPIC || 'db.scorecardtable.error', // Topic for messages that exhausted their retries
+  recipients: ['[email protected]'] // Recipients to notify when a message lands on the error topic
 },

 AUTH0_URL: process.env.AUTH0_URL ,
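
Note that unlike the pool settings above, maxRetry is not passed through parseInt, so a MAX_RETRY value set in the environment arrives as a string. The consumer's >= comparison still coerces it to a number, but the stricter pattern already used elsewhere in this file would be:

maxRetry: parseInt(process.env.MAX_RETRY, 10) || 3, // always numeric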

src/consumer.js

Lines changed: 20 additions & 49 deletions
@@ -4,10 +4,11 @@
 const config = require('config')
 const Kafka = require('no-kafka')
 const logger = require('./common/logger')
-const informix = require('./common/informixWrapper.js')
+const updateInformix = require('./services/updateInformix')
+const pushToKafka = require('./services/pushToKafka')
 const healthcheck = require('topcoder-healthcheck-dropin');
 const kafkaOptions = config.get('KAFKA')
-const sleep = require('sleep');
+//const sleep = require('sleep');
 const isSslEnabled = kafkaOptions.SSL && kafkaOptions.SSL.cert && kafkaOptions.SSL.key
 const consumer = new Kafka.SimpleConsumer({
   connectionString: kafkaOptions.brokers_url,
@@ -34,51 +35,6 @@ const consumer = new Kafka.SimpleConsumer({


 const terminate = () => process.exit()
-
-/**
- * Updates informix database with insert/update/delete operation
- * @param {Object} payload The DML trigger data
- */
-async function updateInformix (payload) {
-  logger.debug('Starting to update informix with data:')
-  logger.debug(payload)
-  if (payload.payload.table === 'scorecard_question'){
-    logger.debug('inside scorecard_question')
-    sleep.sleep(2);
-  }
-  //const operation = payload.operation.toLowerCase()
-  const operation = payload.payload.operation.toLowerCase()
-  console.log("level producer1 ",operation)
-  let sql = null
-
-  const columns = payload.payload.data
-  const primaryKey = payload.payload.Uniquecolumn
-  // Build SQL query
-  switch (operation) {
-    case 'insert':
-      {
-        const columnNames = Object.keys(columns)
-        sql = `insert into ${payload.payload.schema}:${payload.payload.table} (${columnNames.join(', ')}) values (${columnNames.map((k) => `'${columns[k]}'`).join(', ')});` // "insert into <schema>:<table> (col_1, col_2, ...) values (val_1, val_2, ...)"
-      }
-      break
-    case 'update':
-      {
-        sql = `update ${payload.payload.schema}:${payload.payload.table} set ${Object.keys(columns).map((key) => `${key}='${columns[key]}'`).join(', ')} where ${primaryKey}=${columns[primaryKey]};` // "update <schema>:<table> set col_1=val_1, col_2=val_2, ... where primary_key_col=primary_key_val"
-      }
-      break
-    case 'delete':
-      {
-        sql = `delete from ${payload.payload.schema}:${payload.payload.table} where ${primaryKey}=${columns[primaryKey]};` // "delete from <schema>:<table> where primary_key_col=primary_key_val"
-      }
-      break
-    default:
-      throw new Error(`Operation ${operation} is not supported`)
-  }
-
-  const result = await informix.executeQuery(payload.payload.schema, sql, null)
-  return result
-}
-
 /**
  *
  * @param {Array} messageSet List of messages from kafka
@@ -90,15 +46,35 @@ async function dataHandler (messageSet, topic, partition) {
+    let payload // hoisted so the retry logic in the catch block can reference it
     try {
-      const payload = JSON.parse(m.message.value)
+      payload = JSON.parse(m.message.value)
       logger.debug('Received payload from kafka:')
-      // logger.debug(payload)
+      logger.debug(payload)
       await updateInformix(payload)
+      // await insertConsumerAudit(payload, true, undefined, false)
       await consumer.commitOffset({ topic, partition, offset: m.offset }) // Commit offset only on success
     } catch (err) {
       logger.error('Could not process kafka message')
       logger.logFullError(err)
+      if (!payload) {
+        return // the message body could not be parsed, so there is nothing to retry
+      }
+      if (!payload.retryCount) {
+        payload.retryCount = 0
+      }
+      if (payload.retryCount >= config.KAFKA.maxRetry) {
+        // Retries exhausted: route the message to the error topic and notify the configured recipients
+        await pushToKafka(
+          Object.assign({}, payload, { topic: config.KAFKA.errorTopic, recipients: config.KAFKA.recipients })
+        )
+        return
+      }
+      // Re-publish the message with an incremented retry counter
+      await pushToKafka(
+        Object.assign({}, payload, { retryCount: payload.retryCount + 1 })
+      )
+    }
   }
 }
-}
+

 /**
  * Initialize kafka consumer
@@ -117,3 +93,4 @@ async function setupKafkaConsumer () {
 }

 setupKafkaConsumer()
+
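
To see the retry flow in isolation, the sketch below replays the catch-block decision logic against a message that always fails, using the defaults from config/default.js (maxRetry 3, error topic db.scorecardtable.error). nextAction is a hypothetical helper written for this illustration, not part of the commit:

const maxRetry = 3

function nextAction (payload) {
  // Mirrors the consumer's catch block: route to the error topic once retries
  // are exhausted, otherwise re-publish with an incremented retryCount.
  const retryCount = payload.retryCount || 0
  if (retryCount >= maxRetry) {
    return Object.assign({}, payload, { topic: 'db.scorecardtable.error' })
  }
  return Object.assign({}, payload, { retryCount: retryCount + 1 })
}

let msg = { topic: 'db.topic.sync', payload: { table: 'scorecard', operation: 'update' } }
for (let attempt = 1; attempt <= 5; attempt++) {
  msg = nextAction(msg)
  console.log(attempt, msg.topic, msg.retryCount)
  if (msg.topic === 'db.scorecardtable.error') break
}
// 1 db.topic.sync 1
// 2 db.topic.sync 2
// 3 db.topic.sync 3
// 4 db.scorecardtable.error 3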

src/producer.js

Lines changed: 5 additions & 24 deletions
@@ -4,34 +4,15 @@
 const config = require('config')
 const pg = require('pg')
 const logger = require('./common/logger')
-
-const busApi = require('topcoder-bus-api-wrapper')
-const _ = require('lodash')
-
+const pushToKafka = require('./services/pushToKafka')
 const pgOptions = config.get('POSTGRES')
 const pgConnectionString = `postgresql://${pgOptions.user}:${pgOptions.password}@${pgOptions.host}:${pgOptions.port}/${pgOptions.database}`
 const pgClient = new pg.Client(pgConnectionString)

 const express = require('express')
 const app = express()
-const port = 3000
-
-const busApiClient = busApi(_.pick(config,
-  ['AUTH0_URL', 'AUTH0_AUDIENCE', 'TOKEN_CACHE_TIME',
-   'AUTH0_CLIENT_ID', 'AUTH0_CLIENT_SECRET', 'BUSAPI_URL',
-   'KAFKA_ERROR_TOPIC', 'AUTH0_PROXY_SERVER_URL']))
-busApiClient
-  .getHealth()
-  .then(result => console.log(result.body, result.status))
-  .catch(err => console.log(err))
+const port = 4000

-async function postTopic(payload) {
-  try {
-    await busApiClient.postEvent(payload)
-  } catch (e) {
-    console.log(e)
-  }
-}

 async function setupPgClient () {
   try {
@@ -51,7 +32,7 @@ async function setupPgClient () {
     const validTopicAndOriginator = (pgOptions.triggerTopics.includes(payload.topic)) && (pgOptions.triggerOriginators.includes(payload.originator)) // Check if valid topic and originator
     if (validTopicAndOriginator) {
       // await pushToKafka(payload)
-      await postTopic(payload)
+      await pushToKafka(payload)
     } else {
       logger.debug('Ignoring message with incorrect topic or originator')
     }
@@ -75,7 +56,7 @@ async function run () {
 run()

 app.get('/health', (req, res) => {
-  //console.log('pgClient', pgClient)
   res.send('health ok')
 })
-app.listen(port, () => console.log(`Example app listening on port ${port}!`))
+app.listen(port, () => console.log(`app listening on port ${port}!`))
+
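
A quick way to confirm the producer came up on its new port is the health route. A minimal check, assuming Node 18+ (global fetch) and a local deployment:

fetch('http://localhost:4000/health')
  .then(res => res.text())
  .then(body => console.log(body)) // prints: health ok
  .catch(err => console.error(err))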

src/services/pushToKafka.js

Lines changed: 34 additions & 0 deletions
@@ -0,0 +1,34 @@
+/*
+ * Kafka producer service that publishes messages via the Topcoder bus API.
+ */
+const config = require('config')
+const Kafka = require('no-kafka')
+const logger = require('../common/logger')
+const busApi = require('topcoder-bus-api-wrapper')
+const _ = require('lodash')
+
+const kafkaOptions = config.get('KAFKA')
+const isSslEnabled = kafkaOptions.SSL && kafkaOptions.SSL.cert && kafkaOptions.SSL.key
+let producer
+
+const busApiClient = busApi(_.pick(config,
+  ['AUTH0_URL', 'AUTH0_AUDIENCE', 'TOKEN_CACHE_TIME',
+   'AUTH0_CLIENT_ID', 'AUTH0_CLIENT_SECRET', 'BUSAPI_URL',
+   'KAFKA_ERROR_TOPIC', 'AUTH0_PROXY_SERVER_URL']))
+
+// Log the bus API health on startup
+busApiClient
+  .getHealth()
+  .then(result => logger.debug(result.body, result.status))
+  .catch(err => logger.logFullError(err))
+
+async function pushToKafka (payload) {
+  try {
+    await busApiClient.postEvent(payload)
+  } catch (err) {
+    logger.logFullError(err)
+  }
+}
+
+module.exports = pushToKafka
+
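
For reference, a minimal sketch of calling this service directly. The event shape (topic, originator, timestamp, mime-type, payload) follows the Topcoder bus API convention; the field values here are hypothetical:

const pushToKafka = require('./src/services/pushToKafka')

pushToKafka({
  topic: 'db.topic.sync',
  originator: 'pg-ifx-sync', // hypothetical originator name
  timestamp: new Date().toISOString(),
  'mime-type': 'application/json',
  payload: { table: 'scorecard', operation: 'update' }
}).then(() => console.log('event posted'))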

src/services/updateInformix.js

Lines changed: 49 additions & 0 deletions
@@ -0,0 +1,49 @@
+const informix = require('../common/informixWrapper')
+const logger = require('../common/logger')
+const sleep = require('sleep') // required for the scorecard_question delay below
+
+/**
+ * Updates informix database with insert/update/delete operation
+ * @param {Object} payload The DML trigger data
+ */
+async function updateInformix (payload) {
+  logger.debug('Starting to update informix with data:')
+  logger.debug(payload)
+  if (payload.payload.table === 'scorecard_question') {
+    logger.debug('inside scorecard_question')
+    sleep.sleep(2)
+  }
+  //const operation = payload.operation.toLowerCase()
+  const operation = payload.payload.operation.toLowerCase()
+  logger.debug(`Performing ${operation} operation`)
+  let sql = null
+
+  const columns = payload.payload.data
+  const primaryKey = payload.payload.Uniquecolumn
+  // Build SQL query
+  switch (operation) {
+    case 'insert':
+      {
+        const columnNames = Object.keys(columns)
+        sql = `insert into ${payload.payload.schema}:${payload.payload.table} (${columnNames.join(', ')}) values (${columnNames.map((k) => `'${columns[k]}'`).join(', ')});` // "insert into <schema>:<table> (col_1, col_2, ...) values (val_1, val_2, ...)"
+      }
+      break
+    case 'update':
+      {
+        sql = `update ${payload.payload.schema}:${payload.payload.table} set ${Object.keys(columns).map((key) => `${key}='${columns[key]}'`).join(', ')} where ${primaryKey}=${columns[primaryKey]};` // "update <schema>:<table> set col_1=val_1, col_2=val_2, ... where primary_key_col=primary_key_val"
+      }
+      break
+    case 'delete':
+      {
+        sql = `delete from ${payload.payload.schema}:${payload.payload.table} where ${primaryKey}=${columns[primaryKey]};` // "delete from <schema>:<table> where primary_key_col=primary_key_val"
+      }
+      break
+    default:
+      throw new Error(`Operation ${operation} is not supported`)
+  }
+
+  const result = await informix.executeQuery(payload.payload.schema, sql, null)
+  return result
+}
+
+module.exports = updateInformix
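
To make the payload contract concrete, here is a hypothetical trigger payload and the statement this builder produces for it. Schema and table names are illustrative, and note that values are interpolated directly into the SQL, so this assumes trusted trigger data:

const updateInformix = require('./src/services/updateInformix')

const payload = {
  payload: {
    schema: 'tcs_catalog', // hypothetical schema name
    table: 'scorecard', // hypothetical table name
    operation: 'update',
    Uniquecolumn: 'scorecard_id', // primary key column
    data: { scorecard_id: 1, scorecard_status_id: 2 }
  }
}

// Generated SQL:
// update tcs_catalog:scorecard set scorecard_id='1', scorecard_status_id='2' where scorecard_id=1;
updateInformix(payload)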
