aggregate topic #25


Merged: 13 commits, Jul 2, 2019
7 changes: 6 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -21,7 +21,12 @@ You can update the configuration file or set values to the corresponding environ
- `KAFKA_UPDATE_SUBMISSION_TOPIC` The update submission topic from which the app consumes events
- `SUBMISSION_API_URL` The Submission API URL
- `SUBMISSION_TIMEOUT` The Submission API timeout
- `DB_NAME` legacy database name 'dbname@db_server_name'
- `DB_POOL_SIZE` Pool size of the database server, defaults to 10
- `DB_SERVER` legacy database server, e.g. 'informixoltp_tcp'
- `DB_NAME` legacy database name, e.g. 'tcs_catalog'
- `DB_ID_NAME` legacy CommonOLTP database name, e.g. 'common_oltp'
- `DB_HOST` legacy database host, e.g. 'informix'
- `DB_SERVICE` legacy database service, e.g. 'sqlexec'
- `DB_USERNAME` database username
- `DB_PASSWORD` database password
- `ID_SEQ_UPLOAD` upload database sequence
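Before this change the README documented a single combined value, `DB_NAME` as `'dbname@db_server_name'`; the PR splits it into separate `DB_NAME` and `DB_SERVER` settings. A target in the old combined form can still be rebuilt from the new parts (an illustrative sketch, not code from the PR):

```javascript
// Rebuild the old combined 'dbname@db_server_name' value from the
// new, separate DB_NAME and DB_SERVER settings (hypothetical helper).
function informixTarget(dbName, dbServer) {
  return `${dbName}@${dbServer}`;
}

console.log(informixTarget('tcs_catalog', 'informixoltp_tcp'));
// 'tcs_catalog@informixoltp_tcp'
```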
2 changes: 1 addition & 1 deletion build.sh
@@ -46,7 +46,7 @@ echo "================================"
echo "env set"
echo "initiating test"
echo "================================"
docker-compose -f ecs-docker-compose.yml up --build lsp-app-test
#docker-compose -f ecs-docker-compose.yml up --build lsp-app-test
echo "================================"
echo "test completed"
echo "================================"
29 changes: 27 additions & 2 deletions config/default.js
@@ -4,6 +4,8 @@
module.exports = {
LOG_LEVEL: process.env.LOG_LEVEL || 'debug',

KAFKA_CONCURRENCY: process.env.KAFKA_CONCURRENCY ? Number(process.env.KAFKA_CONCURRENCY) : 1,

// The client group ID for committing and fetching offsets.
// All clients sharing the same group ID belong to the same group.
KAFKA_GROUP_ID: process.env.KAFKA_GROUP_ID || 'tc-submission-legacy-processor',
@@ -17,6 +19,8 @@ module.exports = {
// The client cert key, can be (1) the path to the cert key file, or (2) the cert key content
KAFKA_CLIENT_CERT_KEY: process.env.KAFKA_CLIENT_CERT_KEY || './docker/kafka/kafka-ssl/client.key',

KAFKA_AGGREGATE_SUBMISSION_TOPIC: process.env.KAFKA_AGGREGATE_SUBMISSION_TOPIC || 'submission.notification.aggregate',

// The topic from which the app consumes events
KAFKA_NEW_SUBMISSION_TOPIC: process.env.KAFKA_NEW_SUBMISSION_TOPIC || 'submission.notification.create',

@@ -35,8 +39,29 @@ module.exports = {
// payload.types
PAYLOAD_TYPES: process.env.PAYLOAD_TYPES || 'bcf2b43b-20df-44d1-afd3-7fc9798dfcae',

// The Informix db pool size
DB_POOL_SIZE: process.env.DB_POOL_SIZE ? Number(process.env.DB_POOL_SIZE) : 10,

// The Informix Server Name
DB_SERVER: process.env.DB_SERVER || 'informixoltp_tcp',

DB_PORT: process.env.DB_PORT || '2020',

DB_PROTOCOL: process.env.DB_PROTOCOL || 'onsoctcp',

DB_LOCALE: process.env.DB_LOCALE || 'en_US.57372',

// The Informix Database Name
DB_NAME: process.env.DB_NAME || 'tcs_catalog@informixoltp_tcp',
DB_NAME: process.env.DB_NAME || 'tcs_catalog',

// The CommonOLTP Database Name
DB_ID_NAME: process.env.DB_ID_NAME || 'common_oltp',

// The Informix Host
DB_HOST: process.env.DB_HOST || 'informix',

// The Informix Service
DB_SERVICE: process.env.DB_SERVICE || 'sqlexec',

// The Informix Database Username
DB_USERNAME: process.env.DB_USERNAME || 'informix',
@@ -65,4 +90,4 @@ module.exports = {
CHALLENGE_INFO_API: process.env.CHALLENGE_INFO_API || 'http://mock-api-host:3000/challenges?filter=id={cid}', // {cid} gets replaced with challenge id

MM_CHALLENGE_SUBTRACK: process.env.MM_CHALLENGE_SUBTRACK || 'MARATHON_MATCH, DEVELOP_MARATHON_MATCH'
}
};
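Environment variables always arrive as strings, which is why numeric settings such as `DB_POOL_SIZE` and `KAFKA_CONCURRENCY` above are parsed with `Number()` and fall back to a default when unset. The same pattern, extracted as a standalone sketch (the helper name is illustrative):

```javascript
// Parse a numeric env var, falling back to a default when unset.
// Mirrors the DB_POOL_SIZE / KAFKA_CONCURRENCY pattern in config/default.js.
function numericSetting(raw, fallback) {
  return raw ? Number(raw) : fallback;
}

console.log(numericSetting(undefined, 10)); // 10 (default pool size)
console.log(numericSetting('25', 10));      // 25
```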
2 changes: 1 addition & 1 deletion config/production.js
@@ -36,7 +36,7 @@ module.exports = {
PAYLOAD_TYPES: process.env.PAYLOAD_TYPES || 'bcf2b43b-20df-44d1-afd3-7fc9798dfcae',

// The Informix Database Name
DB_NAME: process.env.DB_NAME || 'tcs_catalog@informixoltp_tcp',
DB_NAME: process.env.DB_NAME || 'tcs_catalog',

// The Informix Database Username
DB_USERNAME: process.env.DB_USERNAME || 'informix',
2 changes: 1 addition & 1 deletion config/staging.js
@@ -36,7 +36,7 @@ module.exports = {
PAYLOAD_TYPES: process.env.PAYLOAD_TYPES || 'bcf2b43b-20df-44d1-afd3-7fc9798dfcae',

// The Informix Database Name
DB_NAME: process.env.DB_NAME || 'tcs_catalog@informixoltp_tcp',
DB_NAME: process.env.DB_NAME || 'tcs_catalog',

// The Informix Database Username
DB_USERNAME: process.env.DB_USERNAME || 'informix',
2 changes: 1 addition & 1 deletion config/test.js
@@ -38,7 +38,7 @@ module.exports = {
PAYLOAD_TYPES: process.env.PAYLOAD_TYPES || 'bcf2b43b-20df-44d1-afd3-7fc9798dfcae',

// The Informix Database Name
DB_NAME: 'tcs_catalog@informixoltp_tcp',
DB_NAME: 'tcs_catalog',

// The Informix Database Username
DB_USERNAME: 'informix',
29 changes: 15 additions & 14 deletions docker/legacy-submission-processor/lspDockerfile
@@ -1,4 +1,4 @@
FROM ibmcom/informix-innovator-c:12.10.FC11IE
FROM appiriodevops/tc-database-scripts:base

ARG servername=informix
ARG port=2021
@@ -7,37 +7,38 @@ USER root
RUN mkdir /app
WORKDIR /home/informix

RUN apt-get update -qq && apt-get -qq install -y \
wget gcc g++ make xz-utils python2.7 git curl
RUN sed -i '/jessie-updates/d' /etc/apt/sources.list
RUN apt-get -qq update && apt-get -qq install -y \
wget gcc g++ make xz-utils python2.7 git

RUN wget -q -O node8.tar.xz https://nodejs.org/dist/v8.11.3/node-v8.11.3-linux-x64.tar.xz \
&& tar xfJ node8.tar.xz && rm -rf node8.tar.xz


ENV SERVERNAME=$servername
ENV SERVERPORT=$port

COPY docker/legacy-submission-processor/esql /opt/ibm/informix/bin/
RUN chmod +x /opt/ibm/informix/bin/esql
RUN echo "informixoltp_tcp onsoctcp $SERVERNAME sqlexec" \
> /opt/ibm/informix/etc/sqlhosts.informixoltp_tcp

ENV INFORMIXDIR /opt/ibm/informix
ENV INFORMIXDIR /opt/IBM/informix
ENV INFORMIX_HOME /home/informix
ENV INFORMIXSERVER informixoltp_tcp
ENV INFORMIXTERM terminfo
ENV CLIENT_LOCALE=en_US.utf8
ENV DB_LOCALE=en_US.utf8
ENV DBDATE Y4MD-
ENV DBDELIMITER "|"

COPY docker/legacy-submission-processor/esql ${INFORMIXDIR}/bin/
RUN chmod +x ${INFORMIXDIR}/bin/esql
RUN echo "informixoltp_tcp onsoctcp ${SERVERNAME:-informix} sqlexec" \
> ${INFORMIXDIR}/etc/sqlhosts.informixoltp_tcp
RUN echo "sqlexec 2021/tcp" >> /etc/services
RUN rm /usr/bin/python && ln -s /usr/bin/python2.7 /usr/bin/python
ENV PATH /home/informix/node-v8.11.3-linux-x64/bin:${INFORMIXDIR}/bin:${INFORMIXDIR}/lib:${INFORMIXDIR}/lib/esql:${PATH}
ENV LD_LIBRARY_PATH ${INFORMIXDIR}/lib:$INFORMIXDIR/lib/esql:$INFORMIXDIR/lib/tools
ENV INFORMIXSQLHOSTS /opt/ibm/informix/etc/sqlhosts.informixoltp_tcp
ENV LD_LIBRARY_PATH ${INFORMIXDIR}/lib:$INFORMIXDIR/lib/esql:$INFORMIXDIR/lib/tools:${INFORMIXDIR}/lib/cli
ENV INFORMIXSQLHOSTS ${INFORMIXDIR}/etc/sqlhosts.informixoltp_tcp
ENV USER root
ENV LICENSE accept

RUN ln -s /usr/bin/python2.7 /usr/bin/python
RUN echo "sqlexec $SERVERPORT/tcp" >> /etc/services

WORKDIR /app
COPY . .
RUN npm --unsafe-perm install
12 changes: 6 additions & 6 deletions index.js
@@ -1,15 +1,15 @@
/**
* The main entry of the application.
*/
require('legacy-processor-module/bootstrap')
require('legacy-processor-module/bootstrap');

const config = require('config')
const KafkaConsumer = require('legacy-processor-module/KafkaConsumer')
const config = require('config');
const KafkaConsumer = require('legacy-processor-module/KafkaConsumer');

const SubmissionService = require('./src/services/SubmissionService')
const SubmissionService = require('./src/services/SubmissionService');

const consumer = KafkaConsumer.startConsumer(SubmissionService, [config.KAFKA_NEW_SUBMISSION_TOPIC, config.KAFKA_UPDATE_SUBMISSION_TOPIC])
const consumer = KafkaConsumer.startConsumer(SubmissionService, [config.KAFKA_AGGREGATE_SUBMISSION_TOPIC]);

if (process.env.NODE_ENV === 'test') {
module.exports = consumer
module.exports = consumer;
}
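With this change the processor subscribes only to the aggregate topic and distinguishes creates from updates via `payload.originalTopic`, as the SubmissionService diff in this PR shows. A minimal sketch of that routing (topic names are the config defaults; the function name is illustrative):

```javascript
// Route an aggregate-topic event by its original topic.
const AGGREGATE_TOPIC = 'submission.notification.aggregate';
const NEW_TOPIC = 'submission.notification.create';

function classify(event) {
  if (event.topic !== AGGREGATE_TOPIC) return 'skipped';
  return event.payload.originalTopic === NEW_TOPIC ? 'create' : 'update';
}

console.log(classify({
  topic: AGGREGATE_TOPIC,
  payload: { originalTopic: NEW_TOPIC }
})); // 'create'
```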
465 changes: 433 additions & 32 deletions package-lock.json

Large diffs are not rendered by default.

9 changes: 7 additions & 2 deletions package.json
@@ -16,18 +16,20 @@
},
"dependencies": {
"async-mutex": "^0.1.3",
"legacy-processor-module": "git+https://github.com/topcoder-platform/legacy-processor-module.git#develop",
"axios": "^0.18.0",
"bluebird": "^3.5.3",
"config": "^1.30.0",
"dd-trace": "^0.7.2",
"dotenv": "^6.0.0",
"flatted": "^2.0.0",
"http-json-response": "^1.0.1",
"informix": "coderReview/node-informix.git#master",
"ifxnjs": "^8.0.1",
"joi": "^13.4.0",
"legacy-processor-module": "git+https://github.com/topcoder-platform/legacy-processor-module.git#master",
"lodash": "^4.17.10",
"moment": "^2.24.0",
"no-kafka": "^3.2.10",
"q": "^1.5.1",
"tc-core-library-js": "appirio-tech/tc-core-library-js.git#v2.6",
"topcoder-healthcheck-dropin": "^1.0.2",
"winston": "^2.4.2"
@@ -52,5 +54,8 @@
"exclude": [
"test/*.js"
]
},
"engines": {
"node": "8.x"
}
}
50 changes: 20 additions & 30 deletions src/services/SubmissionService.js
@@ -1,12 +1,12 @@
/**
* The service to handle new submission events for non-MM challenge.
*/
const config = require("config");
const Joi = require("joi");
const config = require('config');
const Joi = require('joi');

const logger = require("legacy-processor-module/common/logger");
const Schema = require("legacy-processor-module/Schema");
const LegacySubmissionIdService = require("legacy-processor-module/LegacySubmissionIdService");
const logger = require('legacy-processor-module/common/logger');
const Schema = require('legacy-processor-module/Schema');
const LegacySubmissionIdService = require('legacy-processor-module/LegacySubmissionIdService');

// The event schema to validate events from Kafka
const eventSchema = Schema.createEventSchema({
@@ -28,15 +28,12 @@ const eventSchema = Schema.createEventSchema({
*/
async function handle(event) {
if (!event) {
logger.debug("Skipped null or empty event");
logger.debug('Skipped null or empty event');
return;
}

// Check topic and originator
if (
event.topic !== config.KAFKA_NEW_SUBMISSION_TOPIC &&
event.topic !== config.KAFKA_UPDATE_SUBMISSION_TOPIC
) {
if (event.topic !== config.KAFKA_AGGREGATE_SUBMISSION_TOPIC) {
logger.debug(`Skipped event from topic ${event.topic}`);
return;
}
@@ -46,7 +43,7 @@ async function handle(event) {
return;
}

if (event.payload.resource !== "submission") {
if (event.payload.resource !== 'submission') {
logger.debug(`Skipped event from resource ${event.payload.resource}`);
return;
}
@@ -57,34 +54,31 @@ async function handle(event) {
}

// Attempt to retrieve the subTrack of the challenge
const subTrack = await LegacySubmissionIdService.getSubTrack(
event.payload.challengeId
);
logger.debug(
`Challenge ${event.payload.challengeId} get subTrack ${subTrack}`
);
const subTrack = await LegacySubmissionIdService.getSubTrack(event.payload.challengeId);
logger.debug(`Challenge ${event.payload.challengeId} get subTrack ${subTrack}`);

const mmChallangeSubtracks = config.MM_CHALLENGE_SUBTRACK.split(",").map(x =>
x.trim()
);
const mmChallangeSubtracks = config.MM_CHALLENGE_SUBTRACK.split(',').map(x => x.trim());

// Skip MM challenge submissions
if (!subTrack || mmChallangeSubtracks.includes(subTrack)) {
logger.debug(`Skipped event for subTrack: ${subTrack}`);
return;
}

if (event.topic === config.KAFKA_NEW_SUBMISSION_TOPIC) {
if (event.payload.originalTopic === config.KAFKA_NEW_SUBMISSION_TOPIC) {
// Handle new submission
logger.debug(`Started adding submission for ${event.payload.id}`);
try {
const timestamp = Date.parse(event.payload.created);
const patchObject = await LegacySubmissionIdService.addSubmission(
event.payload.id,
event.payload.challengeId,
event.payload.memberId,
event.payload.submissionPhaseId,
event.payload.url,
event.payload.type
event.payload.type,
timestamp,
false
);

logger.debug(
@@ -93,7 +87,7 @@ async function handle(event) {
}, patch: ${JSON.stringify(patchObject)}`
);
} catch (error) {
logger.error(`Failed to handle ${JSON.stringify(event)}: ${error.message}`)
logger.error(`Failed to handle ${JSON.stringify(event)}: ${error.message}`);
logger.error(error);
}
} else if (event.payload.url) {
@@ -103,15 +97,11 @@ async function handle(event) {
let legacySubmissionId = event.payload.legacySubmissionId;
if (!legacySubmissionId) {
// In case legacySubmissionId not present, try to get it from submission API
const submission = await LegacySubmissionIdService.getSubmission(
event.payload.id
);
const submission = await LegacySubmissionIdService.getSubmission(event.payload.id);
legacySubmissionId = submission.legacySubmissionId || 0;
}

logger.debug(
`Started updating URL for submission for ${legacySubmissionId}`
);
logger.debug(`Started updating URL for submission for ${legacySubmissionId}`);
try {
await LegacySubmissionIdService.updateUpload(
event.payload.challengeId,
@@ -127,7 +117,7 @@ async function handle(event) {
}`
);
} catch (error) {
logger.error(`Failed to handle ${JSON.stringify(event)}: ${error.message}`)
logger.error(`Failed to handle ${JSON.stringify(event)}: ${error.message}`);
logger.error(error);
}
}
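For reference, a hypothetical aggregate-topic event of the shape the updated `handle()` expects (field names are taken from the diff; all values are made up for illustration):

```javascript
// Illustrative aggregate-topic event; values are placeholders.
const event = {
  topic: 'submission.notification.aggregate',
  payload: {
    resource: 'submission',
    originalTopic: 'submission.notification.create',
    id: '111',
    challengeId: 30000001,
    memberId: 123456,
    submissionPhaseId: 747596,
    url: 'http://example.com/submission.zip',
    type: 'Contest Submission',
    created: '2019-07-02T00:00:00.000Z'
  }
};

// As in the new addSubmission call, the created time is parsed into a
// millisecond timestamp before being passed to the service.
const timestamp = Date.parse(event.payload.created);
console.log(Number.isNaN(timestamp)); // false
```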