Skip to content
This repository was archived by the owner on Mar 13, 2025. It is now read-only.

fixes for https://github.com/topcoder-platform/topcoder-x-ui/issues/21 #3

Merged
merged 1 commit into from
Jun 22, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -45,12 +45,12 @@ The following config parameters are supported, they are defined in `config/defau
|FIX_ACCEPTED_ISSUE_LABEL|the label name for fix accepted, should be one of the label configured in topcoder x ui|'Fix Accepted'|
|TC_OR_DETAIL_LINK|the link to online review detail of challenge| see `default.js`, OR link for dev environment|

KAFKA_OPTIONS should be object as described in https://github.com/SOHU-Co/kafka-node#kafkaclient
KAFKA_OPTIONS should be object as described in https://github.com/oleksiyk/kafka#ssl
For using with SSL, the options should be as
```
{
kafkaHost: '<server>',
sslOptions: {
connectionString: '<server>',
ssl: {
cert: '<certificate>',
key: '<key>'
}
4 changes: 2 additions & 2 deletions config/default.js
Original file line number Diff line number Diff line change
@@ -17,8 +17,8 @@ module.exports = {
PARTITION: process.env.PARTITION || 0,
TOPIC: process.env.TOPIC || 'tc-x-events',
KAFKA_OPTIONS: {
kafkaHost: process.env.KAFKA_HOST || 'localhost:9092',
sslOptions: {
connectionString: process.env.KAFKA_HOST || 'localhost:9092',
ssl: {
cert: process.env.KAFKA_CLIENT_CERT || fs.readFileSync('./kafka_client.cer'), // eslint-disable-line no-sync
key: process.env.KAFKA_CLIENT_CERT_KEY || fs.readFileSync('./kafka_client.key'), // eslint-disable-line no-sync
}
2 changes: 1 addition & 1 deletion configuration.md
Original file line number Diff line number Diff line change
@@ -30,7 +30,7 @@ The following config parameters are supported, they are defined in `config/defau
|RETRY_INTERVAL| the interval at which the event should be retried to process in milliseconds | 120000|
|READY_FOR_REVIEW_ISSUE_LABEL| the label name for ready for review, should be one of the label configured in topcoder x ui|'Ready for review'|

KAFKA_OPTIONS should be object as described in https://github.com/SOHU-Co/kafka-node#kafkaclient
KAFKA_OPTIONS should be object as described in https://github.com/oleksiyk/kafka#ssl
For using with SSL, the options should be as
```
{
264 changes: 96 additions & 168 deletions package-lock.json
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -27,11 +27,11 @@
"github": "^12.0.2",
"joi": "^13.0.0",
"jwt-decode": "^2.2.0",
"kafka-node": "^2.6.1",
"lodash": "^4.17.4",
"markdown-it": "^8.4.0",
"moment": "^2.19.1",
"mongoose": "^4.12.3",
"no-kafka": "^3.2.10",
"node-gitlab-api": "^2.2.6",
"nodemailer": "^4.4.0",
"topcoder-api-challenges": "^1.0.6",
71 changes: 21 additions & 50 deletions utils/kafka.js
Original file line number Diff line number Diff line change
@@ -11,54 +11,32 @@
*/
'use strict';

const {promisify} = require('util');
const kafka = require('kafka-node');
const config = require('config');
const _ = require('lodash');
const kafka = require('no-kafka');
const IssueService = require('../services/IssueService');
const logger = require('./logger');

const Offset = kafka.Offset;

class Kafka {
constructor() {
this.client = new kafka.KafkaClient(config.KAFKA_OPTIONS);
this.consumer = new kafka.Consumer(this.client, [{topic: config.TOPIC, partition: config.PARTITION}], {autoCommit: true});
this.consumer.setOffset(config.TOPIC, 0, 0);
this.offset = new Offset(this.client);
this.producer = new kafka.Producer(this.client);
logger.info(`Connecting on topic: ${config.TOPIC}`);

this.sendAsync = promisify(this.producer.send).bind(this.producer);
}

run() {
this.consumer.on('error', (err) => {
logger.error(`ERROR ${err}`);
});
this.consumer = new kafka.SimpleConsumer(config.KAFKA_OPTIONS);

this.consumer.on('offsetOutOfRange', (topic) => {
logger.debug(`TOPIC ${topic}`);
logger.info('offset OutOfRange. resetting.');
this.offset.fetch([topic], (errOffsetFetch, offsets) => {
if (errOffsetFetch) {
logger.error(errOffsetFetch);
return console.error(errOffsetFetch);
}

const min = Math.min(offsets[topic.topic][topic.partition]);
logger.info(`Setting offset to ${min}`);
return this.consumer.setOffset(config.TOPIC, topic.partition, min);
});
this.producer = new kafka.Producer(config.KAFKA_OPTIONS);
this.producer.init().then(() => {
logger.info('kafka producer is ready.');
}).catch((err) => {
logger.error(`kafka producer is not connected. ${err.stack}`);
});
}

this.consumer.on('message', (message) => {
logger.info(`received message from kafka: ${message.value}`);
messageHandler(messageSet) {
messageSet.forEach((item) => {
logger.info(`received message from kafka: ${item.message.value.toString('utf8')}`);

// The event should be a JSON object
let event;
try {
event = JSON.parse(message.value);
event = JSON.parse(item.message.value.toString('utf8'));
} catch (err) {
logger.error(`"message" is not a valid JSON-formatted string: ${err.message}`);
return;
@@ -70,26 +48,19 @@ class Kafka {
.catch(logger.error);
}
});
this.consumer.on('ready', () => {
logger.info('kafka consumer is ready.');
});
this.producer.on('ready', () => {
logger.info('kafka producer is ready.');
}

this.producer.createTopics([config.TOPIC], true, (err) => {
if (err) {
logger.error(`error in creating topic: ${config.TOPIC}, error: ${err.stack}`);
} else {
logger.info(`kafka topic: ${config.TOPIC} is ready`);
}
});
});
this.producer.on('error', (err) => {
logger.error(`kafka is not connected. ${err.stack}`);
run() {
this.consumer.init().then(() => {
logger.info('kafka consumer is ready');
this.consumer.subscribe(config.TOPIC, {}, this.messageHandler);
}).catch((err) => {
logger.error(`kafka consumer is not connected. ${err.stack}`);
});
}

send(message) {
return this.sendAsync([{topic: config.TOPIC, messages: message}]);
return this.producer.send({topic: config.TOPIC, message: {value: message}});
}
}