From 58714b30bcd043db6d92cbf1a5a4a73bafac6f23 Mon Sep 17 00:00:00 2001
From: Sachin Maheshwari <sachin.maheshwari@topcoder.com>
Date: Fri, 15 May 2020 12:28:04 +0530
Subject: [PATCH] group consumer changes

---
 config/default.js | 34 +++++++++++++++++++---------------
 src/consumer.js   | 17 ++++++++++++++---
 2 files changed, 33 insertions(+), 18 deletions(-)

diff --git a/config/default.js b/config/default.js
index c64292d..65d48a8 100644
--- a/config/default.js
+++ b/config/default.js
@@ -36,30 +36,34 @@ module.exports = {
     partition: process.env.partition || [0], // Kafka partitions to use
     maxRetry: process.env.MAX_RETRY || 3,
     errorTopic: process.env.ERROR_TOPIC || 'db.scorecardtable.error',
-    recipients: ['admin@abc.com'] // Kafka partitions to use
+    recipients: ['admin@abc.com'], // Kafka partitions to use,
+    KAFKA_URL: process.env.KAFKA_URL,
+    KAFKA_GROUP_ID: process.env.KAFKA_GROUP_ID || 'postgres-ifx-consumer',
+    KAFKA_CLIENT_CERT: process.env.KAFKA_CLIENT_CERT ? process.env.KAFKA_CLIENT_CERT.replace('\\n', '\n') : null,
+    KAFKA_CLIENT_CERT_KEY: process.env.KAFKA_CLIENT_CERT_KEY ? process.env.KAFKA_CLIENT_CERT_KEY.replace('\\n', '\n') : null,
   },
   SLACK: {
     URL: process.env.SLACKURL || 'us-east-1',
     SLACKCHANNEL: process.env.SLACKCHANNEL || 'ifxpg-migrator',
-    SLACKNOTIFY:  process.env.SLACKNOTIFY || 'false'
+    SLACKNOTIFY: process.env.SLACKNOTIFY || 'false'
   },
-   RECONSILER:{
+  RECONSILER: {
     RECONSILER_START: process.env.RECONSILER_START || 5,
     RECONSILER_END: process.env.RECONSILER_END || 1,
     RECONSILER_DURATION_TYPE: process.env.RECONSILER_DURATION_TYPE || 'm'
   },
   DYNAMODB:
-	{
-	DYNAMODB_TABLE: process.env.DYNAMODB_TABLE || 'test_pg_ifx_payload_sync',
-  	DD_ElapsedTime: process.env.DD_ElapsedTime || 600000
-	},
+  {
+    DYNAMODB_TABLE: process.env.DYNAMODB_TABLE || 'test_pg_ifx_payload_sync',
+    DD_ElapsedTime: process.env.DD_ElapsedTime || 600000
+  },
 
-	AUTH0_URL: process.env.AUTH0_URL ,
-	AUTH0_AUDIENCE: process.env.AUTH0_AUDIENCE ,
-	TOKEN_CACHE_TIME: process.env.TOKEN_CACHE_TIME ,
-	AUTH0_CLIENT_ID: process.env.AUTH0_CLIENT_ID ,
-	AUTH0_CLIENT_SECRET: process.env.AUTH0_CLIENT_SECRET  ,
-	BUSAPI_URL : process.env.BUSAPI_URL ,
-	KAFKA_ERROR_TOPIC : process.env.KAFKA_ERROR_TOPIC ,
-	AUTH0_PROXY_SERVER_URL: process.env.AUTH0_PROXY_SERVER_URL 
+  AUTH0_URL: process.env.AUTH0_URL,
+  AUTH0_AUDIENCE: process.env.AUTH0_AUDIENCE,
+  TOKEN_CACHE_TIME: process.env.TOKEN_CACHE_TIME,
+  AUTH0_CLIENT_ID: process.env.AUTH0_CLIENT_ID,
+  AUTH0_CLIENT_SECRET: process.env.AUTH0_CLIENT_SECRET,
+  BUSAPI_URL: process.env.BUSAPI_URL,
+  KAFKA_ERROR_TOPIC: process.env.KAFKA_ERROR_TOPIC,
+  AUTH0_PROXY_SERVER_URL: process.env.AUTH0_PROXY_SERVER_URL
 }
diff --git a/src/consumer.js b/src/consumer.js
index 6fc4084..abddff2 100644
--- a/src/consumer.js
+++ b/src/consumer.js
@@ -10,8 +10,19 @@ const healthcheck = require('topcoder-healthcheck-dropin');
 const auditTrail = require('./services/auditTrail');
 const kafkaOptions = config.get('KAFKA')
 const postMessage = require('./services/posttoslack')
-const isSslEnabled = kafkaOptions.SSL && kafkaOptions.SSL.cert && kafkaOptions.SSL.key
-const consumer = new Kafka.SimpleConsumer({
+//const isSslEnabled = kafkaOptions.SSL && kafkaOptions.SSL.cert && kafkaOptions.SSL.key
+
+const options = {
+  groupId: kafkaOptions.KAFKA_GROUP_ID,
+  connectionString: kafkaOptions.KAFKA_URL,
+  ssl: {
+    cert: kafkaOptions.KAFKA_CLIENT_CERT,
+    key: kafkaOptions.KAFKA_CLIENT_CERT_KEY
+  }
+};
+const consumer = new Kafka.GroupConsumer(options);
+
+/*const consumer = new Kafka.SimpleConsumer({
   connectionString: kafkaOptions.brokers_url,
   ...(isSslEnabled && { // Include ssl options if present
     ssl: {
@@ -19,7 +30,7 @@ const consumer = new Kafka.SimpleConsumer({
       key: kafkaOptions.SSL.key
     }
   })
-})
+})*/
 
 const check = function () {
   if (!consumer.client.initialBrokers && !consumer.client.initialBrokers.length) {