From 5c9cad95cf335904ddfce285327075dd80991593 Mon Sep 17 00:00:00 2001 From: veshu Date: Fri, 22 Jun 2018 22:10:32 +0545 Subject: [PATCH] fixes for https://github.com/topcoder-platform/topcoder-x-ui/issues/21 --- README.md | 6 +- config/default.js | 4 +- configuration.md | 2 +- package-lock.json | 264 +++++++++++++++++----------------------------- package.json | 2 +- utils/kafka.js | 71 ++++--------- 6 files changed, 124 insertions(+), 225 deletions(-) diff --git a/README.md b/README.md index efbb0ca..3facc2a 100755 --- a/README.md +++ b/README.md @@ -45,12 +45,12 @@ The following config parameters are supported, they are defined in `config/defau |FIX_ACCEPTED_ISSUE_LABEL|the label name for fix accepted, should be one of the label configured in topcoder x ui|'Fix Accepted'| |TC_OR_DETAIL_LINK|the link to online review detail of challenge| see `default.js`, OR link for dev environment| -KAFKA_OPTIONS should be object as described in https://github.com/SOHU-Co/kafka-node#kafkaclient +KAFKA_OPTIONS should be object as described in https://github.com/oleksiyk/kafka#ssl For using with SSL, the options should be as ``` { - kafkaHost: '', - sslOptions: { + connectionString: '', + ssl: { cert: '', key: '' } diff --git a/config/default.js b/config/default.js index 6f7556e..32ae4a8 100644 --- a/config/default.js +++ b/config/default.js @@ -17,8 +17,8 @@ module.exports = { PARTITION: process.env.PARTITION || 0, TOPIC: process.env.TOPIC || 'tc-x-events', KAFKA_OPTIONS: { - kafkaHost: process.env.KAFKA_HOST || 'localhost:9092', - sslOptions: { + connectionString: process.env.KAFKA_HOST || 'localhost:9092', + ssl: { cert: process.env.KAFKA_CLIENT_CERT || fs.readFileSync('./kafka_client.cer'), // eslint-disable-line no-sync key: process.env.KAFKA_CLIENT_CERT_KEY || fs.readFileSync('./kafka_client.key'), // eslint-disable-line no-sync } diff --git a/configuration.md b/configuration.md index ac148a8..d6bdba3 100644 --- a/configuration.md +++ b/configuration.md @@ -30,7 +30,7 @@ The following config parameters are supported, they are defined in `config/defau |RETRY_INTERVAL| the interval at which the event should be retried to process in milliseconds | 120000| |READY_FOR_REVIEW_ISSUE_LABEL| the label name for ready for review, should be one of the label configured in topcoder x ui|'Ready for review'| -KAFKA_OPTIONS should be object as described in https://github.com/SOHU-Co/kafka-node#kafkaclient +KAFKA_OPTIONS should be object as described in https://github.com/oleksiyk/kafka#ssl For using with SSL, the options should be as ``` { diff --git a/package-lock.json b/package-lock.json index e95f2b8..0f2ea89 100644 --- a/package-lock.json +++ b/package-lock.json @@ -4,6 +4,16 @@ "lockfileVersion": 1, "requires": true, "dependencies": { + "@types/bluebird": { + "version": "3.5.0", + "resolved": "https://registry.npmjs.org/@types/bluebird/-/bluebird-3.5.0.tgz", + "integrity": "sha1-JjNHCk6r6aR82aRf2yDtX5NAe8o=" + }, + "@types/lodash": { + "version": "4.14.110", + "resolved": "https://registry.npmjs.org/@types/lodash/-/lodash-4.14.110.tgz", + "integrity": "sha512-iXYLa6olt4tnsCA+ZXeP6eEW3tk1SulWeYyP/yooWfAtXjozqXgtX4+XUtMuOCfYjKGz3F34++qUc3Q+TJuIIw==" + }, "acorn": { "version": "5.1.2", "resolved": "https://registry.npmjs.org/acorn/-/acorn-5.1.2.tgz", @@ -125,21 +135,6 @@ "resolved": "https://registry.npmjs.org/assert-plus/-/assert-plus-1.0.0.tgz", "integrity": "sha1-8S4PPF13sLHN2RRpQuTpbB5N1SU=" }, - "async": { - "version": "2.6.1", - "resolved": "https://registry.npmjs.org/async/-/async-2.6.1.tgz", - "integrity": "sha512-fNEiL2+AZt6AlAw/29Cr0UDe4sRAHCpEHh54WMz+Bb7QfNcFw4h3loofyJpLeQs4Yx7yuqu/2dLgM5hKOs6HlQ==", - "requires": { - "lodash": "^4.17.10" - }, - "dependencies": { - "lodash": { - "version": "4.17.10", - "resolved": "https://registry.npmjs.org/lodash/-/lodash-4.17.10.tgz", - "integrity": "sha512-UejweD1pDoXu+AD825lWwp4ZGtSwgnpZxb3JDViD7StjQz+Nb/6l093lx4OQ0foGWNRoc19mWy7BzL+UAK2iVg==" - } - } - }, "asynckit": { "version": "0.4.0", "resolved": "https://registry.npmjs.org/asynckit/-/asynckit-0.4.0.tgz", @@ -178,7 +173,8 @@ "balanced-match": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/balanced-match/-/balanced-match-1.0.0.tgz", - "integrity": "sha1-ibTRmasr7kneFk6gK4nORi1xt2c=" + "integrity": "sha1-ibTRmasr7kneFk6gK4nORi1xt2c=", + "dev": true }, "bcrypt-pbkdf": { "version": "1.0.1", @@ -189,56 +185,20 @@ "tweetnacl": "^0.14.3" } }, - "binary": { - "version": "0.3.0", - "resolved": "https://registry.npmjs.org/binary/-/binary-0.3.0.tgz", - "integrity": "sha1-n2BVO8XOjDOG87VTz/R0Yq3sqnk=", - "requires": { - "buffers": "~0.1.1", - "chainsaw": "~0.1.0" - } - }, - "bindings": { - "version": "1.3.0", - "resolved": "https://registry.npmjs.org/bindings/-/bindings-1.3.0.tgz", - "integrity": "sha512-DpLh5EzMR2kzvX1KIlVC0VkC3iZtHKTgdtZ0a3pglBZdaQFjt5S9g9xd1lE+YvXyfd6mtCeRnrUfOLYiTMlNSw==", - "optional": true - }, - "bl": { - "version": "1.2.2", - "resolved": "https://registry.npmjs.org/bl/-/bl-1.2.2.tgz", - "integrity": "sha512-e8tQYnZodmebYDWGH7KMRvtzKXaJHx3BbilrgZCfvyLUYdKpK1t5PSPmpkny/SgiTSCnjfLW7v5rlONXVFkQEA==", + "bin-protocol": { + "version": "3.0.4", + "resolved": "https://registry.npmjs.org/bin-protocol/-/bin-protocol-3.0.4.tgz", + "integrity": "sha1-RlqdNQb+sOEmtStbIWDZNuFbJ/Q=", "requires": { - "readable-stream": "^2.3.5", - "safe-buffer": "^5.1.1" + "lodash": "^4.1.0", + "long": "^3.0.3", + "protocol-buffers-schema": "^3.0.0" }, "dependencies": { - "process-nextick-args": { - "version": "2.0.0", - "resolved": "https://registry.npmjs.org/process-nextick-args/-/process-nextick-args-2.0.0.tgz", - "integrity": "sha512-MtEC1TqN0EU5nephaJ4rAtThHtC86dNN9qCuEhtshvpVBkAW5ZO7BASN9REnF9eoXGcRub+pFuKEpOHE+HbEMw==" - }, - "readable-stream": { - "version": "2.3.6", - "resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-2.3.6.tgz", - "integrity": "sha512-tQtKA9WIAhBF3+VLAseyMqZeBjW0AHJoxOtYqSUZNJxauErmLbVm2FW1y+J/YA9dUrAC39ITejlZWhVIwawkKw==", - "requires": { - "core-util-is": "~1.0.0", - "inherits": "~2.0.3", - "isarray": "~1.0.0", - "process-nextick-args": "~2.0.0", - "safe-buffer": "~5.1.1", - "string_decoder": "~1.1.1", - "util-deprecate": "~1.0.1" - } - }, - "string_decoder": { - "version": "1.1.1", - "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-1.1.1.tgz", - "integrity": "sha512-n/ShnvDi6FHbbVfviro+WojiFzv+s8MPMHBczVePfUpDJLwoLT0ht1l4YwBCbi8pJAveEEdnkHyPyTP/mzRfwg==", - "requires": { - "safe-buffer": "~5.1.0" - } + "long": { + "version": "3.2.0", + "resolved": "https://registry.npmjs.org/long/-/long-3.2.0.tgz", + "integrity": "sha1-2CG3E4yhy1gcFymQ7xTbIAtcR0s=" } } }, @@ -259,6 +219,7 @@ "version": "1.1.8", "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-1.1.8.tgz", "integrity": "sha1-wHshHHyVLsH479Uad+8NHTmQopI=", + "dev": true, "requires": { "balanced-match": "^1.0.0", "concat-map": "0.0.1" @@ -279,19 +240,6 @@ "resolved": "https://registry.npmjs.org/buffer-shims/-/buffer-shims-1.0.0.tgz", "integrity": "sha1-mXjOMXOIxkmth5MCjDR37wRKi1E=" }, - "buffermaker": { - "version": "1.2.0", - "resolved": "https://registry.npmjs.org/buffermaker/-/buffermaker-1.2.0.tgz", - "integrity": "sha1-u3MlLsCIK3Y56bVWuCnav8LK4bo=", - "requires": { - "long": "1.1.2" - } - }, - "buffers": { - "version": "0.1.1", - "resolved": "https://registry.npmjs.org/buffers/-/buffers-0.1.1.tgz", - "integrity": "sha1-skV5w77U1tOWru5tmorn9Ugqt7s=" - }, "builtin-modules": { "version": "1.1.1", "resolved": "https://registry.npmjs.org/builtin-modules/-/builtin-modules-1.1.1.tgz", @@ -318,14 +266,6 @@ "resolved": "https://registry.npmjs.org/caseless/-/caseless-0.12.0.tgz", "integrity": "sha1-G2gcIf+EAzyCZUMJBolCDRhxUdw=" }, - "chainsaw": { - "version": "0.1.0", - "resolved": "https://registry.npmjs.org/chainsaw/-/chainsaw-0.1.0.tgz", - "integrity": "sha1-XqtQsor+WAdNDVgpE4iCi15fvJg=", - "requires": { - "traverse": ">=0.3.0 <0.4" - } - }, "chalk": { "version": "1.1.3", "resolved": "https://registry.npmjs.org/chalk/-/chalk-1.1.3.tgz", @@ -392,7 +332,8 @@ "concat-map": { "version": "0.0.1", "resolved": "https://registry.npmjs.org/concat-map/-/concat-map-0.0.1.tgz", - "integrity": "sha1-2Klr13/Wjfd5OnMDajug1UBdR3s=" + "integrity": "sha1-2Klr13/Wjfd5OnMDajug1UBdR3s=", + "dev": true }, "concat-stream": { "version": "1.6.0", @@ -414,6 +355,11 @@ "os-homedir": "1.0.2" } }, + "connection-parse": { + "version": "0.0.7", + "resolved": "https://registry.npmjs.org/connection-parse/-/connection-parse-0.0.7.tgz", + "integrity": "sha1-GOcxiqsGppkmc3KxDFIm0locmmk=" + }, "contains-path": { "version": "0.1.0", "resolved": "https://registry.npmjs.org/contains-path/-/contains-path-0.1.0.tgz", @@ -1163,6 +1109,15 @@ "ansi-regex": "^2.0.0" } }, + "hashring": { + "version": "3.2.0", + "resolved": "https://registry.npmjs.org/hashring/-/hashring-3.2.0.tgz", + "integrity": "sha1-/aTv3oqiLNuX+x0qZeiEAeHBRM4=", + "requires": { + "connection-parse": "0.0.x", + "simple-lru-cache": "0.0.x" + } + }, "hawk": { "version": "6.0.2", "resolved": "https://registry.npmjs.org/hawk/-/hawk-6.0.2.tgz", @@ -1525,27 +1480,6 @@ "resolved": "https://registry.npmjs.org/jwt-decode/-/jwt-decode-2.2.0.tgz", "integrity": "sha1-fYa9VmefWM5qhHBKZX3TkruoGnk=" }, - "kafka-node": { - "version": "2.6.1", - "resolved": "https://registry.npmjs.org/kafka-node/-/kafka-node-2.6.1.tgz", - "integrity": "sha512-tpivkSLjiGHRLwx0YN87fMUATOK4NYWESJneHlpikEBNNA5od7fW/ikovS3tWooMqG4Nri55vPFRUNiNvNBWZA==", - "requires": { - "async": "^2.5.0", - "binary": "~0.3.0", - "bl": "^1.2.0", - "buffer-crc32": "~0.2.5", - "buffermaker": "~1.2.0", - "debug": "^2.1.3", - "lodash": "^4.17.4", - "minimatch": "^3.0.2", - "nested-error-stacks": "^2.0.0", - "node-zookeeper-client": "~0.2.2", - "optional": "^0.1.3", - "retry": "^0.10.1", - "snappy": "^6.0.1", - "uuid": "^3.0.0" - } - }, "kareem": { "version": "1.5.0", "resolved": "https://registry.npmjs.org/kareem/-/kareem-1.5.0.tgz", @@ -1610,11 +1544,6 @@ "integrity": "sha1-9HGh2khr5g9quVXRcRVSPdHSVdU=", "dev": true }, - "long": { - "version": "1.1.2", - "resolved": "https://registry.npmjs.org/long/-/long-1.1.2.tgz", - "integrity": "sha1-6u9ZUcp1UdlpJrgtokLbnWso+1M=" - }, "loose-envify": { "version": "1.3.1", "resolved": "https://registry.npmjs.org/loose-envify/-/loose-envify-1.3.1.tgz", @@ -1668,6 +1597,7 @@ "version": "3.0.4", "resolved": "https://registry.npmjs.org/minimatch/-/minimatch-3.0.4.tgz", "integrity": "sha512-yJHVQEhyqPLUTgt9B83PXu6W3rx4MvvHvSUvToogpwoGDOUQ+yDrR0HRot+yOCdCO7u4hX3pWft6kWBBcqh0UA==", + "dev": true, "requires": { "brace-expansion": "^1.1.7" } @@ -1802,34 +1732,60 @@ "resolved": "https://registry.npmjs.org/muri/-/muri-1.3.0.tgz", "integrity": "sha512-FiaFwKl864onHFFUV/a2szAl7X0fxVlSKNdhTf+BM8i8goEgYut8u5P9MqQqIYwvaMxjzVESsoEm/2kfkFH1rg==" }, + "murmur-hash-js": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/murmur-hash-js/-/murmur-hash-js-1.0.0.tgz", + "integrity": "sha1-UEEEkmnJZjPIZjhpYLL0KJ515bA=" + }, "mute-stream": { "version": "0.0.5", "resolved": "https://registry.npmjs.org/mute-stream/-/mute-stream-0.0.5.tgz", "integrity": "sha1-j7+rsKmKJT0xhDMfno3rc3L6xsA=", "dev": true }, - "nan": { - "version": "2.10.0", - "resolved": "https://registry.npmjs.org/nan/-/nan-2.10.0.tgz", - "integrity": "sha512-bAdJv7fBLhWC+/Bls0Oza+mvTaNQtP+1RyhhhvD95pgUJz6XM5IzgmxOkItJ9tkoCiplvAnXI1tNmmUD/eScyA==", - "optional": true - }, "natural-compare": { "version": "1.4.0", "resolved": "https://registry.npmjs.org/natural-compare/-/natural-compare-1.4.0.tgz", "integrity": "sha1-Sr6/7tdUHywnrPspvbvRXI1bpPc=", "dev": true }, - "nested-error-stacks": { - "version": "2.0.1", - "resolved": "https://registry.npmjs.org/nested-error-stacks/-/nested-error-stacks-2.0.1.tgz", - "integrity": "sha512-SrQrok4CATudVzBS7coSz26QRSmlK9TzzoFbeKfcPBUFPjcQM9Rqvr/DlJkOrwI/0KcgvMub1n1g5Jt9EgRn4A==" - }, "netrc": { "version": "0.1.4", "resolved": "https://registry.npmjs.org/netrc/-/netrc-0.1.4.tgz", "integrity": "sha1-a+lPysqNd63gqWcNxGCRTJRHJEQ=" }, + "nice-simple-logger": { + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/nice-simple-logger/-/nice-simple-logger-1.0.1.tgz", + "integrity": "sha1-D55khSe+e+PkmrdvqMjAmK+VG/Y=", + "requires": { + "lodash": "^4.3.0" + } + }, + "no-kafka": { + "version": "3.2.10", + "resolved": "https://registry.npmjs.org/no-kafka/-/no-kafka-3.2.10.tgz", + "integrity": "sha1-0sq8QwZbSS24wVyiOK6V8WgIGvU=", + "requires": { + "@types/bluebird": "3.5.0", + "@types/lodash": "^4.14.55", + "bin-protocol": "^3.0.4", + "bluebird": "^3.3.3", + "buffer-crc32": "^0.2.5", + "hashring": "^3.2.0", + "lodash": "=4.17.5", + "murmur-hash-js": "^1.0.0", + "nice-simple-logger": "^1.0.1", + "wrr-pool": "^1.0.3" + }, + "dependencies": { + "lodash": { + "version": "4.17.5", + "resolved": "https://registry.npmjs.org/lodash/-/lodash-4.17.5.tgz", + "integrity": "sha512-svL3uiZf1RwhH+cWrfZn3A4+U58wbP0tGVTLQPbjplZxZ8ROD9VLuNgsRniTlLe7OlSqR79RUehXgpBW/s0IQw==" + } + } + }, "node-fetch": { "version": "1.7.3", "resolved": "https://registry.npmjs.org/node-fetch/-/node-fetch-1.7.3.tgz", @@ -1852,22 +1808,6 @@ "url-join": "^4.0.0" } }, - "node-zookeeper-client": { - "version": "0.2.2", - "resolved": "https://registry.npmjs.org/node-zookeeper-client/-/node-zookeeper-client-0.2.2.tgz", - "integrity": "sha1-CXvaAZme749gLOBotjJgAGnb9oU=", - "requires": { - "async": "~0.2.7", - "underscore": "~1.4.4" - }, - "dependencies": { - "async": { - "version": "0.2.10", - "resolved": "https://registry.npmjs.org/async/-/async-0.2.10.tgz", - "integrity": "sha1-trvgsGdLnXGXCMo43owjfLUmw9E=" - } - } - }, "nodemailer": { "version": "4.4.0", "resolved": "https://registry.npmjs.org/nodemailer/-/nodemailer-4.4.0.tgz", @@ -1923,11 +1863,6 @@ "integrity": "sha1-ofeDj4MUxRbwXs78vEzP4EtO14k=", "dev": true }, - "optional": { - "version": "0.1.4", - "resolved": "https://registry.npmjs.org/optional/-/optional-0.1.4.tgz", - "integrity": "sha512-gtvrrCfkE08wKcgXaVwQVgwEQ8vel2dc5DDBn9RLQZ3YtmtkBss6A2HY6BnJH4N/4Ku97Ri/SF8sNWE2225WJw==" - }, "optionator": { "version": "0.8.2", "resolved": "https://registry.npmjs.org/optionator/-/optionator-0.8.2.tgz", @@ -2092,6 +2027,11 @@ "loose-envify": "^1.3.1" } }, + "protocol-buffers-schema": { + "version": "3.3.2", + "resolved": "https://registry.npmjs.org/protocol-buffers-schema/-/protocol-buffers-schema-3.3.2.tgz", + "integrity": "sha512-Xdayp8sB/mU+sUV4G7ws8xtYMGdQnxbeIfLjyO9TZZRJdztBGhlmbI5x1qcY4TG5hBkIKGnc28i7nXxaugu88w==" + }, "punycode": { "version": "1.4.1", "resolved": "https://registry.npmjs.org/punycode/-/punycode-1.4.1.tgz", @@ -2277,11 +2217,6 @@ "onetime": "^1.0.0" } }, - "retry": { - "version": "0.10.1", - "resolved": "https://registry.npmjs.org/retry/-/retry-0.10.1.tgz", - "integrity": "sha1-52OI0heZLCUnUCQdPTlW/tmNj/Q=" - }, "rimraf": { "version": "2.6.2", "resolved": "https://registry.npmjs.org/rimraf/-/rimraf-2.6.2.tgz", @@ -2333,6 +2268,11 @@ "rechoir": "^0.6.2" } }, + "simple-lru-cache": { + "version": "0.0.2", + "resolved": "https://registry.npmjs.org/simple-lru-cache/-/simple-lru-cache-0.0.2.tgz", + "integrity": "sha1-1ZzDoZPBpdAyD4Tucy9uRxPlEd0=" + }, "slice-ansi": { "version": "0.0.4", "resolved": "https://registry.npmjs.org/slice-ansi/-/slice-ansi-0.0.4.tgz", @@ -2344,16 +2284,6 @@ "resolved": "https://registry.npmjs.org/sliced/-/sliced-1.0.1.tgz", "integrity": "sha1-CzpmK10Ewxd7GSa+qCsD+Dei70E=" }, - "snappy": { - "version": "6.0.4", - "resolved": "https://registry.npmjs.org/snappy/-/snappy-6.0.4.tgz", - "integrity": "sha512-+MjETxi/G7fLtiLFWW9n9VLzpJvOVqRRohJ7kTgaU4bUJ37rsoWwxhZzO91BOB7sCgOILtKsGtCUviUo3REIfQ==", - "optional": true, - "requires": { - "bindings": "^1.3.0", - "nan": "^2.10.0" - } - }, "sntp": { "version": "2.0.2", "resolved": "https://registry.npmjs.org/sntp/-/sntp-2.0.2.tgz", @@ -2762,11 +2692,6 @@ "punycode": "^1.4.1" } }, - "traverse": { - "version": "0.3.9", - "resolved": "https://registry.npmjs.org/traverse/-/traverse-0.3.9.tgz", - "integrity": "sha1-cXuPIgzAu3tE5AUUwisui7xw2Lk=" - }, "tryit": { "version": "1.0.3", "resolved": "https://registry.npmjs.org/tryit/-/tryit-1.0.3.tgz", @@ -2813,11 +2738,6 @@ "resolved": "https://registry.npmjs.org/uc.micro/-/uc.micro-1.0.3.tgz", "integrity": "sha1-ftUNXg+an7ClczeSWfKndFjVAZI=" }, - "underscore": { - "version": "1.4.4", - "resolved": "https://registry.npmjs.org/underscore/-/underscore-1.4.4.tgz", - "integrity": "sha1-YaajIBBiKvoHljvzJSA88SI51gQ=" - }, "unique-string": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/unique-string/-/unique-string-1.0.0.tgz", @@ -2925,6 +2845,14 @@ "mkdirp": "^0.5.1" } }, + "wrr-pool": { + "version": "1.1.3", + "resolved": "https://registry.npmjs.org/wrr-pool/-/wrr-pool-1.1.3.tgz", + "integrity": "sha1-/a0i8uofMDY//l14HPeUl6d/8H4=", + "requires": { + "lodash": "^4.0.1" + } + }, "xtend": { "version": "4.0.1", "resolved": "https://registry.npmjs.org/xtend/-/xtend-4.0.1.tgz", diff --git a/package.json b/package.json index 7eee338..c848434 100644 --- a/package.json +++ b/package.json @@ -27,11 +27,11 @@ "github": "^12.0.2", "joi": "^13.0.0", "jwt-decode": "^2.2.0", - "kafka-node": "^2.6.1", "lodash": "^4.17.4", "markdown-it": "^8.4.0", "moment": "^2.19.1", "mongoose": "^4.12.3", + "no-kafka": "^3.2.10", "node-gitlab-api": "^2.2.6", "nodemailer": "^4.4.0", "topcoder-api-challenges": "^1.0.6", diff --git a/utils/kafka.js b/utils/kafka.js index 39558f2..f83df34 100644 --- a/utils/kafka.js +++ b/utils/kafka.js @@ -11,54 +11,32 @@ */ 'use strict'; -const {promisify} = require('util'); -const kafka = require('kafka-node'); const config = require('config'); const _ = require('lodash'); +const kafka = require('no-kafka'); const IssueService = require('../services/IssueService'); const logger = require('./logger'); -const Offset = kafka.Offset; - class Kafka { constructor() { - this.client = new kafka.KafkaClient(config.KAFKA_OPTIONS); - this.consumer = new kafka.Consumer(this.client, [{topic: config.TOPIC, partition: config.PARTITION}], {autoCommit: true}); - this.consumer.setOffset(config.TOPIC, 0, 0); - this.offset = new Offset(this.client); - this.producer = new kafka.Producer(this.client); - logger.info(`Connecting on topic: ${config.TOPIC}`); - - this.sendAsync = promisify(this.producer.send).bind(this.producer); - } - - run() { - this.consumer.on('error', (err) => { - logger.error(`ERROR ${err}`); - }); + this.consumer = new kafka.SimpleConsumer(config.KAFKA_OPTIONS); - this.consumer.on('offsetOutOfRange', (topic) => { - logger.debug(`TOPIC ${topic}`); - logger.info('offset OutOfRange. resetting.'); - this.offset.fetch([topic], (errOffsetFetch, offsets) => { - if (errOffsetFetch) { - logger.error(errOffsetFetch); - return console.error(errOffsetFetch); - } - - const min = Math.min(offsets[topic.topic][topic.partition]); - logger.info(`Setting offset to ${min}`); - return this.consumer.setOffset(config.TOPIC, topic.partition, min); - }); + this.producer = new kafka.Producer(config.KAFKA_OPTIONS); + this.producer.init().then(() => { + logger.info('kafka producer is ready.'); + }).catch((err) => { + logger.error(`kafka producer is not connected. ${err.stack}`); }); + } - this.consumer.on('message', (message) => { - logger.info(`received message from kafka: ${message.value}`); + messageHandler(messageSet) { + messageSet.forEach((item) => { + logger.info(`received message from kafka: ${item.message.value.toString('utf8')}`); // The event should be a JSON object let event; try { - event = JSON.parse(message.value); + event = JSON.parse(item.message.value.toString('utf8')); } catch (err) { logger.error(`"message" is not a valid JSON-formatted string: ${err.message}`); return; @@ -70,26 +48,19 @@ class Kafka { .catch(logger.error); } }); - this.consumer.on('ready', () => { - logger.info('kafka consumer is ready.'); - }); - this.producer.on('ready', () => { - logger.info('kafka producer is ready.'); + } - this.producer.createTopics([config.TOPIC], true, (err) => { - if (err) { - logger.error(`error in creating topic: ${config.TOPIC}, error: ${err.stack}`); - } else { - logger.info(`kafka topic: ${config.TOPIC} is ready`); - } - }); - }); - this.producer.on('error', (err) => { - logger.error(`kafka is not connected. ${err.stack}`); + run() { + this.consumer.init().then(() => { + logger.info('kafka consumer is ready'); + this.consumer.subscribe(config.TOPIC, {}, this.messageHandler); + }).catch((err) => { + logger.error(`kafka consumer is not connected. ${err.stack}`); }); } + send(message) { - return this.sendAsync([{topic: config.TOPIC, messages: message}]); + return this.producer.send({topic: config.TOPIC, message: {value: message}}); } }