diff --git a/config/default.js b/config/default.js index c64292d..65d48a8 100644 --- a/config/default.js +++ b/config/default.js @@ -36,30 +36,34 @@ module.exports = { partition: process.env.partition || [0], // Kafka partitions to use maxRetry: process.env.MAX_RETRY || 3, errorTopic: process.env.ERROR_TOPIC || 'db.scorecardtable.error', - recipients: ['admin@abc.com'] // Kafka partitions to use + recipients: ['admin@abc.com'], // Kafka partitions to use, + KAFKA_URL: process.env.KAFKA_URL, + KAFKA_GROUP_ID: process.env.KAFKA_GROUP_ID || 'postgres-ifx-consumer', + KAFKA_CLIENT_CERT: process.env.KAFKA_CLIENT_CERT ? process.env.KAFKA_CLIENT_CERT.replace('\\n', '\n') : null, + KAFKA_CLIENT_CERT_KEY: process.env.KAFKA_CLIENT_CERT_KEY ? process.env.KAFKA_CLIENT_CERT_KEY.replace('\\n', '\n') : null, }, SLACK: { URL: process.env.SLACKURL || 'us-east-1', SLACKCHANNEL: process.env.SLACKCHANNEL || 'ifxpg-migrator', - SLACKNOTIFY: process.env.SLACKNOTIFY || 'false' + SLACKNOTIFY: process.env.SLACKNOTIFY || 'false' }, - RECONSILER:{ + RECONSILER: { RECONSILER_START: process.env.RECONSILER_START || 5, RECONSILER_END: process.env.RECONSILER_END || 1, RECONSILER_DURATION_TYPE: process.env.RECONSILER_DURATION_TYPE || 'm' }, DYNAMODB: - { - DYNAMODB_TABLE: process.env.DYNAMODB_TABLE || 'test_pg_ifx_payload_sync', - DD_ElapsedTime: process.env.DD_ElapsedTime || 600000 - }, + { + DYNAMODB_TABLE: process.env.DYNAMODB_TABLE || 'test_pg_ifx_payload_sync', + DD_ElapsedTime: process.env.DD_ElapsedTime || 600000 + }, - AUTH0_URL: process.env.AUTH0_URL , - AUTH0_AUDIENCE: process.env.AUTH0_AUDIENCE , - TOKEN_CACHE_TIME: process.env.TOKEN_CACHE_TIME , - AUTH0_CLIENT_ID: process.env.AUTH0_CLIENT_ID , - AUTH0_CLIENT_SECRET: process.env.AUTH0_CLIENT_SECRET , - BUSAPI_URL : process.env.BUSAPI_URL , - KAFKA_ERROR_TOPIC : process.env.KAFKA_ERROR_TOPIC , - AUTH0_PROXY_SERVER_URL: process.env.AUTH0_PROXY_SERVER_URL + AUTH0_URL: process.env.AUTH0_URL, + AUTH0_AUDIENCE: process.env.AUTH0_AUDIENCE, + TOKEN_CACHE_TIME: process.env.TOKEN_CACHE_TIME, + AUTH0_CLIENT_ID: process.env.AUTH0_CLIENT_ID, + AUTH0_CLIENT_SECRET: process.env.AUTH0_CLIENT_SECRET, + BUSAPI_URL: process.env.BUSAPI_URL, + KAFKA_ERROR_TOPIC: process.env.KAFKA_ERROR_TOPIC, + AUTH0_PROXY_SERVER_URL: process.env.AUTH0_PROXY_SERVER_URL } diff --git a/src/consumer.js b/src/consumer.js index 6fc4084..abddff2 100644 --- a/src/consumer.js +++ b/src/consumer.js @@ -10,8 +10,19 @@ const healthcheck = require('topcoder-healthcheck-dropin'); const auditTrail = require('./services/auditTrail'); const kafkaOptions = config.get('KAFKA') const postMessage = require('./services/posttoslack') -const isSslEnabled = kafkaOptions.SSL && kafkaOptions.SSL.cert && kafkaOptions.SSL.key -const consumer = new Kafka.SimpleConsumer({ +//const isSslEnabled = kafkaOptions.SSL && kafkaOptions.SSL.cert && kafkaOptions.SSL.key + +const options = { + groupId: kafkaOptions.KAFKA_GROUP_ID, + connectionString: kafkaOptions.KAFKA_URL, + ssl: { + cert: kafkaOptions.KAFKA_CLIENT_CERT, + key: kafkaOptions.KAFKA_CLIENT_CERT_KEY + } +}; +const consumer = new Kafka.GroupConsumer(options); + +/*const consumer = new Kafka.SimpleConsumer({ connectionString: kafkaOptions.brokers_url, ...(isSslEnabled && { // Include ssl options if present ssl: { @@ -19,7 +30,7 @@ const consumer = new Kafka.SimpleConsumer({ key: kafkaOptions.SSL.key } }) -}) +})*/ const check = function () { if (!consumer.client.initialBrokers && !consumer.client.initialBrokers.length) {