From 3776238e924a53eeb024ed783c0fdec93f654e00 Mon Sep 17 00:00:00 2001 From: Afrisal Yodi Purnama Date: Fri, 8 Nov 2019 10:54:51 +0700 Subject: [PATCH] Change simple consumer to group consumer. --- README.md | 1 + config/default.js | 1 + configuration.md | 1 + utils/kafka.js | 8 +++++--- 4 files changed, 8 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 6fa1aa1..c64ee38 100755 --- a/README.md +++ b/README.md @@ -60,6 +60,7 @@ For using with SSL, the options should be as ``` { connectionString: '', + groupId: , ssl: { cert: '', key: '' diff --git a/config/default.js b/config/default.js index eaa6672..201b0e9 100644 --- a/config/default.js +++ b/config/default.js @@ -18,6 +18,7 @@ module.exports = { TOPIC: process.env.TOPIC || 'tc-x-events', KAFKA_OPTIONS: { connectionString: process.env.KAFKA_URL || 'localhost:9092', + groupId: process.env.KAFKA_GROUP_ID || 'topcoder-x-processor', ssl: { cert: process.env.KAFKA_CLIENT_CERT || fs.readFileSync('./kafka_client.cer'), // eslint-disable-line no-sync key: process.env.KAFKA_CLIENT_CERT_KEY || fs.readFileSync('./kafka_client.key'), // eslint-disable-line no-sync diff --git a/configuration.md b/configuration.md index 8e39cff..57468d2 100644 --- a/configuration.md +++ b/configuration.md @@ -9,6 +9,7 @@ The following config parameters are supported, they are defined in `config/defau |TOPIC | The Kafka topic where events are published. This must be the same as the configured value for topcoder-x-processor| | |KAFKA_OPTIONS | Kafka connection options| | |KAFKA_URL | The Kafka host to connect to| localhost:9092 | +|KAFKA_GROUP_ID | The Kafka group id name| topcoder-x-processor | |KAFKA_CLIENT_CERT | The Kafka SSL certificate to use when connecting| Read from kafka_client.cer file, but this can be set as a string like it is on Heroku | |KAFKA_CLIENT_CERT_KEY | The Kafka SSL certificate key to use when connecting| Read from kafka_client.key file, but this can be set as a string like it is on Heroku| |TC_DEV_ENV| the flag whether to use topcoder development api or production| false| diff --git a/utils/kafka.js b/utils/kafka.js index d70189b..60f477a 100644 --- a/utils/kafka.js +++ b/utils/kafka.js @@ -21,7 +21,7 @@ const logger = require('./logger'); class Kafka { constructor() { - this.consumer = new kafka.SimpleConsumer(config.KAFKA_OPTIONS); + this.consumer = new kafka.GroupConsumer(config.KAFKA_OPTIONS); this.producer = new kafka.Producer(config.KAFKA_OPTIONS); this.producer.init().then(() => { @@ -77,10 +77,12 @@ class Kafka { } run() { - this.consumer.init().then(() => { + this.consumer.init([{ + subscriptions: [config.TOPIC], + handler: this.messageHandler + }]).then(() => { logger.info('kafka consumer is ready'); healthcheck.init([this.check]); - this.consumer.subscribe(config.TOPIC, {}, this.messageHandler); }).catch((err) => { logger.error(`kafka consumer is not connected. ${err.stack}`); });