Skip to content

Backend websocket support #151

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 18 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ workflows:
context : org-global
filters:
branches:
only: [dev, 'hotfix/V5-API-Standards', 'v5-upgrade']
only: [dev, 'feature/websocket']
- "build-prod":
context : org-global
filters:
Expand Down
4 changes: 4 additions & 0 deletions Consumer-Verification.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ info: Successfully sent notifications.action.email.connect.project.notifications
error: Failed to send email to user id: 5, handle: handle5
...
info: Saved 8 notifications for users: 1, 2, 3, 4, 5, 6, 7, 8
info: Going to push 8 notifications to websocket.
info: Pushed 8 notifications to websocket
info: Handler handleChallengeCreated was run successfully
```

Expand Down Expand Up @@ -131,6 +133,8 @@ info: Successfully sent notifications.action.email.connect.project.notifications
error: Failed to send email to user id: 5, handle: handle5
...
info: Saved 8 notifications for users: 1, 2, 3, 4, 5, 6, 7, 8
info: Going to push 8 notifications to websocket.
info: Pushed 8 notifications to websocket
info: Handler handleChallengePhaseWarning was run successfully
```

Expand Down
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,17 @@ The following parameters can be set in config files or in env variables:
- `AUTH0_PROXY_SERVER_URL`: auth0 proxy server URL
- **Consumer handlers**
- `KAFKA_CONSUMER_HANDLERS`: mapping from consumer topic to handlers
- **Consumer websocket support**
- `WS_MAX_MESSAGE_COUNT`: Maximum number of messages kept in memory per topic
- `WS_PORT`: Port to expose websocket endpoint
- `WS_ZLIB_DEFLATE_CHUNK_SIZE`: Gzip Deflate chunk size
- `WS_ZLIB_DEFLATE_MEM_LEVEL`: Gzip and Deflate compression memory level
- `WS_ZLIB_INFLATE_CHUNK_SIZE`: Gzip Inflate chunk size
- `WS_CLIENT_NO_CONTEXT_TAKEOVER`: Acknowledge disabling of client context takeover.
- `WS_SERVER_NO_CONTEXT_TAKEOVER`: Whether to use context takeover or not.
- `WS_SERVER_MAX_WINDOW_BITS`: Gzip window bits
- `WS_CONCURRENCY_LIMIT`: The number of concurrent calls to zlib. Calls above this limit will be queued
- `WS_BYTES_THRESHOLD`: Payloads smaller than this will not be compressed.
- **Email notification**
- `ENV`: used to construct email category
- `ENABLE_EMAILS`: whether to enable email notifications
Expand Down
153 changes: 153 additions & 0 deletions Websocket-Verification.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
# TOPCODER NOTIFICATIONS - WEBSOCKET VERIFICATION

## Local Setup

Use local docker-compse file under test. Docker-compose file contains
- Zookeeper
- Kafka
- Postgresql 9.6

It maps port 9092 and 5432 to your local machine for kafka and database.

```
cd test
docker-compose up -d
```

## Create topics and database

```
cd test
$ ./create-topics.sh
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.

```
or run each create topic command manually

```
cd test
docker-compose exec kafka opt/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic challenge.notification.events
docker-compose exec kafka opt/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic notifications.autopilot.events
docker-compose exec kafka opt/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic submission.notification.create
docker-compose exec kafka opt/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic dummy.topic
```

Set below environment variables

```
export LOG_LEVEL=debug
export DATABASE_URL=postgres://postgres:postgres@localhost:5432/postgres
export KAFKA_URL=localhost:9092
export KAFKA_GROUP_ID=tc-notifications
export ENV=test
export [email protected]
export [email protected]
export AUTH0_CLIENT_ID=dummy
export AUTH0_CLIENT_SECRET=dummy
export AUTH0_URL=dummy
export AUTH0_AUDIENCE=dummy
export AUTH_SECRET=secret
export VALID_ISSUERS=[\"abc.com\"]
```

- install dependencies `npm install`
- run code lint check `npm run lint`
- create db tables if not present `node test/init-db`, this is needed only for local test

Update KAFKA_CONSUMER_RULESETS default config with dummy topic and consumer

```
'dummy.topic': [
{
handleDummy: {
roles: ['Topcoder User'],
},
},
],
```


- start notification consumer `npm run startConsumer`

## Verification

Open the sample html page under test/websocket.html.

- Click : Run WebSocket, you will see below messages in order.
```
WebSocket is supported by your Browser!
Token is sent...
Subscribed to topic watch for the messages..
```
And in the html page you will see

```
1-{"full":true,"topic":"dummy.topic","messages":[]}
```

Also you can check server logs for below. There is a custom generated token in the html page. You will see role of connected user.
The token will be masked in the logs for security reasons
```
debug: web socket connected
debug: web socket message: token:eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJyb2xlcyI6WyJUb3Bjb2RlciBVc2VyIiwiYWRtaW5pc3RyYXRvciJdLCJpc3MiOiJhYmMuY29tIiwiaGFuZGxlIjoic2FjaGluIiwidXNlcklkIjoiMTAwIiwiZW1haWwiOiJhYmMuY29tIiwianRpIjoiMTdiYzc1Y2EtNmI2Yi00NzIyLWFlMzMtMzQ2NTg4YzlmZjJhIiwiaWF0IjoxNTY3NjgwNjU2LCJleHAiOjE1NzU0NTY2NTZ9.0Lo-t422h7n-Jmt_8qnTK81lmBiFVRWld8kYR2VeKr8
debug: web socket authorized with roles: Topcoder User,administrator
debug: web socket message: {"topic":"dummy.topic","count":5}
```

Topic items are empty in application now. We will put a sample message

Connect to console producer
```
cd test
docker-compose exec kafka opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic dummy.topic
```

- Put a sample message
```
{ "topic": "dummy.topic", "originator": "tc-autopilot", "timestamp": "2018-02-16T00:00:00", "mime-type": "application/json", "payload": { "challengeId": 30054674, "challengeTitle": "test", "challengeUrl": "http://www.topcoder.com/123", "phase": "Submission", "remainingTime": 12345, "userId": 8547899, "initiatorUserId": 123, "type": "UPDATE_DRAFT_CHALLENGE" , "data" : {"id" : "30055164" }} }
```

- You will see logging in the app console:

```
info: Handle Kafka event message; Topic: dummy.topic; Partition: 0; Offset: 1; Message: { "topic": "dummy.topic", "originator": "tc-autopilot", "timestamp": "2018-02-16T00:00:00", "mime-type": "application/json", "payload": { "challengeId": 30054674, "challengeTitle": "test", "challengeUrl": "http://www.topcoder.com/123", "phase": "Submission", "remainingTime": 12345, "userId": 8547899, "initiatorUserId": 123, "type": "UPDATE_DRAFT_CHALLENGE" , "data" : {"id" : "30055164" }} }.
info: Run handler handleDummy
info: Going to insert 1 notifications in database.
Executing (default): INSERT INTO "Notifications" ("id","userId","type","contents","read","seen","version","createdAt","updatedAt") VALUES (DEFAULT,123,'dummy.topic','{"topic":"dummy.topic","originator":"tc-autopilot","timestamp":"2018-02-16T00:00:00","mime-type":"application/json","payload":{"challengeId":30054674,"challengeTitle":"test","challengeUrl":"http://www.topcoder.com/123","phase":"Submission","remainingTime":12345,"userId":8547899,"initiatorUserId":123,"type":"UPDATE_DRAFT_CHALLENGE","data":{"id":"30055164"}}}',false,false,NULL,'2019-09-09 19:14:38.349 +00:00','2019-09-09 19:14:38.349 +00:00');
info: Saved 1 notifications
info: Going to push 1 notifications to websocket.
info: Pushed 1 notifications to websocket
info: Handler handleDummy executed successfully

```

Now please check the websocket.html again and see that new message is inserted.

```
{"full":false,"topic":"dummy.topic","messages":[[{"userId":123,"notification":{"topic":"dummy.topic","originator":"tc-autopilot","timestamp":"2018-02-16T00:00:00","mime-type":"application/json","payload":{"challengeId":30054674,"challengeTitle":"test","challengeUrl":"http://www.topcoder.com/123","phase":"Submission","remainingTime":12345,"userId":8547899,"initiatorUserId":123,"type":"UPDATE_DRAFT_CHALLENGE","data":{"id":"30055164"}}}}]]}
```

Refresh the page and connect to websocket again. You should see that previous messages are published

```
{"full":true,"topic":"dummy.topic","messages":[[{"userId":123,"notification":{"topic":"dummy.topic","originator":"tc-autopilot","timestamp":"2018-02-16T00:00:00","mime-type":"application/json","payload":{"challengeId":30054674,"challengeTitle":"test","challengeUrl":"http://www.topcoder.com/123","phase":"Submission","remainingTime":12345,"userId":8547899,"initiatorUserId":123,"type":"UPDATE_DRAFT_CHALLENGE","data":{"id":"30055164"}}}}]]}
```

## Database Verification

Use some PostgreSQL client to connect to the database. Credentials are same as below

- Username : postgres
- Password : postgres
- Hostname : localhost
- Post : 5432
- Database : postgres

```
select notifications: select * from "Notifications";
```

Then you will see generated notification records same as browser notifications
7 changes: 6 additions & 1 deletion config/default.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ module.exports = {
idle: 10000,
},
},

AUTH_SECRET: process.env.AUTH_SECRET,
VALID_ISSUERS: process.env.VALID_ISSUERS ? process.env.VALID_ISSUERS.replace(/\\"/g, '') : null,
// keep it here for dev purposes, it's only needed by modified version of tc-core-library-js
Expand Down Expand Up @@ -98,6 +97,12 @@ module.exports = {
},
},
],
'test.sachin': [
{
handleDummy:
{}
},
],
//'notifications.community.challenge.created': ['handleChallengeCreated'],
//'notifications.community.challenge.phasewarning': ['handleChallengePhaseWarning'],
},
Expand Down
2 changes: 1 addition & 1 deletion connect/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,5 @@ module.exports = {
DEFAULT_REPLY_EMAIL: process.env.DEFAULT_REPLY_EMAIL,

CONNECT_URL: process.env.CONNECT_URL || 'https://connect.topcoder-dev.com',
ACCOUNTS_APP_URL: process.env.ACCOUNTS_APP_URL || "https://accounts.topcoder-dev.com",
ACCOUNTS_APP_URL: process.env.ACCOUNTS_APP_URL || 'https://accounts.topcoder-dev.com',
};
7 changes: 3 additions & 4 deletions connect/connectNotificationServer.js
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ const getNotificationsForMentionedUser = (logger, eventConfig, content) => {
_.forEach(notifications, (notification) => {
const mentionedUser = _.find(users, { handle: notification.userHandle });
notification.userId = mentionedUser ? mentionedUser.userId.toString() : null;
if (!notification.userId && logger) {// such notifications would be discarded later after aggregation
if (!notification.userId && logger) { // such notifications would be discarded later after aggregation
logger.info(`Unable to find user with handle ${notification.userHandle}`);
}
});
Expand All @@ -102,12 +102,11 @@ const getNotificationsForMentionedUser = (logger, eventConfig, content) => {
logger.error(error);
logger.info('Unable to send notification to mentioned user');
}
//resolves with empty notification which essentially means we are unable to send notification to mentioned user
// resolves with empty notification which essentially means we are unable to send notification to mentioned user
return Promise.resolve([]);
});
} else {
return Promise.resolve([]);
}
return Promise.resolve([]);
};

/**
Expand Down
45 changes: 43 additions & 2 deletions consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ const healthcheck = require('topcoder-healthcheck-dropin');
const logger = require('./src/common/logger');
const models = require('./src/models');
const processors = require('./src/processors');
const notificationStreamWS = require('./src/notificationStreamWS');
const http = require('http');
const express = require('express');


/**
Expand All @@ -24,6 +27,8 @@ function startKafkaConsumer() {
options.ssl = { cert: config.KAFKA_CLIENT_CERT, key: config.KAFKA_CLIENT_CERT_KEY };
}
const consumer = new Kafka.GroupConsumer(options);
// Setup websocket server
logger.debug('Setting ws socket');

// data handler
const messageHandler = (messageSet, topic, partition) => Promise.each(messageSet, (m) => {
Expand Down Expand Up @@ -80,16 +85,34 @@ function startKafkaConsumer() {
if (notifications && notifications.length > 0) {
// save notifications in bulk to improve performance
logger.info(`Going to insert ${notifications.length} notifications in database.`);
let wsData = [];
yield models.Notification.bulkCreate(_.map(notifications, (n) => ({
userId: n.userId,
type: n.type || topic,
contents: n.contents || n.notification || messageJSON.payload || {},
read: false,
seen: false,
version: n.version || null,
})));
})), { returning: true })
.then((result) => {
_.each(result, (model) => {
const item = model.toJSON();
wsData.push(item);
});
})
.catch((errors) => {
logger.logFullError(errors);
})
// logging
logger.info(`Saved ${notifications.length} notifications`);
logger.info(`Going to push ${notifications.length} notifications to websocket.`);

// Trigger websocket notifications
if (wsData.length > 0) {
yield notificationStreamWS.pushNotifications(topic, wsData, handlerRuleSets);
logger.info(`Pushed ${wsData.length} notifications to websocket`);
}

/* logger.info(` for users: ${
_.map(notifications, (n) => n.userId).join(', ')
}`); */
Expand Down Expand Up @@ -132,12 +155,30 @@ function startKafkaConsumer() {
}])
.then(() => {
logger.info('Kafka consumer initialized successfully');
healthcheck.init([check]);
//healthcheck.init([check]); // checking in middleware
})
.catch((err) => {
logger.error('Kafka consumer failed');
logger.logFullError(err);
});

// setup websocket server
const app = express();
app.set('port', config.PORT);
app.use(healthcheck.middleware([check]));
//app.use('/ws-check', express.static('./docs/ws-check.html'));
app.use((req, res) => {
res.status(404).json({ error: 'route not found' });
});
app.use((err, req, res) => {
logger.logFullError(err);
res.status(400).json({ error: err.message });
});

const server = http.createServer(app);
notificationStreamWS.setup(server);
server.listen(app.get('port'));
logger.info(`Websocket server listening on port ${app.get('port')}`);
}

startKafkaConsumer();
Loading