Skip to content
This repository was archived by the owner on Mar 13, 2025. It is now read-only.

Commit 4f29ba0

Browse files
authored
Merge pull request #25 from afrisalyp/issue-221
Change simple consumer to group consumer.
2 parents d88f764 + 3776238 commit 4f29ba0

File tree

4 files changed

+8
-3
lines changed

4 files changed

+8
-3
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ For using with SSL, the options should be as
5757
```
5858
{
5959
connectionString: '<server>',
60+
groupId: <groupid>,
6061
ssl: {
6162
cert: '<certificate>',
6263
key: '<key>'

config/default.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ module.exports = {
1818
TOPIC: process.env.TOPIC || 'tc-x-events',
1919
KAFKA_OPTIONS: {
2020
connectionString: process.env.KAFKA_URL || 'localhost:9092',
21+
groupId: process.env.KAFKA_GROUP_ID || 'topcoder-x-processor',
2122
ssl: {
2223
cert: process.env.KAFKA_CLIENT_CERT || fs.readFileSync('./kafka_client.cer'), // eslint-disable-line no-sync
2324
key: process.env.KAFKA_CLIENT_CERT_KEY || fs.readFileSync('./kafka_client.key'), // eslint-disable-line no-sync

configuration.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ The following config parameters are supported, they are defined in `config/defau
99
|TOPIC | The Kafka topic where events are published. This must be the same as the configured value for topcoder-x-processor| |
1010
|KAFKA_OPTIONS | Kafka connection options| |
1111
|KAFKA_URL | The Kafka host to connect to| localhost:9092 |
12+
|KAFKA_GROUP_ID | The Kafka group id name| topcoder-x-processor |
1213
|KAFKA_CLIENT_CERT | The Kafka SSL certificate to use when connecting| Read from kafka_client.cer file, but this can be set as a string like it is on Heroku |
1314
|KAFKA_CLIENT_CERT_KEY | The Kafka SSL certificate key to use when connecting| Read from kafka_client.key file, but this can be set as a string like it is on Heroku|
1415
|TC_DEV_ENV| the flag whether to use topcoder development api or production| false|

utils/kafka.js

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ const logger = require('./logger');
2121

2222
class Kafka {
2323
constructor() {
24-
this.consumer = new kafka.SimpleConsumer(config.KAFKA_OPTIONS);
24+
this.consumer = new kafka.GroupConsumer(config.KAFKA_OPTIONS);
2525

2626
this.producer = new kafka.Producer(config.KAFKA_OPTIONS);
2727
this.producer.init().then(() => {
@@ -77,10 +77,12 @@ class Kafka {
7777
}
7878

7979
run() {
80-
this.consumer.init().then(() => {
80+
this.consumer.init([{
81+
subscriptions: [config.TOPIC],
82+
handler: this.messageHandler
83+
}]).then(() => {
8184
logger.info('kafka consumer is ready');
8285
healthcheck.init([this.check]);
83-
this.consumer.subscribe(config.TOPIC, {}, this.messageHandler);
8486
}).catch((err) => {
8587
logger.error(`kafka consumer is not connected. ${err.stack}`);
8688
});

0 commit comments

Comments
 (0)