
Commit 0912d85

Author: root
Message: initial commit
Parent: 3755023

File tree

6 files changed: +338 −0 lines changed

.gitignore

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
.DS_Store
node_modules
*.log
env_producer.sh
env_consumer.sh

package.json

Lines changed: 30 additions & 0 deletions
@@ -0,0 +1,30 @@
{
  "name": "postgres_informix_updater",
  "version": "1.0.0",
  "description": "Listens to DML triggers from the postgres database and updates the corresponding informix database",
  "scripts": {
    "test": "mocha ./test/*.test.js --exit",
    "lint": "standard --env mocha",
    "lint:fix": "standard --env mocha --fix",
    "producer": "node ./src/producer.js",
    "consumer": "node ./src/consumer.js",
    "start": "npm run producer & npm run consumer"
  },
  "author": "Topcoder",
  "license": "ISC",
  "dependencies": {
    "config": "^3.2.2",
    "informix-wrapper": "git+https://github.com/appirio-tech/informix-wrapper.git",
    "no-kafka": "^3.4.3",
    "pg": "^7.12.1",
    "standard": "^13.1.0",
    "underscore": "^1.9.1",
    "winston": "^3.2.1",
    "lodash": "^4.17.15",
    "topcoder-bus-api-wrapper": "^1.0.1"
  },
  "devDependencies": {
    "chai": "^4.2.0",
    "mocha": "^6.2.0"
  }
}
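
For orientation, the scripts block above defines the service entry points: npm run producer and npm run consumer start each process on its own, and npm start launches both from one shell (the single ampersand backgrounds the producer). A typical local session, assuming dependencies are installed first, might look like:

npm install
npm run lint
npm start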

src/common/informixWrapper.js

Lines changed: 92 additions & 0 deletions
@@ -0,0 +1,92 @@
/**
 * Provides services to execute queries on the Informix database
 */
const config = require('config')
const Wrapper = require('informix-wrapper')
const logger = require('./logger')

const informixOptions = config.get('INFORMIX')
const writeOperationRegex = new RegExp('^(insert|update|delete|create)', 'i') // Regex to check if a sql query is a write operation

/**
 * Creates a connection to Informix
 * @param {String} database Name of database to connect to
 * @param {Boolean} isWrite Whether the query is a write operation or not
 * @param {Function} reject The callback function called in case of errors
 * @returns {Object} The connection object
 */
function createConnection (database, isWrite, reject) {
  const jdbc = new Wrapper({ ...informixOptions, ...{ database } })
  jdbc.on('error', (err) => {
    if (isWrite) { // End the open transaction before disconnecting
      jdbc.endTransaction(err, (err) => {
        jdbc.disconnect()
        return reject(err)
      })
    } else {
      jdbc.disconnect()
      return reject(err)
    }
  })
  return jdbc.initialize()
}

/**
 * Executes a sql query on the given database
 * @param {String} database Name of database to query on
 * @param {String} sql The sql query
 * @param {Object} [params] Optional query parameters passed through to execute
 * @returns {Promise<Object|Error>} Resolves with the result of the query on success, rejects with the error on failure
 */
function executeQuery (database, sql, params) {
  return new Promise((resolve, reject) => {
    const isWrite = writeOperationRegex.test(sql.trim())
    const connection = createConnection(database, isWrite, reject)
    connection.connect((err) => {
      if (err) {
        connection.disconnect()
        return reject(err)
      }

      if (isWrite) { // Write operations are executed inside a transaction
        connection.beginTransaction(() => {
          connection.query(sql, (err, res) => {
            if (err) {
              return reject(err)
            }
            resolve(res)
          }, {
            start: (q) => {
              logger.debug(`Starting to execute ${q}`)
            },
            finish: (f) => {
              connection.endTransaction(null, () => {
                connection.disconnect()
                logger.debug('Finished executing')
              })
            }
          }).execute(params)
        })
      } else {
        connection.query(sql, (err, res) => {
          if (err) {
            return reject(err)
          }
          resolve(res)
        }, {
          start: (q) => {
            logger.debug(`Starting to execute ${q}`)
          },
          finish: (f) => {
            connection.disconnect()
            logger.debug('Finished executing')
          }
        }).execute(params)
      }
    })
  })
}

module.exports = {
  executeQuery
}
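
For reference, executeQuery handles connect, transaction, and disconnect internally, so callers only deal with a promise. A minimal usage sketch; the database and table names here are hypothetical, not part of this commit:

const informix = require('./src/common/informixWrapper')

// 'sampledb' and 'users' are placeholder names, for illustration only
informix.executeQuery('sampledb', 'select first 1 * from users;', null)
  .then((rows) => console.log(rows))
  .catch((err) => console.error(err))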

src/common/logger.js

Lines changed: 36 additions & 0 deletions
@@ -0,0 +1,36 @@
/**
 * This module defines a winston logger instance for the application.
 */
const { createLogger, format, transports } = require('winston')

const config = require('config')

const logger = createLogger({
  format: format.combine(
    format.json(),
    format.colorize(),
    format.printf((data) => `${new Date().toISOString()} - ${data.level}: ${JSON.stringify(data.message, null, 4)}`)
  ),
  transports: [
    new transports.Console({ // Log to console
      stderrLevels: ['error'],
      level: config.get('LOG_LEVEL')
    }),
    new transports.File({ // Log to file
      filename: config.get('LOG_FILE'),
      level: config.get('LOG_LEVEL')
    })
  ]
})

/**
 * Logs the complete error message, with stack trace if present
 */
logger.logFullError = (err) => {
  if (err && err.stack) {
    logger.error(err.stack)
  } else {
    logger.error(JSON.stringify(err))
  }
}

module.exports = logger
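
As used throughout the services below, this module exposes the standard winston levels plus the logFullError helper. A quick sketch:

const logger = require('./src/common/logger')

logger.info('Service started')   // Goes to both the console and the configured log file
logger.debug('Payload received') // Emitted only if LOG_LEVEL permits debug

try {
  JSON.parse('not json')
} catch (err) {
  logger.logFullError(err)       // Prefers err.stack when available
}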

src/consumer.js

Lines changed: 101 additions & 0 deletions
@@ -0,0 +1,101 @@
/**
 * Receives postgres DML trigger messages from kafka and updates the informix database
 */
const config = require('config')
const Kafka = require('no-kafka')
const logger = require('./common/logger')
const informix = require('./common/informixWrapper.js')

const kafkaOptions = config.get('KAFKA')
const isSslEnabled = kafkaOptions.SSL && kafkaOptions.SSL.cert && kafkaOptions.SSL.key
const consumer = new Kafka.SimpleConsumer({
  connectionString: kafkaOptions.brokers_url,
  ...(isSslEnabled && { // Include ssl options if present
    ssl: {
      cert: kafkaOptions.SSL.cert,
      key: kafkaOptions.SSL.key
    }
  })
})

const terminate = () => process.exit()

/**
 * Updates the informix database with an insert/update/delete operation
 * @param {Object} payload The DML trigger data
 */
async function updateInformix (payload) {
  logger.debug('Starting to update informix with data:')
  logger.debug(payload)
  const operation = payload.payload.operation.toLowerCase()
  let sql = null

  const columns = payload.payload.data
  const primaryKey = payload.payload.Uniquecolumn
  // Build SQL query
  switch (operation) {
    case 'insert':
      {
        const columnNames = Object.keys(columns)
        sql = `insert into ${payload.payload.schema}:${payload.payload.table} (${columnNames.join(', ')}) values (${columnNames.map((k) => `'${columns[k]}'`).join(', ')});` // "insert into <schema>:<table> (col_1, col_2, ...) values ('val_1', 'val_2', ...)"
      }
      break
    case 'update':
      {
        sql = `update ${payload.payload.schema}:${payload.payload.table} set ${Object.keys(columns).map((key) => `${key}='${columns[key]}'`).join(', ')} where ${primaryKey}='${columns[primaryKey]}';` // "update <schema>:<table> set col_1='val_1', col_2='val_2', ... where primary_key_col='primary_key_val'"
      }
      break
    case 'delete':
      {
        sql = `delete from ${payload.payload.schema}:${payload.payload.table} where ${primaryKey}='${columns[primaryKey]}';` // "delete from <schema>:<table> where primary_key_col='primary_key_val'"
      }
      break
    default:
      throw new Error(`Operation ${operation} is not supported`)
  }

  const result = await informix.executeQuery(payload.payload.schema, sql, null)
  return result
}

/**
 * Processes a batch of messages received from kafka
 * @param {Array} messageSet List of messages from kafka
 * @param {String} topic The name of the message topic
 * @param {Number} partition The kafka partition to which messages are written
 */
async function dataHandler (messageSet, topic, partition) {
  for (const m of messageSet) { // Process messages sequentially
    try {
      const payload = JSON.parse(m.message.value)
      logger.debug('Received payload from kafka:')
      logger.debug(payload)
      await updateInformix(payload)
      await consumer.commitOffset({ topic, partition, offset: m.offset }) // Commit offset only on success
    } catch (err) {
      logger.error('Could not process kafka message')
      logger.logFullError(err)
    }
  }
}

/**
 * Initializes the kafka consumer
 */
async function setupKafkaConsumer () {
  try {
    await consumer.init()
    await consumer.subscribe(kafkaOptions.topic, kafkaOptions.partition, { time: Kafka.LATEST_OFFSET }, dataHandler)
    logger.info('Initialized kafka consumer')
  } catch (err) {
    logger.error('Could not setup kafka consumer')
    logger.logFullError(err)
    terminate()
  }
}

setupKafkaConsumer()
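
For reference, updateInformix expects each kafka message value to be a JSON document with a nested payload object. A representative update message, inferred from the fields read above (all names and values here are hypothetical):

// Hypothetical message shape, for illustration only
const exampleMessage = {
  topic: 'db.postgres.sync',   // assumption: real topic names come from config
  originator: 'pg-producer',   // assumption: real originators come from config
  payload: {
    operation: 'UPDATE',       // lowercased before dispatch
    schema: 'sampledb',
    table: 'users',
    Uniquecolumn: 'user_id',   // primary key column used in where clauses
    data: {
      user_id: '123',
      handle: 'tester'
    }
  }
}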

src/producer.js

Lines changed: 74 additions & 0 deletions
@@ -0,0 +1,74 @@
/**
 * Listens to DML trigger notifications from postgres and pushes the trigger data into kafka
 */
const config = require('config')
const pg = require('pg')
const logger = require('./common/logger')

const busApi = require('topcoder-bus-api-wrapper')
const _ = require('lodash')

const pgOptions = config.get('POSTGRES')
const pgConnectionString = `postgresql://${pgOptions.user}:${pgOptions.password}@${pgOptions.host}:${pgOptions.port}/${pgOptions.database}`
const pgClient = new pg.Client(pgConnectionString)

const terminate = () => process.exit()

const busApiClient = busApi(_.pick(config,
  ['AUTH0_URL', 'AUTH0_AUDIENCE', 'TOKEN_CACHE_TIME',
    'AUTH0_CLIENT_ID', 'AUTH0_CLIENT_SECRET', 'BUSAPI_URL',
    'KAFKA_ERROR_TOPIC', 'AUTH0_PROXY_SERVER_URL']))

// Verify connectivity to the bus API on startup
busApiClient
  .getHealth()
  .then(result => logger.info({ status: result.status, body: result.body }))
  .catch(err => logger.logFullError(err))

/**
 * Posts the trigger payload to the Topcoder bus API
 * @param {Object} payload The DML trigger data
 */
async function postTopic (payload) {
  try {
    await busApiClient.postEvent(payload)
  } catch (e) {
    logger.logFullError(e)
  }
}

/**
 * Connects to postgres and listens for trigger notifications
 */
async function setupPgClient () {
  try {
    await pgClient.connect()
    // Listen to each of the trigger functions
    for (const triggerFunction of pgOptions.triggerFunctions) {
      await pgClient.query(`LISTEN ${triggerFunction}`)
    }
    pgClient.on('notification', async (message) => {
      logger.debug('Received trigger payload:')
      logger.debug(message)
      try {
        const payload = JSON.parse(message.payload)
        const validTopicAndOriginator = (pgOptions.triggerTopics.includes(payload.topic)) && (pgOptions.triggerOriginators.includes(payload.originator)) // Check if valid topic and originator
        if (validTopicAndOriginator) {
          await postTopic(payload)
        } else {
          logger.debug('Ignoring message with incorrect topic or originator')
        }
      } catch (err) {
        logger.error('Could not parse message payload')
        logger.logFullError(err)
      }
    })
    logger.info('Listening to notifications')
  } catch (err) {
    logger.error('Could not setup postgres client')
    logger.logFullError(err)
    terminate()
  }
}

async function run () {
  await setupPgClient()
}

run()
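
Both services and the shared modules read their settings through the config package. A hedged sketch of a config/default.js covering every key referenced in this commit; all values are placeholders, and the exact option names inside INFORMIX depend on what informix-wrapper accepts:

// config/default.js — placeholder values only; each key below is read somewhere in this commit
module.exports = {
  LOG_LEVEL: 'debug',
  LOG_FILE: 'app.log',
  INFORMIX: {            // Spread into informix-wrapper options; a database name is added per query
    host: 'localhost',
    port: 2021,
    user: 'informix',
    password: 'changeme'
  },
  KAFKA: {
    brokers_url: 'localhost:9092',
    topic: 'db.postgres.sync',   // placeholder topic name
    partition: 0,
    SSL: { cert: '', key: '' }   // ssl options are applied only when both cert and key are set
  },
  POSTGRES: {
    user: 'postgres',
    password: 'changeme',
    host: 'localhost',
    port: 5432,
    database: 'sampledb',
    triggerFunctions: ['db_notifications'], // channels passed to LISTEN
    triggerTopics: ['db.postgres.sync'],    // topics the producer accepts
    triggerOriginators: ['pg-producer']     // originators the producer accepts
  },
  AUTH0_URL: '',
  AUTH0_AUDIENCE: '',
  TOKEN_CACHE_TIME: '',
  AUTH0_CLIENT_ID: '',
  AUTH0_CLIENT_SECRET: '',
  BUSAPI_URL: '',
  KAFKA_ERROR_TOPIC: '',
  AUTH0_PROXY_SERVER_URL: ''
}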
