Skip to content
This repository was archived by the owner on Mar 12, 2025. It is now read-only.

Commit afa8620

Browse files
Merge pull request #3 from topcoder-platform/30065431
Merge winning submission
2 parents 98e7ea8 + e3f292b commit afa8620

25 files changed

+447
-157
lines changed

README.md

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
## Dependencies
44

55
- nodejs https://nodejs.org/en/ (v8+)
6-
6+
- Kafka v2.11+
77

88

99
## Configuration
@@ -13,12 +13,16 @@ The following parameters can be set in config files or in env variables:
1313
- LOG_LEVEL: the log level
1414
- PORT: the server port
1515
- KAFKA_OPTIONS: Kafka consumer options, see https://www.npmjs.com/package/no-kafka for available options
16+
- MAX_MESSAGE_COUNT: max message count to cache per topic
1617

1718
For the Kafka connection options:
1819

1920
- connectionString is comma delimited list of initial brokers list
2021
- secure connection may be achieved via ssl field, see https://www.npmjs.com/package/no-kafka#ssl for details
2122

23+
For front end config, see ui/README.md.
24+
25+
2226
## Local Kafka setup
2327

2428
- `http://kafka.apache.org/quickstart` contains details to setup and manage Kafka server,
@@ -33,6 +37,10 @@ For the Kafka connection options:
3337
- note that the zookeeper server is at localhost:2181, and Kafka server is at localhost:9092
3438
- use another terminal, go to same directory, create a topic:
3539
`bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic challenge.notification.create`
40+
similarly, you may create more topics, e.g.
41+
`bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic1`
42+
`bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic2`
43+
`bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic3`
3644
- verify that the topic is created:
3745
`bin/kafka-topics.sh --list --zookeeper localhost:2181`,
3846
it should list out the created topics
@@ -63,20 +71,35 @@ For the Kafka connection options:
6371

6472
## Heroku Deployment
6573

74+
- heroku login
75+
- heroku create
76+
- heroku config:set KAFKA_URL=some-public-kafka-url
77+
- heroku config:set API_URL=API-URL-according-to-the-heroku-app-URL
78+
- heroku config:set WS_URL=web-socket-URL-according-to-the-heroku-app-URL
6679
- git init
6780
- git add .
6881
- git commit -m message
69-
- heroku create
70-
- heroku config:set KAFKA_CONSUMER_URL=some-public-kafka-url
71-
- heroku config:set KAFKA_PRODUCER_URL=some-public-kafka-url
7282
- git push heroku master
7383

7484
## Verification
7585

7686
- setup stuff following above deployment
7787
- in the UI, select a topic to view topic data stream
7888
- use the kafka-console-producer to generate some messages as above,
79-
then watch the UI, it should got some messages
89+
then watch the UI, it should get some messages
8090
- filter the messages and see results
8191
- use the UI to post message to Kafka, see above for example message, the data stream table should also show the posted message
8292
- you may also use the above kafka-console-consumer to view the Kafka messages
93+
94+
95+
## Notes
96+
97+
- To keep the web socket connection alive, the following approaches are used:
98+
(a) the server will handle both `error` and `close` events to terminate the web socket connection,
99+
so that client side will re-start a new connection
100+
(b) client side will handle onerror and onclose to re-start a new connection to server
101+
- The get/view topic, send message operations will show loading indicator,
102+
but they are too fast because local back end is used, and especially for get/view topic operations web socket is used,
103+
you may hardly see the indicator
104+
105+

config/default.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,6 @@ module.exports = {
1212
key: process.env.KAFKA_CLIENT_CERT_KEY,
1313
},
1414
},
15+
// max message count to cache per topic
16+
MAX_MESSAGE_COUNT: process.env.MAX_MESSAGE_COUNT || 10000,
1517
};

set-env.js

100644100755
Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@ const targetPath = './ui/src/config/config.js';
44

55
const envConfigFile = `
66
const config = {
7-
API_URL: '${process.env.API_URL}/api/v1',
8-
WS_URL: '${process.env.WS_URL}'
7+
API_URL: '${process.env.API_URL || 'http://localhost:4000'}/api/v1',
8+
WS_URL: '${process.env.WS_URL || 'ws://localhost:4000'}',
9+
DEFAULT_MESSAGE_COUNT: ${process.env.DEFAULT_MESSAGE_COUNT || 20}
910
};
1011
1112
export default config;

src/dataStreamWS.js

Lines changed: 64 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,26 @@ const logger = require('./common/logger');
88
const WebSocket = require('ws');
99
const WebSocketServer = WebSocket.Server;
1010
const helper = require('./common/helper');
11+
const config = require('config');
1112

1213
// all web socket client data
1314
const allWS = [];
1415

16+
// all cached messages, key is topic, value is array of messages of the topic
17+
const allMessages = {};
18+
19+
// max cache message count per topic
20+
const maxMsgCount = Number(config.MAX_MESSAGE_COUNT);
21+
22+
// send data to client via web socket
23+
const sendData = (ws, payload) => {
24+
try {
25+
ws.send(JSON.stringify(payload));
26+
} catch (err) {
27+
logger.error(err);
28+
}
29+
};
30+
1531
/**
1632
* Setup web socket.
1733
*/
@@ -28,41 +44,76 @@ const setup = (server) => {
2844
};
2945
allWS.push(clientData);
3046

31-
// got message from client
47+
// got message from client, the message is string representation of JSON containing fields: topic and count,
48+
// where count is the last count of messages of the topic to retrieve
3249
ws.on('message', (message) => {
3350
logger.debug(`web socket message: ${message}`);
34-
if (message.startsWith('topic:') && message.length > 'topic:'.length) {
35-
clientData.topic = message.substring('topic:'.length);
36-
} else {
37-
logger.error(`invalid web socket message: ${message}`);
51+
let msgJSON;
52+
try {
53+
msgJSON = JSON.parse(message);
54+
} catch (err) {
55+
logger.err('invalid message', message, err);
56+
return;
3857
}
58+
clientData.topic = msgJSON.topic;
59+
const topicMsgs = allMessages[msgJSON.topic] || [];
60+
let startIndex = topicMsgs.length - msgJSON.count;
61+
if (startIndex < 0) startIndex = 0;
62+
const messages = topicMsgs.slice(startIndex);
63+
// the 'full' flag is true, indicating the messages are full latest messages for client side,
64+
// client side should clear the existing messages if any for the topic
65+
sendData(ws, { full: true, topic: msgJSON.topic, messages });
3966
});
4067

41-
// close event handler
42-
ws.on('close', () => {
68+
// terminate web socket
69+
const terminateWS = () => {
70+
if (clientData.terminated) {
71+
return;
72+
}
73+
clientData.terminated = true;
74+
4375
for (let i = 0; i < allWS.length; i += 1) {
4476
if (id === allWS[i].id) {
4577
// remove the current client data
4678
allWS.splice(i, 1);
4779
break;
4880
}
4981
}
82+
ws.close();
83+
};
84+
85+
// close event handler
86+
ws.on('close', () => {
5087
logger.debug('web socket closed');
88+
terminateWS();
89+
});
90+
91+
// error event handler
92+
ws.on('error', (err) => {
93+
logger.error('there is error for the web socket', err);
94+
terminateWS();
5195
});
5296
});
97+
98+
wss.on('error', (err) => {
99+
logger.error('there is error for the web socket server', err);
100+
});
53101
};
54102

55103
/**
56-
* Send message to all applicable web socket clients.
104+
* Send message to all applicable web socket clients. The message will be cached to be retrieved by clients.
57105
*/
58106
const sendMessage = (topic, message) => {
107+
// cache message
108+
if (!allMessages[topic]) allMessages[topic] = [];
109+
allMessages[topic].push(message);
110+
if (allMessages[topic].length > maxMsgCount) allMessages[topic].shift();
111+
112+
// send message to clients
59113
_.each(allWS, (clientData) => {
60114
if (topic === clientData.topic) {
61-
try {
62-
clientData.ws.send(message);
63-
} catch (e) {
64-
logger.error(e);
65-
}
115+
// the 'full' flag is false, indicating the message is to be added to client side
116+
sendData(clientData.ws, { full: false, topic, messages: [message] });
66117
}
67118
});
68119
};

src/services/DataStreamService.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,13 @@ function* getAllTopics() {
3838
topics.push(tp);
3939
// if the topic was not handled yet, then let the consumer handle the topic
4040
if (!consumers[tp]) {
41-
consumer.subscribe(tp, { time: Kafka.LATEST_OFFSET }, dataHandler);
41+
consumer.subscribe(tp, { time: Kafka.EARLIEST_OFFSET }, dataHandler);
4242
consumers[tp] = consumer;
4343
}
4444
}
4545
});
46+
// sort topics
47+
topics.sort();
4648
return topics;
4749
}
4850

test/datastream.test.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@ describe('Topcoder Data Stream REST API Tests', () => {
1818
return done(err);
1919
}
2020
expect(res.body.length > 0).to.equal(true);
21+
// ensures the topics are sorted properly
22+
for (let i = 0; i + 1 < res.body.length; i += 1) {
23+
expect(res.body[i] < res.body[i + 1]).to.equal(true);
24+
}
2125
return done();
2226
});
2327
});

ui/.gitignore

100644100755
File mode changed.

ui/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ The following parameters can be set in config file:
1313

1414
- API_URL: the API URL prefix
1515
- WS_URL: the Web Socket URL
16+
- DEFAULT_MESSAGE_COUNT: the default message count per topic
1617

1718
## Local deployment
1819

ui/create-react-app-README.md

100644100755
File mode changed.

ui/package-lock.json

100644100755
File mode changed.

ui/package.json

100644100755
File mode changed.

ui/public/favicon.ico

100644100755
-2.66 KB
Binary file not shown.

ui/public/index.html

100644100755
File mode changed.

ui/public/manifest.json

100644100755
File mode changed.

ui/src/App.css

100644100755
Lines changed: 84 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,47 +1,103 @@
1-
.App {
2-
text-align: center;
1+
.app {
2+
width: 100%;
3+
}
4+
5+
.left-nav {
6+
width: 300px;
7+
background-color: #444444;
8+
color: #EEEEEE;
9+
font-size: 13px;
310
}
411

5-
.App-logo {
6-
animation: App-logo-spin infinite 20s linear;
12+
.logo {
713
height: 80px;
14+
width: 100%;
15+
padding-left: 60px;
16+
padding-right: 60px;
17+
padding-top: 22px;
18+
padding-bottom: 22px;
19+
background-color: #333333;
820
}
921

10-
.App-header {
11-
background-color: #222;
12-
height: 150px;
13-
padding: 20px;
14-
color: white;
22+
.left-nav-item {
23+
padding-left: 30px;
24+
margin-top: 30px;
25+
margin-bottom: 20px;
26+
}
27+
28+
.topic-select {
29+
width: 65%;
30+
display: inline-block;
31+
margin-right: 10px;
1532
}
1633

17-
.App-title {
18-
font-size: 1.5em;
34+
.filter {
35+
padding-left: 0px;
36+
text-align: center;
37+
font-style: italic;
1938
}
2039

21-
.App-item {
22-
font-size: large;
23-
margin: 10px;
40+
.filter-control {
41+
width: 90%;
2442
}
2543

26-
.topic-select {
27-
width: 50%;
28-
display: inline-block;
29-
margin: 10px;
44+
.timestamp-control {
45+
color: #000000;
3046
}
3147

32-
.label {
33-
width: 150px;
34-
display: inline-block;
48+
.apply-button {
3549
margin-left: 30px;
36-
margin-right: 10px;
3750
}
3851

39-
.filter {
40-
width: 250px;
41-
display: inline-block;
52+
.main-content {
53+
background-color: #F5F5F5;
54+
padding: 20px;
55+
width: calc(100% - 300px);
56+
}
57+
58+
.error-msg {
59+
color: red;
60+
margin-bottom: 30px;
61+
}
62+
63+
.message-input {
64+
width: 75%;
65+
margin-right: 20px;
66+
display: inline;
67+
}
68+
69+
.extra-message-count {
70+
width: 100px;
71+
display: inline;
72+
}
73+
74+
.view-button {
75+
vertical-align: top;
76+
}
77+
78+
.get-button {
79+
vertical-align: top;
80+
margin-left: 10px;
81+
}
82+
83+
.send-message-button {
84+
vertical-align: text-bottom;
85+
}
86+
87+
.loading-img-container {
88+
position: fixed;
89+
top: 0;
90+
bottom: 0;
91+
left: 0;
92+
right: 0;
93+
background: rgba(0, 0, 0, 0.8);
94+
z-index: 9999;
95+
opacity: 0.2;
4296
}
4397

44-
@keyframes App-logo-spin {
45-
from { transform: rotate(0deg); }
46-
to { transform: rotate(360deg); }
98+
.loading-img {
99+
position: fixed;
100+
top: 50%;
101+
left: 50%;
102+
z-index: 99999;
47103
}

0 commit comments

Comments
 (0)