Skip to content

Commit 3e67379

Browse files
Merge pull request #11 from topcoder-platform/feature/group-consumer
[skip ci] group consumer changes
2 parents 8e815f6 + 58714b3 commit 3e67379

File tree

2 files changed

+33
-18
lines changed

2 files changed

+33
-18
lines changed

config/default.js

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -36,30 +36,34 @@ module.exports = {
3636
partition: process.env.partition || [0], // Kafka partitions to use
3737
maxRetry: process.env.MAX_RETRY || 3,
3838
errorTopic: process.env.ERROR_TOPIC || 'db.scorecardtable.error',
39-
recipients: ['[email protected]'] // Kafka partitions to use
39+
recipients: ['[email protected]'], // Kafka partitions to use,
40+
KAFKA_URL: process.env.KAFKA_URL,
41+
KAFKA_GROUP_ID: process.env.KAFKA_GROUP_ID || 'postgres-ifx-consumer',
42+
KAFKA_CLIENT_CERT: process.env.KAFKA_CLIENT_CERT ? process.env.KAFKA_CLIENT_CERT.replace('\\n', '\n') : null,
43+
KAFKA_CLIENT_CERT_KEY: process.env.KAFKA_CLIENT_CERT_KEY ? process.env.KAFKA_CLIENT_CERT_KEY.replace('\\n', '\n') : null,
4044
},
4145
SLACK: {
4246
URL: process.env.SLACKURL || 'us-east-1',
4347
SLACKCHANNEL: process.env.SLACKCHANNEL || 'ifxpg-migrator',
44-
SLACKNOTIFY: process.env.SLACKNOTIFY || 'false'
48+
SLACKNOTIFY: process.env.SLACKNOTIFY || 'false'
4549
},
46-
RECONSILER:{
50+
RECONSILER: {
4751
RECONSILER_START: process.env.RECONSILER_START || 5,
4852
RECONSILER_END: process.env.RECONSILER_END || 1,
4953
RECONSILER_DURATION_TYPE: process.env.RECONSILER_DURATION_TYPE || 'm'
5054
},
5155
DYNAMODB:
52-
{
53-
DYNAMODB_TABLE: process.env.DYNAMODB_TABLE || 'test_pg_ifx_payload_sync',
54-
DD_ElapsedTime: process.env.DD_ElapsedTime || 600000
55-
},
56+
{
57+
DYNAMODB_TABLE: process.env.DYNAMODB_TABLE || 'test_pg_ifx_payload_sync',
58+
DD_ElapsedTime: process.env.DD_ElapsedTime || 600000
59+
},
5660

57-
AUTH0_URL: process.env.AUTH0_URL ,
58-
AUTH0_AUDIENCE: process.env.AUTH0_AUDIENCE ,
59-
TOKEN_CACHE_TIME: process.env.TOKEN_CACHE_TIME ,
60-
AUTH0_CLIENT_ID: process.env.AUTH0_CLIENT_ID ,
61-
AUTH0_CLIENT_SECRET: process.env.AUTH0_CLIENT_SECRET ,
62-
BUSAPI_URL : process.env.BUSAPI_URL ,
63-
KAFKA_ERROR_TOPIC : process.env.KAFKA_ERROR_TOPIC ,
64-
AUTH0_PROXY_SERVER_URL: process.env.AUTH0_PROXY_SERVER_URL
61+
AUTH0_URL: process.env.AUTH0_URL,
62+
AUTH0_AUDIENCE: process.env.AUTH0_AUDIENCE,
63+
TOKEN_CACHE_TIME: process.env.TOKEN_CACHE_TIME,
64+
AUTH0_CLIENT_ID: process.env.AUTH0_CLIENT_ID,
65+
AUTH0_CLIENT_SECRET: process.env.AUTH0_CLIENT_SECRET,
66+
BUSAPI_URL: process.env.BUSAPI_URL,
67+
KAFKA_ERROR_TOPIC: process.env.KAFKA_ERROR_TOPIC,
68+
AUTH0_PROXY_SERVER_URL: process.env.AUTH0_PROXY_SERVER_URL
6569
}

src/consumer.js

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,27 @@ const healthcheck = require('topcoder-healthcheck-dropin');
1010
const auditTrail = require('./services/auditTrail');
1111
const kafkaOptions = config.get('KAFKA')
1212
const postMessage = require('./services/posttoslack')
13-
const isSslEnabled = kafkaOptions.SSL && kafkaOptions.SSL.cert && kafkaOptions.SSL.key
14-
const consumer = new Kafka.SimpleConsumer({
13+
//const isSslEnabled = kafkaOptions.SSL && kafkaOptions.SSL.cert && kafkaOptions.SSL.key
14+
15+
const options = {
16+
groupId: kafkaOptions.KAFKA_GROUP_ID,
17+
connectionString: kafkaOptions.KAFKA_URL,
18+
ssl: {
19+
cert: kafkaOptions.KAFKA_CLIENT_CERT,
20+
key: kafkaOptions.KAFKA_CLIENT_CERT_KEY
21+
}
22+
};
23+
const consumer = new Kafka.GroupConsumer(options);
24+
25+
/*const consumer = new Kafka.SimpleConsumer({
1526
connectionString: kafkaOptions.brokers_url,
1627
...(isSslEnabled && { // Include ssl options if present
1728
ssl: {
1829
cert: kafkaOptions.SSL.cert,
1930
key: kafkaOptions.SSL.key
2031
}
2132
})
22-
})
33+
})*/
2334

2435
const check = function () {
2536
if (!consumer.client.initialBrokers && !consumer.client.initialBrokers.length) {

0 commit comments

Comments
 (0)