diff --git a/.circleci/config.yml b/.circleci/config.yml index da48833..ca3986b 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -41,16 +41,22 @@ build_steps: &build_steps command: | ./awsconfiguration.sh ${DEPLOY_ENV} source awsenvconf - ./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-consumer-deployvar + + ./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-consumer-deployvar source buildenvvar ./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer - + echo "Running Masterscript - deploy postgres-ifx-processer producer" if [ -e ${LOGICAL_ENV}-${APP_NAME}-consumer-deployvar.json ]; then sudo rm -vf ${LOGICAL_ENV}-${APP_NAME}-consumer-deployvar.json; fi ./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-producer-deployvar source buildenvvar ./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer - + + echo "Running Masterscript - deploy postgres-ifx-processer producer_dd" + if [ -e ${LOGICAL_ENV}-${APP_NAME}-producer-deployvar.json ]; then sudo rm -vf ${LOGICAL_ENV}-${APP_NAME}-producer-deployvar.json; fi + ./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-producer_dd-deployvar + source buildenvvar + ./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer jobs: # Build & Deploy against development backend # "build-dev": @@ -61,7 +67,16 @@ jobs: GLOBAL_ENV: "dev" APP_NAME: "postgres-ifx-processer" steps: *build_steps - # Build & Deploy against production backend + # Build & Deploy against development backend # + "build-test": + <<: *defaults + environment: + DEPLOY_ENV: "DEV" + LOGICAL_ENV: "TEST" + GLOBAL_ENV: "dev" + APP_NAME: "postgres-ifx-processer" + steps: *build_steps + # Build & Deploy against production backend "build-prod": <<: *defaults environment: @@ -82,10 +97,16 @@ workflows: only: - dev - dev-retryfeature + - "build-test": + context : org-global + filters: + branches: + only: + - dev-test-pg + - dev-test-pg-rf - "build-prod": context : org-global filters: branches: only: - master - diff --git a/config/default.js b/config/default.js index 54a0793..d22bb48 100644 --- a/config/default.js +++ b/config/default.js @@ -22,8 +22,8 @@ module.exports = { database: process.env.PG_DATABASE || 'postgres', // database must exist before running the tool password: process.env.PG_PASSWORD || 'password', port: parseInt(process.env.PG_PORT, 10) || 5432, - triggerFunctions: process.env.TRIGGER_FUNCTIONS || ['db_notifications'], // List of trigger functions to listen to - triggerTopics: process.env.TRIGGER_TOPICS || ['db.postgres.sync'], // Names of the topic in the trigger payload + triggerFunctions: process.env.TRIGGER_FUNCTIONS || ['test_db_notifications'], // List of trigger functions to listen to + triggerTopics: process.env.TRIGGER_TOPICS || ['test.db.postgres.sync'], // Names of the topic in the trigger payload triggerOriginators: process.env.TRIGGER_ORIGINATORS || ['tc-postgres-delta-processor'] // Names of the originator in the trigger payload }, KAFKA: { // Kafka connection options @@ -38,6 +38,20 @@ module.exports = { errorTopic: process.env.ERROR_TOPIC || 'db.scorecardtable.error', recipients: ['admin@abc.com'] // Kafka partitions to use }, + SLACK: { + URL: process.env.SLACKURL || 'us-east-1', + SLACKCHANNEL: process.env.SLACKCHANNEL || 'ifxpg-migrator', + SLACKNOTIFY: process.env.SLACKNOTIFY || 'false' + }, + RECONSILER:{ + RECONSILER_START: process.env.RECONSILER_START || 10, + RECONSILER_END: process.env.RECONSILER_END || 5, + RECONSILER_DURATION_TYPE: process.env.RECONSILER_DURATION_TYPE || 'm' + }, + DYNAMODB: + { + DYNAMODB_TABLE: process.env.DYNAMODB_TABLE || 'test_pg_ifx_payload_sync' + }, AUTH0_URL: process.env.AUTH0_URL , AUTH0_AUDIENCE: process.env.AUTH0_AUDIENCE , diff --git a/config/test.js b/config/test.js index 228861f..a198f88 100644 --- a/config/test.js +++ b/config/test.js @@ -7,9 +7,9 @@ module.exports = { host: process.env.INFORMIX_HOST || 'localhost', port: parseInt(process.env.INFORMIX_PORT, 10) || 2021, user: process.env.INFORMIX_USER || 'informix', - password: process.env.INFORMIX_PASSWORD || '1nf0rm1x', - database: process.env.INFORMIX_DATABASE || 'tcs_catalog', - server: process.env.INFORMIX_SERVER || 'informixoltp_tcp', + password: process.env.INFORMIX_PASSWORD || 'password', + database: process.env.INFORMIX_DATABASE || 'db', + server: process.env.INFORMIX_SERVER || 'informixp', minpool: parseInt(process.env.MINPOOL, 10) || 1, maxpool: parseInt(process.env.MAXPOOL, 10) || 60, maxsize: parseInt(process.env.MAXSIZE, 10) || 0, diff --git a/package.json b/package.json index 21b72b6..f026db9 100644 --- a/package.json +++ b/package.json @@ -8,11 +8,13 @@ "lint:fix": "standard --env mocha --fix", "producer": "node ./src/producer.js", "consumer": "node ./src/consumer.js", - "start": "npm run producer & npm run consumer" + "producer_dd": "node ./src/producer.js failover", + "start": "npm run producer & npm run producer_dd & npm run consumer" }, "author": "Topcoder", "license": "ISC", "dependencies": { + "aws-sdk": "*", "config": "^3.2.2", "informix-wrapper": "git+https://github.com/appirio-tech/informix-wrapper.git", "no-kafka": "^3.4.3", diff --git a/pg-identity-func-trig-seq.sql b/pg-identity-func-trig-seq.sql new file mode 100644 index 0000000..6679c67 --- /dev/null +++ b/pg-identity-func-trig-seq.sql @@ -0,0 +1,299 @@ +SET search_path TO common_oltp; + +CREATE TABLE + pgifx_sync_audit + ( + seq_id SERIAL NOT NULL, + payloadseqid CHARACTER VARYING, + processid INTEGER, + tablename CHARACTER VARYING(64), + uniquecolumn CHARACTER VARYING(64), + dboperation CHARACTER VARYING(32), + syncstatus CHARACTER VARYING(64), + retrycount CHARACTER VARYING(64), + consumer_err CHARACTER VARYING, + producer_err CHARACTER VARYING, + payload CHARACTER VARYING, + auditdatetime TIMESTAMP(6) WITHOUT TIME ZONE, + topicname CHARACTER VARYING(64), + UNIQUE (payloadseqid) + ); + +CREATE OR REPLACE FUNCTION "common_oltp"."notify_trigger_common_oltp" () RETURNS trigger + VOLATILE +AS $body$ +DECLARE + rec RECORD; + payload TEXT; + column_name TEXT; + column_value TEXT; + payload_items TEXT[]; + uniquecolumn TEXT; + logtime TEXT; + payloadseqid INTEGER; +BEGIN + CASE TG_OP + WHEN 'INSERT', 'UPDATE' THEN + rec := NEW; + WHEN 'DELETE' THEN + rec := OLD; + ELSE + RAISE EXCEPTION 'Unknown TG_OP: "%". Should not occur!', TG_OP; + END CASE; + raise notice 'table name : %', TG_TABLE_NAME; + RAISE info 'hello world'; + -- Get required fields + FOREACH column_name IN ARRAY TG_ARGV LOOP + EXECUTE format('SELECT $1.%I::TEXT', column_name) + INTO column_value + USING rec; + case + when + column_name = 'upload_document' then + -- RAISE NOTICE 'upload_document boolean'; + if column_value = 'false' then + column_value = '0'; + else + column_value = '1'; + end if; + when + column_name = 'upload_document_required' then + -- RAISE NOTICE 'upload_document_required boolean'; + if column_value = 'false' then + column_value = '0'; + else + column_value = '1'; + end if; + when + column_name = 'identify_email_enabled' then + if column_value = 'false' then + column_value = '0'; + else + column_value = '1'; + end if; + when + column_name = 'identify_handle_enabled' then + if column_value = 'false' then + column_value = '0'; + else + column_value = '1'; + end if; + when + column_name = 'create_date' then + column_value := (select to_char (column_value::timestamp, 'YYYY-MM-DD HH24:MI:SS.MS')); + when + column_name = 'modify_date' then + column_value := (select to_char (column_value::timestamp, 'YYYY-MM-DD HH24:MI:SS.MS')); + -- when + -- column_name = 'achievement_date' then + --column_value := (select to_date (column_value, 'MM/DD/YYYY')); + --column_value := (select to_date (column_value)); + --when + --column_name = 'password' then + --column_value := regexp_replace(column_value, '\s', '', 'g'); + --column_value := regexp_replace(column_value, E'[\\n\\r]+', '\n\r', 'g'); + else + -- RAISE NOTICE ' not boolean'; + end case; + payload_items := array_append(payload_items, '"' || replace(column_name, '"', '\"') || '":"' || replace(column_value, '"', '\"') || '"'); + END LOOP; + --logtime := (select date_display_tz()); + logtime := (SELECT to_char (now()::timestamptz at time zone 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS"Z"')); + payloadseqid := (select nextval('payloadsequence'::regclass)); + + uniquecolumn := (SELECT c.column_name + FROM information_schema.key_column_usage AS c + LEFT JOIN information_schema.table_constraints AS t + ON t.constraint_name = c.constraint_name + WHERE t.table_name = TG_TABLE_NAME AND t.constraint_type = 'PRIMARY KEY' LIMIT 1); + + if (uniquecolumn = '') IS NOT FALSE then + uniquecolumn := 'Not-Available'; + end if; + + -- Build the payload + payload := '' + || '{' + || '"topic":"' || 'test.db.postgres.sync' || '",' + || '"originator":"' || 'tc-postgres-delta-processor' || '",' + || '"timestamp":"' || logtime || '",' + || '"mime-type":"' || 'application/json' || '",' + || '"payload": {' + || '"payloadseqid":"' || payloadseqid || '",' + || '"Uniquecolumn":"' || uniquecolumn || '",' + || '"operation":"' || TG_OP || '",' + || '"schema":"' || TG_TABLE_SCHEMA || '",' + || '"table":"' || TG_TABLE_NAME || '",' + || '"data": {' || array_to_string(payload_items, ',') || '}' + || '}}'; + + -- Notify the channel + PERFORM pg_notify('test_db_notifications', payload); + + RETURN rec; +END; +$body$ LANGUAGE plpgsql; + +CREATE TRIGGER "pg_email_trigger" + AFTER INSERT OR DELETE OR UPDATE ON email + FOR EACH ROW +EXECUTE PROCEDURE notify_trigger_common_oltp('user_id', 'email_id', 'email_type_id', 'address', 'primary_ind', 'status_id'); + +CREATE TRIGGER "pg_security_user_trigger" + AFTER INSERT OR DELETE OR UPDATE ON security_user + FOR EACH ROW +EXECUTE PROCEDURE notify_trigger_common_oltp('login_id', 'user_id', 'password', 'create_user_id'); + + +CREATE TRIGGER "pg_user_achievement_trigger" + AFTER INSERT OR DELETE OR UPDATE ON user_achievement + FOR EACH ROW +EXECUTE PROCEDURE notify_trigger_common_oltp('user_id', 'achievement_date', 'achievement_type_id', 'description', 'create_date'); + +CREATE TRIGGER "pg_user_group_xref_trigger" + AFTER INSERT OR DELETE OR UPDATE ON user_group_xref + FOR EACH ROW +EXECUTE PROCEDURE notify_trigger_common_oltp('user_group_id', 'login_id', 'group_id', 'create_user_id', 'security_status_id'); + +CREATE TRIGGER "pg_user_trigger" + AFTER INSERT OR DELETE OR UPDATE ON "user" + FOR EACH ROW +EXECUTE PROCEDURE notify_trigger_common_oltp('user_id', 'first_name', 'last_name', 'handle', 'status', 'activation_code', 'reg_source', 'utm_source', 'utm_medium', 'utm_campaign'); + +--drop SEQUENCE sequence_user_group_seq; +CREATE SEQUENCE sequence_user_group_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START +WITH 601000000 NO CYCLE; + + +--drop SEQUENCE sequence_email_seq; +CREATE SEQUENCE sequence_email_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START +WITH 70100000 NO CYCLE; + + +SET search_path TO informixoltp; + +CREATE OR REPLACE FUNCTION "informixoltp"."notify_trigger_informixoltp" () RETURNS trigger + VOLATILE +AS $body$ +DECLARE + rec RECORD; + payload TEXT; + column_name TEXT; + column_value TEXT; + payload_items TEXT[]; + uniquecolumn TEXT; + logtime TEXT; + payloadseqid INTEGER; +BEGIN + CASE TG_OP + WHEN 'INSERT', 'UPDATE' THEN + rec := NEW; + WHEN 'DELETE' THEN + rec := OLD; + ELSE + RAISE EXCEPTION 'Unknown TG_OP: "%". Should not occur!', TG_OP; + END CASE; + raise notice 'table name : %', TG_TABLE_NAME; + RAISE info 'hello world'; + -- Get required fields + FOREACH column_name IN ARRAY TG_ARGV LOOP + EXECUTE format('SELECT $1.%I::TEXT', column_name) + INTO column_value + USING rec; + case + when + column_name = 'upload_document' then + -- RAISE NOTICE 'upload_document boolean'; + if column_value = 'false' then + column_value = '0'; + else + column_value = '1'; + end if; + when + column_name = 'upload_document_required' then + -- RAISE NOTICE 'upload_document_required boolean'; + if column_value = 'false' then + column_value = '0'; + else + column_value = '1'; + end if; + when + column_name = 'identify_email_enabled' then + if column_value = 'false' then + column_value = '0'; + else + column_value = '1'; + end if; + when + column_name = 'identify_handle_enabled' then + if column_value = 'false' then + column_value = '0'; + else + column_value = '1'; + end if; + when + column_name = 'create_date' then + column_value := (select to_char (column_value::timestamp, 'YYYY-MM-DD HH24:MI:SS.MS')); + when + column_name = 'modify_date' then + column_value := (select to_char (column_value::timestamp, 'YYYY-MM-DD HH24:MI:SS.MS')); + -- when + -- column_name = 'achievement_date' then + --column_value := (select to_date (column_value, 'MM/DD/YYYY')); + --column_value := (select to_date (column_value)); + --when + --column_name = 'password' then + --column_value := regexp_replace(column_value, '\s', '', 'g'); + --column_value := regexp_replace(column_value, E'[\\n\\r]+', '\n\r', 'g'); + else + -- RAISE NOTICE ' not boolean'; + end case; + payload_items := array_append(payload_items, '"' || replace(column_name, '"', '\"') || '":"' || replace(column_value, '"', '\"') || '"'); + END LOOP; + --logtime := (select date_display_tz()); + logtime := (SELECT to_char (now()::timestamptz at time zone 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS"Z"')); + payloadseqid := (select nextval('payloadsequence'::regclass)); + + uniquecolumn := (SELECT c.column_name + FROM information_schema.key_column_usage AS c + LEFT JOIN information_schema.table_constraints AS t + ON t.constraint_name = c.constraint_name + WHERE t.table_name = TG_TABLE_NAME AND t.constraint_type = 'PRIMARY KEY' LIMIT 1); + + if (uniquecolumn = '') IS NOT FALSE then + uniquecolumn := 'Not-Available'; + end if; + + -- Build the payload + payload := '' + || '{' + || '"topic":"' || 'test.db.postgres.sync' || '",' + || '"originator":"' || 'tc-postgres-delta-processor' || '",' + || '"timestamp":"' || logtime || '",' + || '"mime-type":"' || 'application/json' || '",' + || '"payload": {' + || '"payloadseqid":"' || payloadseqid || '",' + || '"Uniquecolumn":"' || uniquecolumn || '",' + || '"operation":"' || TG_OP || '",' + || '"schema":"' || TG_TABLE_SCHEMA || '",' + || '"table":"' || TG_TABLE_NAME || '",' + || '"data": {' || array_to_string(payload_items, ',') || '}' + || '}}'; + + -- Notify the channel + PERFORM pg_notify('test_db_notifications', payload); + + RETURN rec; +END; +$body$ LANGUAGE plpgsql; + + +CREATE TRIGGER "pg_algo_rating" + AFTER INSERT OR DELETE OR UPDATE ON algo_rating + FOR EACH ROW +EXECUTE PROCEDURE notify_trigger_informixoltp('coder_id', 'rating', 'vol', 'round_id', 'num_ratings', 'algo_rating_type_id', 'modify_date'); + +CREATE TRIGGER "pg_coder" + AFTER INSERT OR DELETE OR UPDATE ON coder + FOR EACH ROW +EXECUTE PROCEDURE notify_trigger_informixoltp('coder_id', 'quote', 'coder_type_id', 'comp_country_code', 'display_quote', 'quote_location', 'quote_color', 'display_banner', 'banner_style'); diff --git a/src/consumer.js b/src/consumer.js index 826e978..c5d1e9b 100644 --- a/src/consumer.js +++ b/src/consumer.js @@ -9,6 +9,7 @@ const pushToKafka = require('./services/pushToKafka') const healthcheck = require('topcoder-healthcheck-dropin'); const auditTrail = require('./services/auditTrail'); const kafkaOptions = config.get('KAFKA') +const postMessage = require('./services/posttoslack') const isSslEnabled = kafkaOptions.SSL && kafkaOptions.SSL.cert && kafkaOptions.SSL.key const consumer = new Kafka.SimpleConsumer({ connectionString: kafkaOptions.brokers_url, @@ -20,7 +21,6 @@ const consumer = new Kafka.SimpleConsumer({ }) }) - const check = function () { if (!consumer.client.initialBrokers && !consumer.client.initialBrokers.length) { return false; @@ -33,7 +33,7 @@ const check = function () { return connected; }; - +let cs_processId; const terminate = () => process.exit() /** * @@ -41,49 +41,99 @@ const terminate = () => process.exit() * @param {String} topic The name of the message topic * @param {Number} partition The kafka partition to which messages are written */ +var retryvar=""; +//let cs_payloadseqid; async function dataHandler(messageSet, topic, partition) { - for (const m of messageSet) { // Process messages sequentially +let cs_payloadseqid + for (const m of messageSet) { // Process messages sequentially let message try { message = JSON.parse(m.message.value) logger.debug('Received message from kafka:') - logger.debug(JSON.stringify(message)) - await updateInformix(message) + if (message.payload.payloadseqid) cs_payloadseqid = message.payload.payloadseqid; + logger.debug(`consumer : ${message.payload.payloadseqid} ${message.payload.table} ${message.payload.Uniquecolumn} ${message.payload.operation} ${message.timestamp} `); + await updateInformix(message) await consumer.commitOffset({ topic, partition, offset: m.offset }) // Commit offset only on success - await auditTrail([message.payload.payloadseqid,'scorecard_consumer',message.payload.table,message.payload.Uniquecolumn, - message.payload.operation,1,0,"",message.timestamp,new Date(),message.payload.data],'consumer') + if (message.payload['retryCount']) retryvar = message.payload.retryCount; + auditTrail([cs_payloadseqid,cs_processId,message.payload.table,message.payload.Uniquecolumn, + message.payload.operation,"Informix-updated",retryvar,"","",message.payload.data, message.timestamp,message.topic],'consumer') } catch (err) { - logger.error('Could not process kafka message') + const errmsg2 = `Could not process kafka message or informix DB error: "${err.message}"` + logger.error(errmsg2) + //await callposttoslack(errmsg2) //logger.logFullError(err) + logger.debug(`error-sync: consumer "${err.message}"`) + if (!cs_payloadseqid){ + cs_payloadseqid= 'err-'+(new Date()).getTime().toString(36) + Math.random().toString(36).slice(2); + } + + await auditTrail([cs_payloadseqid,3333,'message.payload.table','message.payload.Uniquecolumn', + 'message.payload.operation',"Error-Consumer","",err.message,"",'message.payload.data',new Date(),'message.topic'],'consumer') try { + //var retryvar + if (message.payload['retryCount']) retryvar = message.payload.retryCount; await consumer.commitOffset({ topic, partition, offset: m.offset }) // Commit success as will re-publish - logger.debug('Trying to push same message after adding retryCounter') + // await auditTrail([cs_payloadseqid,3333,'message.payload.table','message.payload.Uniquecolumn', + // 'message.payload.operation',"Informix-Updated1",retryvar,"","",'message.payload.data',new Date(),'message.topic'],'consumer') + //await callposttoslack(`Retry for Kafka push : retrycount : "${retryvar}"`) + logger.debug(`Trying to push same message after adding retryCounter`) if (!message.payload.retryCount) { message.payload.retryCount = 0 logger.debug('setting retry counter to 0 and max try count is : ', config.KAFKA.maxRetry); } if (message.payload.retryCount >= config.KAFKA.maxRetry) { logger.debug('Recached at max retry counter, sending it to error queue: ', config.KAFKA.errorTopic); - - let notifiyMessage = Object.assign({}, message, { topic: config.KAFKA.errorTopic }) + logger.debug(`error-sync: consumer max-retry-limit reached`) + // push to slack - alertIt("slack message" + await callposttoslack(`error-sync: postgres-ifx-processor : consumer max-retry-limit reached: "${message.payload.table}": payloadseqid : "${cs_payloadseqid}"`) + let notifiyMessage = Object.assign({}, message, { topic: config.KAFKA.errorTopic }) notifiyMessage.payload['recipients'] = config.KAFKA.recipients logger.debug('pushing following message on kafka error alert queue:') - logger.debug(notifiyMessage) - await pushToKafka(notifiyMessage) + //logger.debug(notifiyMessage) + await pushToKafka(notifiyMessage) return } message.payload['retryCount'] = message.payload.retryCount + 1; await pushToKafka(message) - logger.debug('pushed same message after adding retryCount') + var errmsg9 = `Retry for Kafka push : retrycount : "${message.payload.retryCount}" : "${cs_payloadseqid}"` + logger.debug(errmsg9) + //await callposttoslack(errmsg9) } catch (err) { - //await auditTrail([payload.payload.payloadseqid,'scorecard_consumer',payload.payload.table,payload.payload.Uniquecolumn, - // payload.payload.operation,0,message.payload.retryCount,"re-publish kafka err",payload.timestamp,new Date(),""],'consumer') - logger.error("Error occured in re-publishing kafka message", err) + + await auditTrail([cs_payloadseqid,cs_processId,message.payload.table,message.payload.Uniquecolumn, + message.payload.operation,"Error-republishing",message.payload['retryCount'],err.message,"",message.payload.data, message.timestamp,message.topic],'consumer') + const errmsg1 = `postgres-ifx-processor: consumer : Error-republishing: "${err.message}"` + logger.error(errmsg1) + logger.debug(`error-sync: consumer re-publishing "${err.message}"`) + // push to slack - alertIt("slack message" + await callposttoslack(errmsg1) } } } } +async function callposttoslack(slackmessage) { +if(config.SLACK.SLACKNOTIFY === 'true') { + return new Promise(function (resolve, reject) { + postMessage(slackmessage, (response) => { + console.log(`respnse : ${response}`) + if (response.statusCode < 400) { + logger.debug('Message posted successfully'); + //callback(null); + } else if (response.statusCode < 500) { + const errmsg1 =`Slack Error: posting message to Slack API: ${response.statusCode} - ${response.statusMessage}` + logger.debug(`error-sync: ${errmsg1}`) + } + else { + logger.debug(`Server error when processing message: ${response.statusCode} - ${response.statusMessage}`); + //callback(`Server error when processing message: ${response.statusCode} - ${response.statusMessage}`); + } + resolve("done") + }); + }) //end +} + +} /** * Initialize kafka consumer @@ -91,12 +141,15 @@ async function dataHandler(messageSet, topic, partition) { async function setupKafkaConsumer() { try { await consumer.init() - await consumer.subscribe(kafkaOptions.topic, kafkaOptions.partition, { time: Kafka.LATEST_OFFSET }, dataHandler) + //await consumer.subscribe(kafkaOptions.topic, kafkaOptions.partition, { time: Kafka.LATEST_OFFSET }, dataHandler) + await consumer.subscribe(kafkaOptions.topic, dataHandler) + logger.info('Initialized kafka consumer') healthcheck.init([check]) } catch (err) { logger.error('Could not setup kafka consumer') logger.logFullError(err) + logger.debug(`error-sync: consumer kafka-setup "${err.message}"`) terminate() } } diff --git a/src/producer.js b/src/producer.js index ce59982..983de78 100644 --- a/src/producer.js +++ b/src/producer.js @@ -5,56 +5,125 @@ const config = require('config') const pg = require('pg') const logger = require('./common/logger') const pushToKafka = require('./services/pushToKafka') +const pushToDynamoDb = require('./services/pushToDynamoDb') const pgOptions = config.get('POSTGRES') +const postMessage = require('./services/posttoslack') const pgConnectionString = `postgresql://${pgOptions.user}:${pgOptions.password}@${pgOptions.host}:${pgOptions.port}/${pgOptions.database}` const pgClient = new pg.Client(pgConnectionString) const auditTrail = require('./services/auditTrail'); const express = require('express') const app = express() const port = 3000 +const isFailover = process.argv[2] != undefined ? (process.argv[2] === 'failover' ? true : false) : false - -async function setupPgClient () { - try { +async function setupPgClient() { +var payloadcopy +try { await pgClient.connect() for (const triggerFunction of pgOptions.triggerFunctions) { await pgClient.query(`LISTEN ${triggerFunction}`) } pgClient.on('notification', async (message) => { try { + payloadcopy = "" const payload = JSON.parse(message.payload) - const validTopicAndOriginator = (pgOptions.triggerTopics.includes(payload.topic)) && (pgOptions.triggerOriginators.includes(payload.originator)) // Check if valid topic and originator + payloadcopy = message + const validTopicAndOriginator = (pgOptions.triggerTopics.includes(payload.topic)) && (pgOptions.triggerOriginators.includes(payload.originator)) // Check if valid topic and originator if (validTopicAndOriginator) { - console.log(`${payload.topic} ${payload.payload.table} ${payload.payload.operation} ${payload.timestamp}`); - await pushToKafka(payload) - } else { + if (!isFailover) { + //logger.info('trying to push on kafka topic') + await pushToKafka(payload) + logger.info('Push to kafka and added for audit trail') + } else { + logger.info('Push to dynamodb for reconciliation') + await pushToDynamoDb(payload) + } + audit(message) + } else { logger.debug('Ignoring message with incorrect topic or originator') + // push to slack - alertIt("slack message") } - await auditTrail([payload.payload.payloadseqid,'scorecard_producer',1,payload.topic,payload.payload.table,payload.payload.Uniquecolumn, - payload.payload.operation,"",payload.timestamp,new Date(),JSON.stringify(payload.payload)],'producer') } catch (error) { logger.error('Could not parse message payload') - await auditTrail([payload.payload.payloadseqid,'scorecard_producer',0,payload.topic,payload.payload.table,payload.payload.Uniquecolumn, - payload.payload.operation,"error",payload.timestamp,new Date(),JSON.stringify(payload.payload)],'producer') + logger.debug(`error-sync: producer parse message : "${error.message}"`) + const errmsg1 = `postgres-ifx-processor: producer or dd : Error Parse or payload : "${error.message}" \n payload : "${payloadcopy.payload}"` logger.logFullError(error) + audit(error) + // push to slack - alertIt("slack message" + await callposttoslack(errmsg1) } }) - logger.info('Listening to notifications') + logger.info('pg-ifx-sync producer: Listening to pg-trigger channel.') } catch (err) { - logger.error('Could not setup postgres client') + const errmsg = `postgres-ifx-processor: producer or dd : Error in setting up postgres client: "${err.message}"` + logger.error(errmsg) logger.logFullError(err) + // push to slack - alertIt("slack message") + await callposttoslack(errmsg) terminate() } } -async function run () { +const terminate = () => process.exit() + +async function run() { + logger.debug("Initialising producer setup...") await setupPgClient() } +async function callposttoslack(slackmessage) { +if(config.SLACK.SLACKNOTIFY === 'true') { + return new Promise(function (resolve, reject) { + postMessage(slackmessage, (response) => { + console.log(`respnse : ${response}`) + if (response.statusCode < 400) { + logger.debug('Message posted successfully'); + //callback(null); + } else if (response.statusCode < 500) { + const errmsg1 =`Slack Error: posting message to Slack API: ${response.statusCode} - ${response.statusMessage}` + logger.debug(`error-sync: ${errmsg1}`) + } + else { + logger.debug(`Server error when processing message: ${response.statusCode} - ${response.statusMessage}`); + //callback(`Server error when processing message: ${response.statusCode} - ${response.statusMessage}`); + } + resolve("done") + }); + }) //end +} + +} +// execute run() +async function audit(message) { + const pl_processid = message.processId + if (pl_processid != 'undefined') { + const payload = JSON.parse(message.payload) + const pl_seqid = payload.payload.payloadseqid + const pl_topic = payload.topic // TODO can move in config ? + const pl_table = payload.payload.table + const pl_uniquecolumn = payload.payload.Uniquecolumn + const pl_operation = payload.payload.operation + const pl_timestamp = payload.timestamp + const pl_payload = JSON.stringify(payload.payload) + const logMessage = `${pl_seqid} ${pl_processid} ${pl_table} ${pl_uniquecolumn} ${pl_operation} ${payload.timestamp}` + if (!isFailover) { + logger.debug(`producer : ${logMessage}`); + } else { + logger.debug(`Producer DynamoDb : ${logMessage}`); + } + auditTrail([pl_seqid, pl_processid, pl_table, pl_uniquecolumn, pl_operation, "push-to-kafka", "", "", "", JSON.stringify(message), pl_timestamp, pl_topic], 'producer') + } else { + const pl_randonseq = 'err-' + (new Date()).getTime().toString(36) + Math.random().toString(36).slice(2) + if (!isFailover) { + await auditTrail([pl_randonseq, 1111, "", "", "", "error-producer", "", "", message.message, "", new Date(), ""], 'producer') + } + } +} + app.get('/health', (req, res) => { - res.send('health ok') + res.send('health ok') }) app.listen(port, () => console.log(`app listening on port ${port}!`)) diff --git a/src/reconsiler-audit.js b/src/reconsiler-audit.js new file mode 100644 index 0000000..0c44570 --- /dev/null +++ b/src/reconsiler-audit.js @@ -0,0 +1,126 @@ +const config = require('config') +const pg = require('pg') +const logger = require('./common/logger') +const pushToKafka = require('./services/pushToKafka') +const pgOptions = config.get('POSTGRES') +const postMessage = require('./services/posttoslack') +const pgConnectionString = `postgresql://${pgOptions.user}:${pgOptions.password}@${pgOptions.host}:${pgOptions.port}/${pgOptions.database}` +const pgClient = new pg.Client(pgConnectionString) +const auditTrail = require('./services/auditTrail'); +const port = 3000 + +async function setupPgClient() { + var payloadcopy + try { + await pgClient.connect() + //sql1= "select * from pgifx_sync_audit where syncstatus in ('Informix-updated') + //and auditdatetime >= (now()::date - interval '10m') and auditdatetime <= (now()::date - interval '5m') "; + //>= 12.53 and <= 12.48 + rec_d_start = config.RECONSILER.RECONSILER_START + rec_d_end = config.RECONSILER.RECONSILER_END + rec_d_type = config.RECONSILER.RECONSILER_DURATION_TYPE + var paramvalues = ['push-to-kafka',rec_d_start,rec_d_end]; + //var paramvalues = ['push-to-kafka','60002027']; + //sql1 = 'select * from common_oltp.pgifx_sync_audit where pgifx_sync_audit.syncstatus =($1)' + sql1 = " select seq_id, payloadseqid,auditdatetime at time zone 'utc' at time zone 'Asia/Calcutta',syncstatus, payload where pgifx_sync_audit.syncstatus =($1)" + sql2 = " and pgifx_sync_audit.auditdatetime >= DATE(NOW()) - INTERVAL '1"+ rec_d_type + "' * ($2)" + sql3 = " and pgifx_sync_audit.auditdatetime <= DATE(NOW()) - INTERVAL '1"+ rec_d_type + "' * ($3)" + sql = sql1 + sql2 + sql3 + console.log('sql ', sql) + // sql3 = ' select * from common_oltp.pgifx_sync_audit where pgifx_sync_audit.syncstatus =($1)' + // sql4 = " and pgifx_sync_audit.payloadseqid = ($2)" + + //sql = sql3 + sql4 + //const result = await pgClient.query(sql1, (err, res) => { + await pgClient.query(sql,paramvalues, async (err,result) => { + if (err) { + var errmsg0 = `error-sync: Audit reconsiler query "${err.message}"` + logger.debug (errmsg0) + // await callposttoslack(errmsg0) + } + else{ + console.log("Reconsiler Rowcount = ", result.rows.length) + for (var i = 0; i < result.rows.length; i++) { + for(var columnName in result.rows[i]) { + // console.log('column "%s" has a value of "%j"', columnName, result.rows[i][columnName]); + //if ((columnName === 'seq_id') || (columnName === 'payload')){ + if ((columnName === 'payload')){ + var reconsiler_payload = result.rows[i][columnName] + } + }//column for loop + try { + var s_payload = reconsiler_payload + payload = JSON.parse(s_payload) + payload1 = payload.payload + await pushToKafka(payload1) + logger.info('Reconsiler : Push to kafka and added for audit trail') + audit(s_payload) + } catch (error) { + logger.error('Reconsiler: Could not parse message payload') + logger.debug(`error-sync: Reconsiler parse message : "${error.message}"`) + const errmsg1 = `postgres-ifx-processor: Reconsiler : Error Parse or payload : "${error.message}" \n payload : "${payloadcopy.payload}"` + logger.logFullError(error) + audit(error) + await callposttoslack(errmsg1) + } + }//result for loop + } + pgClient.end() + }) + }catch (err) { + const errmsg = `postgres-ifx-processor: Reconsiler : Error in setting up postgres client: "${err.message}"` + logger.error(errmsg) + logger.logFullError(err) + await callposttoslack(errmsg) + terminate() + } +} + +const terminate = () => process.exit() + +async function run() { + logger.debug("Initialising Reconsiler setup...") + await setupPgClient() +} + +async function callposttoslack(slackmessage) { + if (config.SLACK.SLACKNOTIFY === 'true') { + return new Promise(function (resolve, reject) { + postMessage(slackmessage, (response) => { + console.log(`respnse : ${response}`) + if (response.statusCode < 400) { + logger.debug('Message posted successfully'); + //callback(null); + } else if (response.statusCode < 500) { + const errmsg1 = `Slack Error: posting message to Slack API: ${response.statusCode} - ${response.statusMessage}` + logger.debug(`error-sync: ${errmsg1}`) + } else { + logger.debug(`Server error when processing message: ${response.statusCode} - ${response.statusMessage}`); + //callback(`Server error when processing message: ${response.statusCode} - ${response.statusMessage}`); + } + resolve("done") + }); + }) //end + } + +} +// execute +run() + +async function audit(message) { + const pl_processid = 5555 + const jsonpayload = JSON.parse(message) + payload = JSON.parse(jsonpayload.payload) + payload1 = payload.payload + const pl_seqid = payload1.payloadseqid + const pl_topic = payload1.topic // TODO can move in config ? + const pl_table = payload1.table + const pl_uniquecolumn = payload1.Uniquecolumn + const pl_operation = payload1.operation + const pl_timestamp = payload1.timestamp + //const pl_payload = JSON.stringify(payload.payload) + const logMessage = `${pl_seqid} ${pl_processid} ${pl_table} ${pl_uniquecolumn} ${pl_operation} ${payload.timestamp}` + logger.debug(`reconsiler : ${logMessage}`); + //await auditTrail([pl_seqid, pl_processid, pl_table, pl_uniquecolumn, pl_operation, "push-to-kafka-reconsiler", "", "reconsiler", "", "", new Date(), ""], 'producer') +} + diff --git a/src/services/auditTrail.js b/src/services/auditTrail.js index fcd26a5..3f45f95 100644 --- a/src/services/auditTrail.js +++ b/src/services/auditTrail.js @@ -5,7 +5,7 @@ const logger = require('../common/logger') const pgOptions = config.get('POSTGRES') const pgConnectionString = `postgresql://${pgOptions.user}:${pgOptions.password}@${pgOptions.host}:${pgOptions.port}/${pgOptions.database}` let pgClient2 - +//console.log(`"${pgConnectionString}"`); async function setupPgClient2 () { pgClient2 = new pg.Client(pgConnectionString) try { @@ -24,20 +24,28 @@ if (!pgClient2) { await setupPgClient2() } if (sourcetype === 'producer'){ - sql = 'INSERT INTO tcs_catalog.producer_scorecard_audit(payloadseqid,origin_source,kafka_post_status,topic_name,table_name,Uniquecolumn,operationtype,errormessage,payloadtime,auditdatetime,payload) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11)' -logger.debug(`--Audit Trail update producer--`) + sql0 = 'INSERT INTO common_oltp.pgifx_sync_audit(payloadseqid,processId,tablename,uniquecolumn,dboperation,syncstatus,retrycount,consumer_err,producer_err,payload,auditdatetime,topicname) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12)' + sql1= ' on conflict (payloadseqid) DO UPDATE SET (syncstatus) = ($6) where pgifx_sync_audit.payloadseqid = $1'; + sql = sql0 + sql1 + logger.debug(`--Audit Trail update producer--`) } else { -sql = 'INSERT INTO tcs_catalog.consumer_scorecard_audit(payloadseqid,origin_source,table_name,Uniquecolumn,operationtype,dest_db_status, dest_retry_count,errormessage,payloadtime,auditdatetime,dest_operationquery) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11)' -logger.debug(`--Audit Trail update consumer--`) + sql0 = 'INSERT INTO common_oltp.pgifx_sync_audit(payloadseqid,processId,tablename,uniquecolumn,dboperation,syncstatus,retrycount,consumer_err,producer_err,payload,auditdatetime,topicname) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12)' + sql1= ' on conflict (payloadseqid) DO UPDATE SET (syncstatus,consumer_err,retrycount) = ($6,$8,$7)'; + // where pgifx_sync_audit.payloadseqid = $1'; + //and pgifx_sync_audit.processId = $2'; + sql = sql0 + sql1 + logger.debug(`--Audit Trail update consumer--`) + //logger.debug(`sql values "${sql}"`); } return pgClient2.query(sql, data, (err, res) => { if (err) { logger.debug(`--Audit Trail update error-- ${err.stack}`) //pgClient2.end() } else { - logger.debug(`--Audit Trail update success-- `) + // logger.debug(`--Audit Trail update success-- `) } }) +pgClient2.end() } diff --git a/src/services/posttoslack.js b/src/services/posttoslack.js new file mode 100644 index 0000000..f5cf15a --- /dev/null +++ b/src/services/posttoslack.js @@ -0,0 +1,42 @@ +const config = require('config'); +const zlib = require('zlib'); +const url = require('url'); +const https = require('https'); +hookUrl = config.SLACK.URL +slackChannel = config.SLACK.SLACKCHANNEL +async function postMessage(message, callback) { + var slackMessage = { + channel: `${slackChannel}`, + text: `${message}`, + } + const body = JSON.stringify(slackMessage); + const options = url.parse(hookUrl); + options.method = 'POST'; + options.headers = { + 'Content-Type': 'application/json', + 'Content-Length': Buffer.byteLength(body), + }; + return new Promise(function (resolve, reject) { + const postReq = https.request(options, (res) => { + const chunks = []; + res.setEncoding('utf8'); + res.on('data', (chunk) => chunks.push(chunk)); + res.on('end', () => { + if (callback) { + callback({ + body: chunks.join(''), + statusCode: res.statusCode, + statusMessage: res.statusMessage, + }); + } + resolve(res) + }); + // return res; + }); + + postReq.write(body); + postReq.end(); + }) +} + +module.exports = postMessage diff --git a/src/services/pushToDynamoDb.js b/src/services/pushToDynamoDb.js new file mode 100644 index 0000000..78d995d --- /dev/null +++ b/src/services/pushToDynamoDb.js @@ -0,0 +1,30 @@ +const config = require('config') +const logger = require('../common/logger') +const _ = require('lodash') +var AWS = require("aws-sdk"); +async function pushToDynamoDb(payload) { + try { console.log('----Push To DynomoDB -------'); + var params = { + TableName: config.DYNAMODB.DYNAMODB_TABLE, + Item: { + payloadseqid: payload.payload.payloadseqid, + pl_document: payload, + pl_table: payload.payload.table, + pl_uniquecolumn: payload.payload.Uniquecolumn, + pl_operation: payload.payload.operation, + pl_time: payload.timestamp, + timestamp: Date.now() + } + } + var docClient = new AWS.DynamoDB.DocumentClient({region: 'us-east-1',convertEmptyValues: true}); + docClient.put(params, function(err, data) { + if (err) logger.error(err); + else logger.info(data); + }); + + } catch (e) { + logger.error(`error-sync: Error at PushToDynamoDB "${e}"`) + } +} + +module.exports = pushToDynamoDb diff --git a/src/services/updateInformix.js b/src/services/updateInformix.js index 5098162..70c868d 100644 --- a/src/services/updateInformix.js +++ b/src/services/updateInformix.js @@ -28,11 +28,11 @@ async function updateInformix (payload) { sql = `update ${payload.payload.schema}:${payload.payload.table} set ${Object.keys(columns).map((key) => `${key}='${columns[key]}'`).join(', ')} where ${primaryKey}=${columns[primaryKey]};` // "update : set col_1=val_1, col_2=val_2, ... where primary_key_col=primary_key_val" } break - // case 'delete': - // { - // sql = `delete from ${payload.payload.schema}:${payload.payload.table} where ${primaryKey}=${columns[primaryKey]};` // ""delete from :
where primary_key_col=primary_key_val" - // } - // break + case 'delete': + { + sql = `delete from ${payload.payload.schema}:${payload.payload.table} where ${primaryKey}=${columns[primaryKey]};` // ""delete from :
where primary_key_col=primary_key_val" + } + break default: throw new Error(`Operation ${operation} is not supported`) }