Skip to content
This repository was archived by the owner on Mar 13, 2025. It is now read-only.

Change simple consumer to group consumer. #25

Merged
merged 1 commit into from
Nov 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ For using with SSL, the options should be as
```
{
connectionString: '<server>',
groupId: <groupid>,
ssl: {
cert: '<certificate>',
key: '<key>'
Expand Down
1 change: 1 addition & 0 deletions config/default.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ module.exports = {
TOPIC: process.env.TOPIC || 'tc-x-events',
KAFKA_OPTIONS: {
connectionString: process.env.KAFKA_URL || 'localhost:9092',
groupId: process.env.KAFKA_GROUP_ID || 'topcoder-x-processor',
ssl: {
cert: process.env.KAFKA_CLIENT_CERT || fs.readFileSync('./kafka_client.cer'), // eslint-disable-line no-sync
key: process.env.KAFKA_CLIENT_CERT_KEY || fs.readFileSync('./kafka_client.key'), // eslint-disable-line no-sync
Expand Down
1 change: 1 addition & 0 deletions configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ The following config parameters are supported, they are defined in `config/defau
|TOPIC | The Kafka topic where events are published. This must be the same as the configured value for topcoder-x-processor| |
|KAFKA_OPTIONS | Kafka connection options| |
|KAFKA_URL | The Kafka host to connect to| localhost:9092 |
|KAFKA_GROUP_ID | The Kafka group id name| topcoder-x-processor |
|KAFKA_CLIENT_CERT | The Kafka SSL certificate to use when connecting| Read from kafka_client.cer file, but this can be set as a string like it is on Heroku |
|KAFKA_CLIENT_CERT_KEY | The Kafka SSL certificate key to use when connecting| Read from kafka_client.key file, but this can be set as a string like it is on Heroku|
|TC_DEV_ENV| the flag whether to use topcoder development api or production| false|
Expand Down
8 changes: 5 additions & 3 deletions utils/kafka.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ const logger = require('./logger');

class Kafka {
constructor() {
this.consumer = new kafka.SimpleConsumer(config.KAFKA_OPTIONS);
this.consumer = new kafka.GroupConsumer(config.KAFKA_OPTIONS);

this.producer = new kafka.Producer(config.KAFKA_OPTIONS);
this.producer.init().then(() => {
Expand Down Expand Up @@ -77,10 +77,12 @@ class Kafka {
}

run() {
this.consumer.init().then(() => {
this.consumer.init([{
subscriptions: [config.TOPIC],
handler: this.messageHandler
}]).then(() => {
logger.info('kafka consumer is ready');
healthcheck.init([this.check]);
this.consumer.subscribe(config.TOPIC, {}, this.messageHandler);
}).catch((err) => {
logger.error(`kafka consumer is not connected. ${err.stack}`);
});
Expand Down