Skip to content
This repository was archived by the owner on Mar 13, 2025. It is now read-only.

Commit 3776238

Browse files
committed
Change simple consumer to group consumer.
1 parent 49a9159 commit 3776238

File tree

4 files changed

+8
-3
lines changed

4 files changed

+8
-3
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ For using with SSL, the options should be as
6060
```
6161
{
6262
connectionString: '<server>',
63+
groupId: <groupid>,
6364
ssl: {
6465
cert: '<certificate>',
6566
key: '<key>'

config/default.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ module.exports = {
1818
TOPIC: process.env.TOPIC || 'tc-x-events',
1919
KAFKA_OPTIONS: {
2020
connectionString: process.env.KAFKA_URL || 'localhost:9092',
21+
groupId: process.env.KAFKA_GROUP_ID || 'topcoder-x-processor',
2122
ssl: {
2223
cert: process.env.KAFKA_CLIENT_CERT || fs.readFileSync('./kafka_client.cer'), // eslint-disable-line no-sync
2324
key: process.env.KAFKA_CLIENT_CERT_KEY || fs.readFileSync('./kafka_client.key'), // eslint-disable-line no-sync

configuration.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ The following config parameters are supported, they are defined in `config/defau
99
|TOPIC | The Kafka topic where events are published. This must be the same as the configured value for topcoder-x-processor| |
1010
|KAFKA_OPTIONS | Kafka connection options| |
1111
|KAFKA_URL | The Kafka host to connect to| localhost:9092 |
12+
|KAFKA_GROUP_ID | The Kafka group id name| topcoder-x-processor |
1213
|KAFKA_CLIENT_CERT | The Kafka SSL certificate to use when connecting| Read from kafka_client.cer file, but this can be set as a string like it is on Heroku |
1314
|KAFKA_CLIENT_CERT_KEY | The Kafka SSL certificate key to use when connecting| Read from kafka_client.key file, but this can be set as a string like it is on Heroku|
1415
|TC_DEV_ENV| the flag whether to use topcoder development api or production| false|

utils/kafka.js

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ const logger = require('./logger');
2121

2222
class Kafka {
2323
constructor() {
24-
this.consumer = new kafka.SimpleConsumer(config.KAFKA_OPTIONS);
24+
this.consumer = new kafka.GroupConsumer(config.KAFKA_OPTIONS);
2525

2626
this.producer = new kafka.Producer(config.KAFKA_OPTIONS);
2727
this.producer.init().then(() => {
@@ -77,10 +77,12 @@ class Kafka {
7777
}
7878

7979
run() {
80-
this.consumer.init().then(() => {
80+
this.consumer.init([{
81+
subscriptions: [config.TOPIC],
82+
handler: this.messageHandler
83+
}]).then(() => {
8184
logger.info('kafka consumer is ready');
8285
healthcheck.init([this.check]);
83-
this.consumer.subscribe(config.TOPIC, {}, this.messageHandler);
8486
}).catch((err) => {
8587
logger.error(`kafka consumer is not connected. ${err.stack}`);
8688
});

0 commit comments

Comments
 (0)