diff --git a/.circleci/config.yml b/.circleci/config.yml index a1acf0c..9df60cb 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -6,10 +6,11 @@ defaults: &defaults install_dependency: &install_dependency name: Installation of build and deployment dependencies. command: | + sudo apt-get update sudo apt install jq sudo pip install awscli --upgrade sudo pip install docker-compose - sudo apt-get install default-jdk + sudo apt-get install default-jdk --fix-missing install_deploysuite: &install_deploysuite name: Installation of install_deploysuite. #Git Clone -change back to v1.3 or latest once counter var is generalized. @@ -39,30 +40,42 @@ build_steps: &build_steps - deploy: name: Running MasterScript. command: | + ./awsconfiguration.sh ${DEPLOY_ENV} source awsenvconf - - # ./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-consumer-deployvar + #scorecard test consumer remove later + #./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-consumer_scorecard-deployvar #source buildenvvar #./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer - echo "Running Masterscript - deploy postgres-ifx-processer producer" - if [ -e ${LOGICAL_ENV}-${APP_NAME}-consumer-deployvar.json ]; then sudo rm -vf ${LOGICAL_ENV}-${APP_NAME}-consumer-deployvar.json; fi - ./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-producer-deployvar + #scorecard test producer remove later + #echo "Running Masterscript - deploy postgres-ifx-processer producer" + #if [ -e ${LOGICAL_ENV}-${APP_NAME}-consumer-deployvar.json ]; then sudo rm -vf ${LOGICAL_ENV}-${APP_NAME}-consumer-deployvar.json; fi + #./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-producer_scorecard-deployvar + #source buildenvvar + #./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer + + ./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-consumer-deployvar source buildenvvar ./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer + + #echo "Running Masterscript - deploy postgres-ifx-processer producer" + #if [ -e ${LOGICAL_ENV}-${APP_NAME}-consumer-deployvar.json ]; then sudo rm -vf ${LOGICAL_ENV}-${APP_NAME}-consumer-deployvar.json; fi + #./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-producer-deployvar + #source buildenvvar + #./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer - echo "Running Masterscript - deploy postgres-ifx-processer producer_dd" - if [ -e ${LOGICAL_ENV}-${APP_NAME}-producer-deployvar.json ]; then sudo rm -vf ${LOGICAL_ENV}-${APP_NAME}-producer-deployvar.json; fi - ./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-producer_dd-deployvar - source buildenvvar - ./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer - - #echo "Running Masterscript - deploy postgres-ifx-processer reconsiler1" - #if [ -e ${LOGICAL_ENV}-${APP_NAME}-reconsiler1-deployvar.json ]; then sudo rm -vf ${LOGICAL_ENV}-${APP_NAME}-reconsiler1-deployvar.json; fi - #./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-reconsiler1-deployvar + #echo "Running Masterscript - deploy postgres-ifx-processer producer_dd" + #if [ -e ${LOGICAL_ENV}-${APP_NAME}-producer-deployvar.json ]; then sudo rm -vf ${LOGICAL_ENV}-${APP_NAME}-producer-deployvar.json; fi + #./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-producer_dd-deployvar #source buildenvvar #./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer + + echo "Running Masterscript - deploy postgres-ifx-processer reconsiler1" + if [ -e ${LOGICAL_ENV}-${APP_NAME}-reconsiler1-deployvar.json ]; then sudo rm -vf ${LOGICAL_ENV}-${APP_NAME}-reconsiler1-deployvar.json; fi + ./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-reconsiler1-deployvar + source buildenvvar + ./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer jobs: # Build & Deploy against development backend # @@ -103,13 +116,12 @@ workflows: branches: only: - dev - - dev-retryfeature - "build-test": context : org-global filters: branches: - only: - - dev-test-pg + only: + - dev-test-pg - dev-test-pg-rf - "build-prod": context : org-global diff --git a/config/default.js b/config/default.js index 0fc39a8..859d680 100644 --- a/config/default.js +++ b/config/default.js @@ -22,8 +22,8 @@ module.exports = { database: process.env.PG_DATABASE || 'postgres', // database must exist before running the tool password: process.env.PG_PASSWORD || 'password', port: parseInt(process.env.PG_PORT, 10) || 5432, - triggerFunctions: process.env.TRIGGER_FUNCTIONS || ['dev_db_notifications'], // List of trigger functions to listen to - triggerTopics: process.env.TRIGGER_TOPICS || ['dev.db.postgres.sync'], // Names of the topic in the trigger payload + triggerFunctions: process.env.TRIGGER_FUNCTIONS || ['test_db_notifications'], // List of trigger functions to listen to + triggerTopics: process.env.TRIGGER_TOPICS || ['test.db.postgres.sync'], // Names of the topic in the trigger payload triggerOriginators: process.env.TRIGGER_ORIGINATORS || ['tc-postgres-delta-processor'] // Names of the originator in the trigger payload }, KAFKA: { // Kafka connection options @@ -34,32 +34,36 @@ module.exports = { }, topic: process.env.KAFKA_TOPIC || 'db.topic.sync', // Kafka topic to push and receive messages partition: process.env.partition || [0], // Kafka partitions to use - maxRetry: process.env.MAX_RETRY || 3, + maxRetry: process.env.MAX_RETRY || 10, errorTopic: process.env.ERROR_TOPIC || 'db.scorecardtable.error', - recipients: ['admin@abc.com'] // Kafka partitions to use + recipients: ['admin@abc.com'], // Kafka partitions to use, + KAFKA_URL: process.env.KAFKA_URL, + KAFKA_GROUP_ID: process.env.KAFKA_GROUP_ID || 'test-postgres-ifx-consumer', + KAFKA_CLIENT_CERT: process.env.KAFKA_CLIENT_CERT ? process.env.KAFKA_CLIENT_CERT.replace('\\n', '\n') : null, + KAFKA_CLIENT_CERT_KEY: process.env.KAFKA_CLIENT_CERT_KEY ? process.env.KAFKA_CLIENT_CERT_KEY.replace('\\n', '\n') : null, }, SLACK: { URL: process.env.SLACKURL || 'us-east-1', SLACKCHANNEL: process.env.SLACKCHANNEL || 'ifxpg-migrator', - SLACKNOTIFY: process.env.SLACKNOTIFY || 'false' + SLACKNOTIFY: process.env.SLACKNOTIFY || 'false' }, - RECONSILER:{ + RECONSILER: { RECONSILER_START: process.env.RECONSILER_START || 5, RECONSILER_END: process.env.RECONSILER_END || 1, RECONSILER_DURATION_TYPE: process.env.RECONSILER_DURATION_TYPE || 'm' }, DYNAMODB: - { - DYNAMODB_TABLE: process.env.DYNAMODB_TABLE || 'dev_pg_ifx_payload_sync', - DD_ElapsedTime: process.env.DD_ElapsedTime || 600000 - }, + { + DYNAMODB_TABLE: process.env.DYNAMODB_TABLE || 'test_pg_ifx_payload_sync', + DD_ElapsedTime: process.env.DD_ElapsedTime || 600000 + }, - AUTH0_URL: process.env.AUTH0_URL , - AUTH0_AUDIENCE: process.env.AUTH0_AUDIENCE , - TOKEN_CACHE_TIME: process.env.TOKEN_CACHE_TIME , - AUTH0_CLIENT_ID: process.env.AUTH0_CLIENT_ID , - AUTH0_CLIENT_SECRET: process.env.AUTH0_CLIENT_SECRET , - BUSAPI_URL : process.env.BUSAPI_URL , - KAFKA_ERROR_TOPIC : process.env.KAFKA_ERROR_TOPIC , - AUTH0_PROXY_SERVER_URL: process.env.AUTH0_PROXY_SERVER_URL + AUTH0_URL: process.env.AUTH0_URL, + AUTH0_AUDIENCE: process.env.AUTH0_AUDIENCE, + TOKEN_CACHE_TIME: process.env.TOKEN_CACHE_TIME, + AUTH0_CLIENT_ID: process.env.AUTH0_CLIENT_ID, + AUTH0_CLIENT_SECRET: process.env.AUTH0_CLIENT_SECRET, + BUSAPI_URL: process.env.BUSAPI_URL, + KAFKA_ERROR_TOPIC: process.env.KAFKA_ERROR_TOPIC, + AUTH0_PROXY_SERVER_URL: process.env.AUTH0_PROXY_SERVER_URL } diff --git a/informix-identity-trigger-proc.sql b/informix-identity-trigger-proc.sql index 21ed378..bb41b66 100644 --- a/informix-identity-trigger-proc.sql +++ b/informix-identity-trigger-proc.sql @@ -1,13 +1,15 @@ database common_oltp + DROP PROCEDURE proc_user_update(varchar,decimal); -DROP PROCEDURE proc_user_update; CREATE PROCEDURE informix.proc_user_update( new_handle varchar(50), user_id decimal(10,0)) if (USER != 'ifxsyncuser') then UPDATE user SET handle_lower = lower(new_handle), modify_date = current WHERE user.user_id = user_id; End if; -end procedure; +end procedure; + +DROP PROCEDURE proc_user_update; create procedure "informix".proc_user_update( user_id DECIMAL(10,0), old_first_name VARCHAR(64), @@ -25,7 +27,7 @@ new_middle_name VARCHAR(64), old_timezone_id decimal(5,0), new_timezone_id decimal(5,0) ) - + if (USER != 'ifxsyncuser') then if ((old_first_name != new_first_name) or (old_last_name != new_last_name ) or (old_middle_name != new_middle_name )) then insert into audit_user (column_name, old_value, new_value, user_id) @@ -57,7 +59,7 @@ user_id) user_id) values ('TIMEZONE_ID', old_timezone_id, new_timezone_id, user_id); End If; - if (USER != 'ifxsyncuser') then + UPDATE user SET handle_lower = lower(new_handle), modify_date = current WHERE user.user_id = user_id; End if; end procedure; @@ -75,7 +77,7 @@ new_primary_ind DECIMAL(1,0), old_status_id DECIMAL(3,0), new_status_id DECIMAL(3,0) ) - + if (USER != 'ifxsyncuser') then if (old_email_type_id != new_email_type_id) then insert into audit_user (column_name, old_value, new_value, user_id) values ('EMAIL_TYPE', old_email_type_id, new_email_type_id, user_id); @@ -95,12 +97,122 @@ new_status_id DECIMAL(3,0) insert into audit_user (column_name, old_value, new_value, user_id) values ('EMAIL_PRIMARY_IND', old_primary_ind, new_primary_ind, user_id); End If; - if (USER != 'ifxsyncuser') then + update email set modify_date = current where email.email_id = email_id; End if; end procedure; +DROP PROCEDURE informix.proc_user_last_login; +CREATE PROCEDURE informix.proc_user_last_login (user_id DECIMAL(10,0), o_last_login DATETIME YEAR TO FRACTION, +n_last_login DATETIME YEAR TO FRACTION) + + if (o_last_login != n_last_login) then + if (USER != 'ifxsyncuser') then + insert into corona_event (corona_event_type_id,user_id, corona_event_timestamp) values (1, user_id, n_last_login); + end if; + End if; +end procedure; + +DROP PROCEDURE informix.proc_phone_update; +CREATE PROCEDURE informix.proc_phone_update( +phone_id decimal(10,0), +user_id DECIMAL(10,0), +old_phone_type_id DECIMAL(5,0), +new_phone_type_id DECIMAL(5,0), +old_number VARCHAR(64), +new_number VARCHAR(64), +old_primary_ind DECIMAL(1,0), +new_primary_ind DECIMAL(1,0) +) + if (USER != 'ifxsyncuser') then + if (old_phone_type_id != new_phone_type_id) then + insert into audit_user (column_name, old_value, new_value, user_id) + values ('PHONE_TYPE', old_phone_type_id, new_phone_type_id, user_id); + End If; + + if (old_number != new_number) then + insert into audit_user (column_name, old_value, new_value, user_id) + values ('PHONE_NUMBER', old_number, new_number, user_id); + End If; + + if (old_primary_ind != new_primary_ind) then + insert into audit_user (column_name, old_value, new_value, user_id) + values ('PHONE_PRIMARY_IND', old_primary_ind, new_primary_ind, user_id); + End If; +update phone set modify_date = current where phone.phone_id = phone_id; +End if; +end procedure; + +DROP PROCEDURE informix.proc_address_update; +CREATE PROCEDURE informix.proc_address_update( + address_id DECIMAL(10,0), + old_address_type_id DECIMAL(5,0), + new_address_type_id DECIMAL(5,0), + old_address1 VARCHAR(254), + new_address1 VARCHAR(254), + old_address2 VARCHAR(254), + new_address2 VARCHAR(254), + old_address3 VARCHAR(254), + new_address3 VARCHAR(254), + old_city VARCHAR(64), + new_city VARCHAR(64), + old_state_code CHAR(2), + new_state_code CHAR(2), + old_province VARCHAR(64), + new_province VARCHAR(64), + old_zip VARCHAR(15), + new_zip VARCHAR(15), + old_country_code CHAR(3), + new_country_code CHAR(3) +) + define user_id DECIMAL(10,0); + let user_id = NVL((select min(x.user_id) from user_address_xref x where x.address_id = address_id), -1); + if (USER != 'ifxsyncuser') then + if (user_id > 0 and old_address1 != new_address1) then + insert into audit_user (column_name, old_value, new_value, +user_id) + values ('ADDRESS1', old_address1, new_address1, user_id); + End If; + if (user_id > 0 and old_address2 != new_address2) then + insert into audit_user (column_name, old_value, new_value, +user_id) + values ('ADDRESS2', old_address2, new_address2, user_id); + End If; + if (user_id > 0 and old_address3 != new_address3) then + insert into audit_user (column_name, old_value, new_value, +user_id) + values ('ADDRESS3', old_address3, new_address3, user_id); + End If; + if (user_id > 0 and old_city != new_city) then + insert into audit_user (column_name, old_value, new_value, +user_id) + values ('ADDRESS_CITY', old_city, new_city, user_id); + End If; + if (user_id > 0 and old_state_code != new_state_code) then + insert into audit_user (column_name, old_value, new_value, +user_id) + values ('ADDRESS_STATE', old_state_code, new_state_code, user_id); + End If; + if (user_id > 0 and old_province != new_province) then + insert into audit_user (column_name, old_value, new_value, +user_id) + values ('ADDRESS_PROVINCE', old_province, new_province, user_id); + End If; + if (user_id > 0 and old_zip != new_zip) then + insert into audit_user (column_name, old_value, new_value, +user_id) + values ('ADDRESS_ZIP', old_zip, new_zip, user_id); + End If; + if (user_id > 0 and old_country_code != new_country_code) then + insert into audit_user (column_name, old_value, new_value, +user_id) + values ('ADDRESS_COUNTRY', old_country_code, new_country_code, user_id); + End If; + update address set modify_date = current where address.address_id = address_id; + End if; +end procedure; + database informixoltp DROP PROCEDURE informix.proc_coder_update; CREATE PROCEDURE informix.proc_coder_update( @@ -110,7 +222,8 @@ v_oldlanguage_id decimal(3,0), v_newlanguage_id decimal(3,0), v_oldcoder_type_id decimal(3,0), v_newcoder_type_id decimal(3,0), v_oldcomp_country_code varchar(3), v_newcomp_country_code varchar(3) ) - + if (USER != 'ifxsyncuser') then + if (v_oldquote != v_newquote) then insert into audit_coder (column_name, old_value, new_value, user_id) values ('QUOTE', v_oldquote , v_newquote, v_oldcoder_id); @@ -131,10 +244,8 @@ v_oldcomp_country_code varchar(3), v_newcomp_country_code varchar(3) values ('COMP_COUNTRY', v_oldcomp_country_code , v_newcomp_country_code, v_oldcoder_id); End if; - if (USER != 'ifxsyncuser') then update coder set modify_date = current where coder_id = v_oldcoder_id; End if; - end procedure; database tcs_catalog; @@ -155,7 +266,7 @@ new_rating decimal(5,4) update user_reliability set modify_date = current where user_id = p_user_id and phase_id = p_phase_id; End if; end procedure; - + DROP PROCEDURE proc_rating_update; CREATE PROCEDURE informix.proc_rating_update( p_user_id DECIMAL(10,0), diff --git a/package.json b/package.json index 5f68789..179b315 100644 --- a/package.json +++ b/package.json @@ -17,7 +17,7 @@ "dependencies": { "aws-sdk": "*", "config": "^3.2.2", - "informix-wrapper": "git+https://github.com/appirio-tech/informix-wrapper.git", + "informix-wrapper": "git+https://github.com/appirio-tech/informix-wrapper.git#prepare_stmt_fix", "no-kafka": "^3.4.3", "pg": "^7.12.1", "sleep": "^6.1.0", diff --git a/pg-identity-func-trig-seq2.sql b/pg-identity-func-trig-seq2.sql new file mode 100644 index 0000000..60d61f0 --- /dev/null +++ b/pg-identity-func-trig-seq2.sql @@ -0,0 +1,732 @@ +SET search_path TO common_oltp; + +CREATE TABLE sync_test_id +( + uniqid INTEGER NOT NULL, + description varchar(200), + created_at TIMESTAMP(6) WITH TIME ZONE DEFAULT now(), + PRIMARY KEY (uniqid) + ); +CREATE TRIGGER "pg_sync_test_id_trigger" + AFTER INSERT OR DELETE OR UPDATE ON sync_test_id + FOR EACH ROW +EXECUTE PROCEDURE common_oltp.notify_trigger_common_oltp('uniqid', 'description', 'created_at'); + +ALTER TABLE "common_oltp"."sync_test_id" disable TRIGGER "pg_sync_test_id_trigger" + +CREATE OR REPLACE FUNCTION "common_oltp"."notify_trigger_common_oltp" () RETURNS trigger + VOLATILE +AS $body$ +DECLARE + rec RECORD; + payload TEXT; + column_name TEXT; + column_value TEXT; + pguserval TEXT; + --payload_items TEXT[]; + payload_items JSONB; + uniquecolumn TEXT; + logtime TEXT; + payloadseqid INTEGER; +BEGIN + +pguserval := (SELECT current_user); + if pguserval = 'pgsyncuser' then + RAISE notice 'pgsyncuser name : %', pguserval; + + CASE TG_OP + WHEN 'INSERT', 'UPDATE' THEN + rec := NEW; + WHEN 'DELETE' THEN + rec := OLD; + ELSE + RAISE EXCEPTION 'Unknown TG_OP: "%". Should not occur!', TG_OP; + END CASE; + return rec; + -- else + end if; + + + CASE TG_OP + WHEN 'INSERT', 'UPDATE' THEN + rec := NEW; + WHEN 'DELETE' THEN + rec := OLD; + ELSE + RAISE EXCEPTION 'Unknown TG_OP: "%". Should not occur!', TG_OP; + END CASE; + raise notice 'table name : %', TG_TABLE_NAME; + RAISE info 'hello world'; + -- Get required fields + FOREACH column_name IN ARRAY TG_ARGV LOOP + EXECUTE format('SELECT $1.%I::TEXT', column_name) + INTO column_value + USING rec; + case + when + column_name = 'upload_document' then + -- RAISE NOTICE 'upload_document boolean'; + if column_value = 'false' then + column_value = '0'; + else + column_value = '1'; + end if; + when + column_name = 'upload_document_required' then + -- RAISE NOTICE 'upload_document_required boolean'; + if column_value = 'false' then + column_value = '0'; + else + column_value = '1'; + end if; + when + column_name = 'identify_email_enabled' then + if column_value = 'false' then + column_value = '0'; + else + column_value = '1'; + end if; + when + column_name = 'identify_handle_enabled' then + if column_value = 'false' then + column_value = '0'; + else + column_value = '1'; + end if; + when + column_name = 'social_email_verified' then + if column_value = 'false' then + column_value = 'f'; + else + column_value = 't'; + end if; + when + column_name = 'create_date' then + column_value := (select to_char (column_value::timestamp, 'YYYY-MM-DD HH24:MI:SS.MS')); + when + column_name = 'modify_date' then + column_value := (select to_char (column_value::timestamp, 'YYYY-MM-DD HH24:MI:SS.MS')); + when + column_name = 'last_login' then + column_value := (select to_char (column_value::timestamp, 'YYYY-MM-DD HH24:MI:SS.MS')); + when + column_name = 'last_site_hit_date' then + column_value := (select to_char (column_value::timestamp, 'YYYY-MM-DD HH24:MI:SS.MS')); + when + column_name = 'corona_event_timestamp' then + column_value := (select to_char (column_value::timestamp, 'YYYY-MM-DD HH24:MI:SS.MS')); + when + column_name = 'created_at' then + column_value := (select to_char (column_value::timestamp, 'YYYY-MM-DD HH24:MI:SS.MS')); + else + -- RAISE NOTICE ' not boolean'; + end case; + --payload_items := coalesce(payload_items,'{}')::jsonb || json_build_object(column_name,column_value)::jsonb; + payload_items := coalesce(payload_items,'{}')::jsonb || json_build_object(column_name,replace(column_value,'"','\"'))::jsonb; + -- payload_items := array_append(payload_items, '"' || replace(column_name, '"', '\"') || '":"' || replace(column_value, '"', '\"') || '"'); + END LOOP; + --logtime := (select date_display_tz()); + logtime := (SELECT to_char (now()::timestamptz at time zone 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS"Z"')); + payloadseqid := (select nextval('common_oltp.payloadsequence'::regclass)); + + uniquecolumn := (SELECT c.column_name + FROM information_schema.key_column_usage AS c + LEFT JOIN information_schema.table_constraints AS t + ON t.constraint_name = c.constraint_name + WHERE t.table_name = TG_TABLE_NAME AND t.constraint_type = 'PRIMARY KEY' LIMIT 1); + + if (uniquecolumn = '') IS NOT FALSE then + uniquecolumn := 'Not-Available'; + end if; + + -- exclude any null value columns. + payload_items := jsonb_strip_nulls(payload_items); + + -- Build the payload + payload := '' + || '{' + || '"topic":"' || 'dev.db.postgres.sync' || '",' + || '"originator":"' || 'tc-postgres-delta-processor' || '",' + || '"timestamp":"' || logtime || '",' + || '"mime-type":"' || 'application/json' || '",' + || '"payload": {' + || '"payloadseqid":"' || payloadseqid || '",' + || '"Uniquecolumn":"' || uniquecolumn || '",' + || '"operation":"' || TG_OP || '",' + || '"schema":"' || TG_TABLE_SCHEMA || '",' + || '"table":"' || TG_TABLE_NAME || '",' + || '"data": ' || payload_items + || '}}'; + -- Notify the channel + PERFORM pg_notify('dev_db_notifications', payload); + RETURN rec; +END; +$body$ LANGUAGE plpgsql + +CREATE OR REPLACE FUNCTION "common_oltp"."proc_email_update" () RETURNS trigger + VOLATILE +AS $body$ +DECLARE +pguserval TEXT; +BEGIN + pguserval := (SELECT current_user); + if pguserval != 'pgsyncuser' then + if (OLD.email_type_id != NEW.email_type_id) then + insert into common_oltp.audit_user (column_name, old_value, new_value, user_id) + values ('EMAIL_TYPE', OLD.email_type_id, NEW.email_type_id, OLD.user_id); + End If; + + if (OLD.status_id != NEW.status_id) then + insert into common_oltp.audit_user (column_name, old_value, new_value, user_id) + values ('EMAIL_STATUS', OLD.status_id, NEW.status_id, OLD.user_id); + End If; + + if (OLD.address != NEW.address) then + insert into common_oltp.audit_user (column_name, old_value, new_value, user_id) + values ('EMAIL_ADDRESS', OLD.address, NEW.address, OLD.user_id); + End If; + + if (OLD.primary_ind != NEW.primary_ind) then + insert into common_oltp.audit_user (column_name, old_value, new_value, user_id) + values ('EMAIL_PRIMARY_IND', OLD.primary_ind, NEW.primary_ind, OLD.user_id); + End If; + + -- if pguserval != 'pgsyncuser' then + NEW.modify_date = current_timestamp; + end if; + + + RETURN NEW; +END; +$body$ LANGUAGE plpgsql; + +CREATE OR REPLACE FUNCTION "common_oltp"."proc_phone_update" () RETURNS trigger + VOLATILE +AS $body$ +DECLARE +pguserval TEXT; +BEGIN +pguserval := (SELECT current_user); +if pguserval != 'pgsyncuser' then + if (OLD.phone_type_id != NEW.phone_type_id) then + insert into audit_user (column_name, old_value, new_value, user_id) + values ('PHONE_TYPE', OLD.phone_type_id, NEW.phone_type_id, OLD.user_id); + End If; + + if (OLD.phone_number != NEW.phone_number) then + insert into audit_user (column_name, old_value, new_value, user_id) + values ('PHONE_NUMBER', OLD.phone_number, NEW.phone_number, OLD.user_id); + End If; + + if (OLD.primary_ind != NEW.primary_ind) then + insert into audit_user (column_name, old_value, new_value, user_id) + values ('PHONE_PRIMARY_IND', OLD.primary_ind, NEW.primary_ind, OLD.user_id); + End If; + + NEW.modify_date = current_timestamp; + end if; + RETURN NEW; +END; +$body$ LANGUAGE plpgsql + +CREATE OR REPLACE FUNCTION "common_oltp"."proc_user_update" () RETURNS trigger + VOLATILE +AS $body$ +DECLARE +pguserval TEXT; +BEGIN +pguserval := (SELECT current_user); +if pguserval != 'pgsyncuser' then + IF (TG_OP = 'UPDATE') THEN + if ((OLD.first_name != NEW.first_name) or (OLD.last_name != NEW.last_name ) or (OLD.middle_name != NEW.middle_name )) then + insert into common_oltp.audit_user (column_name, old_value, new_value, user_id) + values ('NAME', NULLIF(OLD.first_name, '') || ' ' || NULLIF(OLD.middle_name, '') || ' ' || NULLIF(OLD.last_name, ''), + NULLIF(NEW.first_name, '') || ' ' || NULLIF(NEW.middle_name, '') || ' ' || NULLIF(NEW.last_name, ''), OLD.user_id); + End if; + + if (OLD.handle != NEW.handle) then + insert into common_oltp.audit_user (column_name, old_value, new_value, user_id) + values ('HANDLE', OLD.handle, NEW.handle, OLD.user_id); + End If; + + if (OLD.status != NEW.status) then + insert into common_oltp.audit_user (column_name, old_value, new_value, user_id) + values ('STATUS', OLD.status, NEW.status, OLD.user_id); + End If; + + if (OLD.activation_code != NEW.activation_code) then + insert into common_oltp.audit_user (column_name, old_value, new_value, user_id) + values ('ACTIVATION_CODE', OLD.activation_code, NEW.activation_code, OLD.user_id); + End If; + + if (OLD.timezone_id != NEW.timezone_id) then + insert into common_oltp.audit_user (column_name, old_value, new_value, user_id) + values ('TIMEZONE_ID', OLD.timezone_id, NEW.timezone_id, OLD.user_id); + End If; + + + NEW.modify_date = current_timestamp; + end if; + + + END IF; + + NEW.handle_lower = lower(NEW.handle); + + RETURN NEW; +END; +$body$ LANGUAGE plpgsql + +CREATE OR REPLACE FUNCTION "common_oltp"."proc_address_update" () RETURNS trigger + VOLATILE +AS $body$ +DECLARE +pguserval TEXT; + user_id DECIMAL(10,0); +BEGIN + user_id := NULLIF((select min(x.user_id) from user_address_xref x where x.address_id = OLD.address_id), -1); + pguserval := (SELECT current_user); + if pguserval != 'pgsyncuser' then + if (user_id > 0 and OLD.address1 != NEW.address1) then + insert into audit_user (column_name, old_value, new_value, user_id) + values ('ADDRESS1', OLD.address1, NEW.address1, user_id); + End If; + + if (user_id > 0 and OLD.address2 != NEW.address2) then + insert into audit_user (column_name, old_value, new_value, user_id) + values ('ADDRESS2', OLD.address2, NEW.address2, user_id); + End If; + + if (user_id > 0 and OLD.address3 != NEW.address3) then + insert into audit_user (column_name, old_value, new_value, user_id) + values ('ADDRESS3', OLD.address3, NEW.address3, user_id); + End If; + + if (user_id > 0 and OLD.city != NEW.city) then + insert into audit_user (column_name, old_value, new_value, user_id) + values ('ADDRESS_CITY', OLD.city, NEW.city, user_id); + End If; + + if (user_id > 0 and OLD.state_code != NEW.state_code) then + insert into audit_user (column_name, old_value, new_value, user_id) + values ('ADDRESS_STATE', OLD.state_code, NEW.state_code, user_id); + End If; + + if (user_id > 0 and OLD.province != NEW.province) then + insert into audit_user (column_name, old_value, new_value, user_id) + values ('ADDRESS_PROVINCE', OLD.province, NEW.province, user_id); + End If; + + if (user_id > 0 and OLD.zip != NEW.zip) then + insert into audit_user (column_name, old_value, new_value, user_id) + values ('ADDRESS_ZIP', OLD.zip, NEW.zip, user_id); + End If; + + if (user_id > 0 and OLD.country_code != NEW.country_code) then + insert into audit_user (column_name, old_value, new_value, user_id) + values ('ADDRESS_COUNTRY', OLD.country_code, NEW.country_code, user_id); + End If; + + NEW.modify_date = current_timestamp; + end if; + RETURN NEW; +END; +$body$ LANGUAGE plpgsql + + +CREATE OR REPLACE FUNCTION "common_oltp"."proc_user_last_login" () RETURNS trigger + VOLATILE +AS $body$ +DECLARE +pguserval TEXT; +BEGIN +pguserval := (SELECT current_user); + if pguserval != 'pgsyncuser' then + if (OLD.last_login != NEW.last_login) then + insert into common_oltp.corona_event(corona_event_type_id, user_id, corona_event_timestamp) + values (1, OLD.user_id, NEW.last_login); + end if; + end if; + + RETURN NULL; +END; +$body$ LANGUAGE plpgsql + + +CREATE TRIGGER "pg_security_groups_trigger" + AFTER INSERT OR DELETE OR UPDATE ON security_groups + FOR EACH ROW +EXECUTE PROCEDURE notify_trigger_common_oltp('group_id', 'description', 'challenge_group_ind', 'create_user_id'); + + CREATE TRIGGER "pg_social_login_provider_trigger" +AFTER INSERT OR DELETE OR UPDATE ON social_login_provider +FOR EACH ROW +EXECUTE PROCEDURE notify_trigger_common_oltp('social_login_provider_id', 'name'); + + +CREATE TRIGGER "pg_sso_login_provider_trigger" +AFTER INSERT OR DELETE OR UPDATE ON sso_login_provider +FOR EACH ROW +EXECUTE PROCEDURE notify_trigger_common_oltp('sso_login_provider_id', 'name','type','identify_email_enabled','identify_handle_enabled'); + +CREATE TRIGGER "pg_Country_trigger" +AFTER INSERT OR DELETE OR UPDATE ON Country +FOR EACH ROW +EXECUTE PROCEDURE notify_trigger_common_oltp('country_code', 'country_name','modify_date','participating','default_taxform_id','longitude','latitude','region','iso_name','iso_alpha2_code','iso_alpha3_code'); + +CREATE TRIGGER "pg_invalid_handles_trigger" +AFTER INSERT OR DELETE OR UPDATE ON invalid_handles +FOR EACH ROW +EXECUTE PROCEDURE notify_trigger_common_oltp('invalid_handle_id', 'invalid_handle'); + +CREATE TRIGGER "pg_achievement_type_lu_trigger" +AFTER INSERT OR DELETE OR UPDATE ON achievement_type_lu +FOR EACH ROW +EXECUTE PROCEDURE notify_trigger_common_oltp('achievement_type_id','achievement_type_desc'); + + + +ALTER TABLE "user" DISABLE TRIGGER pg_user_trigger; +ALTER TABLE email DISABLE TRIGGER pg_email_trigger; +ALTER TABLE security_user DISABLE TRIGGER pg_security_user_trigger; +ALTER TABLE user_sso_login DISABLE TRIGGER pg_user_sso_login_trigger; +ALTER TABLE user_achievement DISABLE TRIGGER pg_user_achievement_trigger; +ALTER TABLE user_group_xref DISABLE TRIGGER pg_user_group_xref_trigger; +ALTER TABLE security_groups DISABLE TRIGGER pg_security_groups_trigger; +ALTER TABLE user_social_login DISABLE TRIGGER pg_user_social_login_trigger; +ALTER TABLE social_login_provider DISABLE TRIGGER pg_social_login_provider_trigger; +ALTER TABLE sso_login_provider DISABLE TRIGGER pg_sso_login_provider_trigger; +ALTER TABLE country DISABLE TRIGGER pg_country_trigger; +ALTER TABLE invalid_handles DISABLE TRIGGER pg_invalid_handles_trigger; +ALTER TABLE achievement_type_lu DISABLE TRIGGER pg_achievement_type_lu_trigger; +ALTER TABLE corana_event DISABLE TRIGGER pg_corana_event_trigger; +ALTER TABLE audit_user DISABLE TRIGGER pg_audit_user_trigger; + +DROP sequence "common_oltp"."sequence_user_group_seq"; +CREATE SEQUENCE sequence_user_group_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 +START WITH 951000000 NO CYCLE; + +DROP sequence "common_oltp"."sequence_user_seq"; +CREATE SEQUENCE sequence_user_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH +488770000 NO CYCLE; + +ALTER SEQUENCE corona_event_corona_event_id_seq RESTART WITH 577770000; + +SET search_path TO informixoltp; + +CREATE OR REPLACE FUNCTION "informixoltp"."notify_trigger_informixoltp" () RETURNS trigger + VOLATILE +AS $body$ +DECLARE + rec RECORD; + payload TEXT; + column_name TEXT; + column_value TEXT; + -- payload_items TEXT[]; + payload_items JSONB; + pguserval TEXT; + uniquecolumn TEXT; + logtime TEXT; + payloadseqid INTEGER; +BEGIN + +pguserval := (SELECT current_user); + if pguserval = 'pgsyncuser' then + RAISE notice 'pgsyncuser name : %', pguserval; + + CASE TG_OP + WHEN 'INSERT', 'UPDATE' THEN + rec := NEW; + WHEN 'DELETE' THEN + rec := OLD; + ELSE + RAISE EXCEPTION 'Unknown TG_OP: "%". Should not occur!', TG_OP; + END CASE; + return rec; + -- else + end if; + + CASE TG_OP + WHEN 'INSERT', 'UPDATE' THEN + rec := NEW; + WHEN 'DELETE' THEN + rec := OLD; + ELSE + RAISE EXCEPTION 'Unknown TG_OP: "%". Should not occur!', TG_OP; + END CASE; + raise notice 'table name : %', TG_TABLE_NAME; + -- RAISE info 'hello world'; + -- Get required fields + FOREACH column_name IN ARRAY TG_ARGV LOOP + EXECUTE format('SELECT $1.%I::TEXT', column_name) + INTO column_value + USING rec; + case + when + column_name = 'upload_document' then + -- RAISE NOTICE 'upload_document boolean'; + if column_value = 'false' then + column_value = '0'; + else + column_value = '1'; + end if; + when + column_name = 'upload_document_required' then + -- RAISE NOTICE 'upload_document_required boolean'; + if column_value = 'false' then + column_value = '0'; + else + column_value = '1'; + end if; + when + column_name = 'identify_email_enabled' then + if column_value = 'false' then + column_value = '0'; + else + column_value = '1'; + end if; + when + column_name = 'identify_handle_enabled' then + if column_value = 'false' then + column_value = '0'; + else + column_value = '1'; + end if; + when + column_name = 'create_date' then + column_value := (select to_char (column_value::timestamp, 'YYYY-MM-DD HH24:MI:SS.MS')); + when + column_name = 'modify_date' then + column_value := (select to_char (column_value::timestamp, 'YYYY-MM-DD HH24:MI:SS.MS')); + when + column_name = 'member_since' then + column_value := (select to_char (column_value::timestamp, 'YYYY-MM-DD HH24:MI:SS.MS')); + else + -- RAISE NOTICE ' not boolean'; + end case; + -- payload_items := array_append(payload_items, '"' || replace(column_name, '"', '\"') || '":"' || replace(column_value, '"', '\"') || '"'); + --payload_items := coalesce(payload_items,'{}')::jsonb || json_build_object(column_name,column_value)::jsonb; + payload_items := coalesce(payload_items,'{}')::jsonb || json_build_object(column_name,replace(column_value,'"','\"'))::jsonb; + END LOOP; + logtime := (SELECT to_char (now()::timestamptz at time zone 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS"Z"')); + payloadseqid := (select nextval('common_oltp.payloadsequence'::regclass)); + + uniquecolumn := (SELECT c.column_name + FROM information_schema.key_column_usage AS c + LEFT JOIN information_schema.table_constraints AS t + ON t.constraint_name = c.constraint_name + WHERE t.table_name = TG_TABLE_NAME AND t.constraint_type = 'PRIMARY KEY' LIMIT 1); + + if (uniquecolumn = '') IS NOT FALSE then + uniquecolumn := 'Not-Available'; + end if; + + -- exclude any null value columns. + payload_items := jsonb_strip_nulls(payload_items); + + -- Build the payload + payload := '' + || '{' + || '"topic":"' || 'dev.db.postgres.sync' || '",' + || '"originator":"' || 'tc-postgres-delta-processor' || '",' + || '"timestamp":"' || logtime || '",' + || '"mime-type":"' || 'application/json' || '",' + || '"payload": {' + || '"payloadseqid":"' || payloadseqid || '",' + || '"Uniquecolumn":"' || uniquecolumn || '",' + || '"operation":"' || TG_OP || '",' + || '"schema":"' || TG_TABLE_SCHEMA || '",' + || '"table":"' || TG_TABLE_NAME || '",' + || '"data": ' || payload_items + || '}}'; + + -- Notify the channel + PERFORM pg_notify('dev_db_notifications', payload); + + RETURN rec; +END; +$body$ LANGUAGE plpgsql + +CREATE OR REPLACE FUNCTION "informixoltp"."proc_coder_update" () RETURNS trigger + VOLATILE +AS $body$ +DECLARE + pguserval TEXT; +begin + if (OLD.quote != NEW.quote) then + insert into audit_coder (column_name, old_value, new_value, user_id) + values ('QUOTE', OLD.quote , NEW.quote, OLD.coder_id); + end if; + + if (OLD.coder_type_id != NEW.coder_type_id) then + insert into audit_coder (column_name, old_value, new_value, user_id) + values ('CODER_TYPE', OLD.coder_type_id , NEW.coder_type_id, OLD.coder_id); + end if; + if (OLD.language_id != NEW.language_id) then + insert into audit_coder (column_name, old_value, new_value, user_id) + values ('LANGUAGE', OLD.language_id , NEW.language_id, OLD.coder_id); + end if; + if (OLD.comp_country_code != NEW.comp_country_code) then + insert into audit_coder (column_name, old_value, new_value, user_id) + values ('COMP_COUNTRY', OLD.comp_country_code , NEW.comp_country_code, OLD.coder_id); + end if; + pguserval := (SELECT current_user); + if pguserval != 'pgsyncuser' then + -- RAISE info 'current_user'; + -- raise notice 'inside current_user : %', current_user; + --update coder set modify_date = current_timestamp where coder_id = OLD.coder_id; + NEW.modify_date = current_timestamp; + end if; + + return NEW; +end ; +$body$ LANGUAGE plpgsql + + +CREATE TRIGGER "pg_coder_referral_trigger" +AFTER INSERT OR DELETE OR UPDATE ON coder_referral +FOR EACH ROW +EXECUTE PROCEDURE notify_trigger_informixoltp('coder_id', 'referral_id','reference_id','other'); + +ALTER TABLE coder DISABLE TRIGGER pg_coder; +ALTER TABLE algo_rating DISABLE TRIGGER pg_algo_rating; +ALTER TABLE coder_referral DISABLE TRIGGER pg_coder_referral_trigger; + +SET search_path TO tcs_catalog; + +CREATE OR REPLACE FUNCTION "tcs_catalog"."notify_trigger" () RETURNS trigger + VOLATILE +AS $body$ +DECLARE + rec RECORD; + payload TEXT; + column_name TEXT; + column_value TEXT; + --payload_items TEXT[]; + payload_items JSONB; + pguserval TEXT; + uniquecolumn TEXT; + logtime TEXT; + payloadseqid INTEGER; +BEGIN + +pguserval := (SELECT current_user); + if pguserval = 'pgsyncuser' then + RAISE notice 'pgsyncuser name : %', pguserval; + + CASE TG_OP + WHEN 'INSERT', 'UPDATE' THEN + rec := NEW; + WHEN 'DELETE' THEN + rec := OLD; + ELSE + RAISE EXCEPTION 'Unknown TG_OP: "%". Should not occur!', TG_OP; + END CASE; + return rec; + -- else + end if; + + -- Set record row depending on operation + CASE TG_OP + WHEN 'INSERT', 'UPDATE' THEN + rec := NEW; + WHEN 'DELETE' THEN + rec := OLD; + ELSE + RAISE EXCEPTION 'Unknown TG_OP: "%". Should not occur!', TG_OP; + END CASE; + -- Get required fields + FOREACH column_name IN ARRAY TG_ARGV LOOP + EXECUTE format('SELECT $1.%I::TEXT', column_name) + INTO column_value + USING rec; + case + when + column_name = 'upload_document' then + if column_value = 'false' then + column_value = '0'; + else + column_value = '1'; + end if; + when + column_name = 'upload_document_required' then + if column_value = 'false' then + column_value = '0'; + else + column_value = '1'; + end if; + else + -- RAISE NOTICE ' not boolean'; + end case; + --payload_items := array_append(payload_items, '"' || replace(column_name, '"', '\"') || '":"' || replace(column_value, '"', '\"') || '"'); + --payload_items := coalesce(payload_items,'{}')::jsonb || json_build_object(column_name,column_value)::jsonb; + payload_items := coalesce(payload_items,'{}')::jsonb || json_build_object(column_name,replace(column_value,'"','\"'))::jsonb; + + END LOOP; + logtime := (SELECT to_char (now()::timestamptz at time zone 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS"Z"')); + payloadseqid := (select nextval('payloadsequence'::regclass)); + + uniquecolumn := (SELECT c.column_name + FROM information_schema.key_column_usage AS c + LEFT JOIN information_schema.table_constraints AS t + ON t.constraint_name = c.constraint_name + WHERE t.table_name = TG_TABLE_NAME AND t.constraint_type = 'PRIMARY KEY' limit 1); + + if (uniquecolumn = '') IS NOT FALSE then + uniquecolumn := 'Not-Available'; + end if; + -- exclude any null value columns. + payload_items := jsonb_strip_nulls(payload_items); + + RAISE Notice ' payload val: "%"', payload; + -- Build the payload + --payload := '' + -- || '{' + -- || '"topic":"' || 'dev.db.postgres.sync' || '",' + -- || '"originator":"' || 'tc-postgres-delta-processor' || '",' + -- || '"timestamp":"' || logtime || '",' + -- || '"mime-type":"' || 'application/json' || '",' + -- || '"payload": {' + -- || '"payloadseqid":"' || payloadseqid || '",' + -- || '"Uniquecolumn":"' || uniquecolumn || '",' + -- || '"operation":"' || TG_OP || '",' + -- || '"schema":"' || TG_TABLE_SCHEMA || '",' + -- || '"table":"' || TG_TABLE_NAME || '",' + -- || '"data": {' || array_to_string(payload_items, ',') || '}' + -- || '}}'; + + payload := '' + || '{' + || '"topic":"' || 'dev.db.postgres.sync' || '",' + || '"originator":"' || 'tc-postgres-delta-processor' || '",' + || '"timestamp":"' || logtime || '",' + || '"mime-type":"' || 'application/json' || '",' + || '"payload": {' + || '"payloadseqid":"' || payloadseqid || '",' + || '"Uniquecolumn":"' || uniquecolumn || '",' + || '"operation":"' || TG_OP || '",' + || '"schema":"' || TG_TABLE_SCHEMA || '",' + || '"table":"' || TG_TABLE_NAME || '",' + || '"data":' || payload_items + || '}}'; + + -- Notify the channel + PERFORM pg_notify('dev_db_notifications', payload); + + RETURN rec; +END; +$body$ LANGUAGE plpgsql; + + +GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA common_oltp,informixoltp,tcs_catalog TO pgsyncuser; +GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA common_oltp,informixoltp,tcs_catalog TO pgsyncuser; + +GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA common_oltp,informixoltp,tcs_catalog TO coder; +GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA common_oltp,informixoltp,tcs_catalog TO coder; + +GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA common_oltp,informixoltp,tcs_catalog TO postgres; +GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA common_oltp,informixoltp,tcs_catalog TO postgres; + +grant USAGE ON SCHEMA common_oltp,informixoltp,tcs_catalog To pgsyncuser; +grant USAGE ON SCHEMA common_oltp,informixoltp,tcs_catalog To coder; +grant USAGE ON SCHEMA common_oltp,informixoltp,tcs_catalog To postgres; diff --git a/src/consumer.js b/src/consumer.js index 9531776..fd74484 100644 --- a/src/consumer.js +++ b/src/consumer.js @@ -10,16 +10,17 @@ const healthcheck = require('topcoder-healthcheck-dropin'); const auditTrail = require('./services/auditTrail'); const kafkaOptions = config.get('KAFKA') const postMessage = require('./services/posttoslack') -const isSslEnabled = kafkaOptions.SSL && kafkaOptions.SSL.cert && kafkaOptions.SSL.key -const consumer = new Kafka.SimpleConsumer({ - connectionString: kafkaOptions.brokers_url, - ...(isSslEnabled && { // Include ssl options if present - ssl: { - cert: kafkaOptions.SSL.cert, - key: kafkaOptions.SSL.key - } - }) -}) +//const isSslEnabled = kafkaOptions.SSL && kafkaOptions.SSL.cert && kafkaOptions.SSL.key + +const options = { + groupId: kafkaOptions.KAFKA_GROUP_ID, + connectionString: kafkaOptions.KAFKA_URL, + ssl: { + cert: kafkaOptions.KAFKA_CLIENT_CERT, + key: kafkaOptions.KAFKA_CLIENT_CERT_KEY + } +}; +const consumer = new Kafka.GroupConsumer(options); const check = function () { if (!consumer.client.initialBrokers && !consumer.client.initialBrokers.length) { @@ -35,100 +36,114 @@ const check = function () { let cs_processId; const terminate = () => process.exit() + /** * * @param {Array} messageSet List of messages from kafka * @param {String} topic The name of the message topic * @param {Number} partition The kafka partition to which messages are written */ -var retryvar=""; +var retryvar = ""; //let cs_payloadseqid; async function dataHandler(messageSet, topic, partition) { -let cs_payloadseqid - for (const m of messageSet) { // Process messages sequentially + for (const m of messageSet) { // Process messages sequentially let message + let ifxstatus = 0 try { + // let ifxstatus = 0 + let cs_payloadseqid; message = JSON.parse(m.message.value) - logger.debug('Received message from kafka:') + //logger.debug(`Consumer Received from kafka :${JSON.stringify(message)}`) if (message.payload.payloadseqid) cs_payloadseqid = message.payload.payloadseqid; logger.debug(`consumer : ${message.payload.payloadseqid} ${message.payload.table} ${message.payload.Uniquecolumn} ${message.payload.operation} ${message.timestamp} `); - await updateInformix(message) - await consumer.commitOffset({ topic, partition, offset: m.offset }) // Commit offset only on success - if (message.payload['retryCount']) retryvar = message.payload.retryCount; - auditTrail([cs_payloadseqid,cs_processId,message.payload.table,message.payload.Uniquecolumn, - message.payload.operation,"Informix-updated",retryvar,"","",JSON.stringify(message), message.timestamp,message.topic],'consumer') + //await updateInformix(message) + ifxstatus = await updateInformix(message) + // if (ifxstatus === 0 && `${message.payload.operation}` === 'INSERT') { + // logger.debug(`operation : ${message.payload.operation}`) + // logger.debug(`Consumer :informixt status for ${message.payload.table} ${message.payload.payloadseqid} : ${ifxstatus} - Retrying`) + // auditTrail([cs_payloadseqid, cs_processId, message.payload.table, message.payload.Uniquecolumn, + // message.payload.operation, "push-to-kafka", retryvar, "", "", JSON.stringify(message), new Date(), message.topic], 'consumer') + // await retrypushtokakfa(message, topic, m, partition) + //} else { + logger.debug(`Consumer :informix status for ${message.payload.table} ${message.payload.payloadseqid} : ${ifxstatus}`) + if (message.payload['retryCount']) retryvar = message.payload.retryCount; + await auditTrail([cs_payloadseqid, cs_processId, message.payload.table, message.payload.Uniquecolumn, + message.payload.operation, "Informix-updated", retryvar, "", "", JSON.stringify(message), new Date(), message.topic], 'consumer') + //} } catch (err) { - const errmsg2 = `error-sync: Could not process kafka message or informix DB error: "${err.message}"` + logger.debug(`Consumer:ifx return status error for ${message.payload.table} ${message.payload.payloadseqid} : ${ifxstatus}`) + const errmsg2 = `error-sync: Could not process kafka message or informix DB error: "${err.message}"` logger.error(errmsg2) - logger.debug(`error-sync: consumer "${err.message}"`) - if (!cs_payloadseqid){ - cs_payloadseqid= 'err-'+(new Date()).getTime().toString(36) + Math.random().toString(36).slice(2);} -/* await auditTrail([cs_payloadseqid,3333,'message.payload.table','message.payload.Uniquecolumn', - 'message.payload.operation',"Error-Consumer","",err.message,"",'message.payload.data',new Date(),'message.topic'],'consumer') - }else{ - auditTrail([cs_payloadseqid,4444,message.payload.table,message.payload.Uniquecolumn, - message.payload.operation,"Informix-updated",retryvar,"consumer2","",JSON.stringify(message), message.timestamp,message.topic],'consumer') - }*/ - - try { - if (message.payload['retryCount']) retryvar = message.payload.retryCount; - await consumer.commitOffset({ topic, partition, offset: m.offset }) // Commit success as will re-publish - logger.debug(`Trying to push same message after adding retryCounter`) - if (!message.payload.retryCount) { - message.payload.retryCount = 0 - logger.debug('setting retry counter to 0 and max try count is : ', config.KAFKA.maxRetry); - } - if (message.payload.retryCount >= config.KAFKA.maxRetry) { - logger.debug('Recached at max retry counter, sending it to error queue: ', config.KAFKA.errorTopic); - logger.debug(`error-sync: consumer max-retry-limit reached`) - // push to slack - alertIt("slack message" - await callposttoslack(`error-sync: postgres-ifx-processor : consumer max-retry-limit reached: "${message.payload.table}": payloadseqid : "${cs_payloadseqid}"`) - let notifiyMessage = Object.assign({}, message, { topic: config.KAFKA.errorTopic }) - notifiyMessage.payload['recipients'] = config.KAFKA.recipients - logger.debug('pushing following message on kafka error alert queue:') - //logger.debug(notifiyMessage) - await pushToKafka(notifiyMessage) - return - } - message.payload['retryCount'] = message.payload.retryCount + 1; - await pushToKafka(message) - var errmsg9 = `error-sync: Retry for Kafka push : retrycount : "${message.payload.retryCount}" : "${cs_payloadseqid}"` - logger.debug(errmsg9) - //await callposttoslack(errmsg9) - } catch (err) { - - await auditTrail([cs_payloadseqid,cs_processId,message.payload.table,message.payload.Uniquecolumn, - message.payload.operation,"Error-republishing",message.payload['retryCount'],err.message,"",message.payload.data, message.timestamp,message.topic],'consumer') - const errmsg1 = `error-sync: postgres-ifx-processor: consumer : Error-republishing: "${err.message}"` - logger.error(errmsg1) - logger.debug(`error-sync: consumer re-publishing "${err.message}"`) - // push to slack - alertIt("slack message" - await callposttoslack(errmsg1) - } + logger.debug(`error-sync: consumer "${err.message}"`) + await retrypushtokakfa(message, topic, m, partition) + } finally { + await consumer.commitOffset({ topic, partition, offset: m.offset }) // Commit offset only on success } } } +async function retrypushtokakfa(message, topic, m, partition) { + let cs_payloadseqid + if (message.payload.payloadseqid) cs_payloadseqid = message.payload.payloadseqid; + logger.debug(`Consumer : At retry function`) + if (!cs_payloadseqid) { + cs_payloadseqid = 'err-' + (new Date()).getTime().toString(36) + Math.random().toString(36).slice(2); + } + try { + if (message.payload['retryCount']) retryvar = message.payload.retryCount; + logger.debug(`Trying to push same message after adding retryCounter`) + if (!message.payload.retryCount) { + message.payload.retryCount = 0 + logger.debug('setting retry counter to 0 and max try count is : ', config.KAFKA.maxRetry); + } + if (message.payload.retryCount >= config.KAFKA.maxRetry) { + logger.debug('Reached at max retry counter, sending it to error queue: ', config.KAFKA.errorTopic); + logger.debug(`error-sync: consumer max-retry-limit reached`) + //await callposttoslack(`error-sync: postgres-ifx-processor : consumer max-retry-limit reached: "${message.payload.table}": payloadseqid : "${cs_payloadseqid}"`) + let notifiyMessage = Object.assign({}, message, { topic: config.KAFKA.errorTopic }) + notifiyMessage.payload['recipients'] = config.KAFKA.recipients + logger.debug('pushing following message on kafka error alert queue:') + //retry push to error topic kafka again + await pushToKafka(notifiyMessage) + return + } + message.payload['retryCount'] = message.payload.retryCount + 1; + await pushToKafka(message) + var errmsg9 = `consumer : Retry for Kafka push : retrycount : "${message.payload.retryCount}" : "${cs_payloadseqid}"` + logger.debug(errmsg9) + } + catch (err) { + await auditTrail([cs_payloadseqid, cs_processId, message.payload.table, message.payload.Uniquecolumn, + message.payload.operation, "Error-republishing", message.payload['retryCount'], err.message, "", message.payload.data, new Date(), message.topic], 'consumer') + const errmsg1 = `error-sync: postgres-ifx-processor: consumer : Error-republishing: "${err.message}"` + logger.error(errmsg1) + logger.debug(`error-sync: consumer re-publishing "${err.message}"`) + // await callposttoslack(errmsg1) + } finally { + await consumer.commitOffset({ topic, partition, offset: m.offset }) // Commit success as will re-publish + } +} + async function callposttoslack(slackmessage) { -if(config.SLACK.SLACKNOTIFY === 'true') { + if (config.SLACK.SLACKNOTIFY === 'true') { return new Promise(function (resolve, reject) { - postMessage(slackmessage, (response) => { - console.log(`respnse : ${response}`) - if (response.statusCode < 400) { - logger.debug('Message posted successfully'); - //callback(null); - } else if (response.statusCode < 500) { - const errmsg1 =`Slack Error: posting message to Slack API: ${response.statusCode} - ${response.statusMessage}` - logger.debug(`error-sync: ${errmsg1}`) - } - else { - logger.debug(`Server error when processing message: ${response.statusCode} - ${response.statusMessage}`); - //callback(`Server error when processing message: ${response.statusCode} - ${response.statusMessage}`); - } - resolve("done") - }); + postMessage(slackmessage, (response) => { + console.log(`respnse : ${response}`) + if (response.statusCode < 400) { + logger.debug('Message posted successfully'); + //callback(null); + } else if (response.statusCode < 500) { + const errmsg1 = `Slack Error: posting message to Slack API: ${response.statusCode} - ${response.statusMessage}` + logger.debug(`error-sync: ${errmsg1}`) + } + else { + logger.debug(`Server error when processing message: ${response.statusCode} - ${response.statusMessage}`); + //callback(`Server error when processing message: ${response.statusCode} - ${response.statusMessage}`); + } + resolve("done") + }); }) //end -} + } } @@ -137,16 +152,17 @@ if(config.SLACK.SLACKNOTIFY === 'true') { */ async function setupKafkaConsumer() { try { - await consumer.init() - //await consumer.subscribe(kafkaOptions.topic, kafkaOptions.partition, { time: Kafka.LATEST_OFFSET }, dataHandler) - await consumer.subscribe(kafkaOptions.topic, dataHandler) - + const strategies = [{ + subscriptions: [kafkaOptions.topic], + handler: dataHandler + }]; + await consumer.init(strategies) logger.info('Initialized kafka consumer') healthcheck.init([check]) } catch (err) { logger.error('Could not setup kafka consumer') logger.logFullError(err) - logger.debug(`error-sync: consumer kafka-setup "${err.message}"`) + logger.debug(`error-sync: consumer kafka-setup "${err.message}"`) terminate() } } diff --git a/src/producer.js b/src/producer.js index 75e11a5..e931264 100644 --- a/src/producer.js +++ b/src/producer.js @@ -20,19 +20,15 @@ async function setupPgClient() { var payloadcopy try { await pgClient.connect() - for (const triggerFunction of pgOptions.triggerFunctions) { + //for (const triggerFunction of pgOptions.triggerFunctions) { + for (const triggerFunction of pgOptions.triggerFunctions.split(',')) { await pgClient.query(`LISTEN ${triggerFunction}`) } pgClient.on('notification', async (message) => { try { payloadcopy = "" - logger.debug('Entering producer 1') - logger.debug(message.toString()) - logger.debug('Entering producer 2') + logger.debug('Entering producer 2') logger.debug(message) - logger.debug('Entering producer 3') - logger.debug(JSON.stringify(message.payload)) - const payload = JSON.parse(message.payload) payloadcopy = message @@ -42,11 +38,12 @@ try { //logger.info('trying to push on kafka topic') await pushToKafka(payload) logger.info('Push to kafka and added for audit trail') + audit(message) } else { logger.info('Push to dynamodb for reconciliation') await pushToDynamoDb(payload) } - audit(message) + } else { logger.debug('Ignoring message with incorrect topic or originator') // push to slack - alertIt("slack message") @@ -121,7 +118,7 @@ async function audit(message) { } else { logger.debug(`Producer DynamoDb : ${logMessage}`); } - auditTrail([pl_seqid, pl_processid, pl_table, pl_uniquecolumn, pl_operation, "push-to-kafka", "", "", "", JSON.stringify(message), pl_timestamp, pl_topic], 'producer') + await auditTrail([pl_seqid, pl_processid, pl_table, pl_uniquecolumn, pl_operation, "push-to-kafka", "", "", "", JSON.stringify(message), pl_timestamp, pl_topic], 'producer') } else { const pl_randonseq = 'err-' + (new Date()).getTime().toString(36) + Math.random().toString(36).slice(2) if (!isFailover) { diff --git a/src/reconsiler-audit.js b/src/reconsiler-audit.js index 4491fd1..a77e3d2 100644 --- a/src/reconsiler-audit.js +++ b/src/reconsiler-audit.js @@ -24,7 +24,7 @@ async function setupPgClient() { rec_d_type = config.RECONSILER.RECONSILER_DURATION_TYPE var paramvalues = ['push-to-kafka',rec_d_start,rec_d_end]; sql1 = "select pgifx_sync_audit.seq_id, pgifx_sync_audit.payloadseqid,pgifx_sync_audit.auditdatetime ,pgifx_sync_audit.syncstatus, pgifx_sync_audit.payload from common_oltp.pgifx_sync_audit where pgifx_sync_audit.syncstatus =($1)" - sql2 = " and pgifx_sync_audit.producer_err <> 'Reconsiler1' and pgifx_sync_audit.auditdatetime between (timezone('utc',now())) - interval '1"+ rec_d_type + "' * ($2)" + sql2 = " and pgifx_sync_audit.tablename != 'sync_test_id' and pgifx_sync_audit.producer_err <> 'Reconsiler1' and pgifx_sync_audit.auditdatetime between (timezone('utc',now())) - interval '1"+ rec_d_type + "' * ($2)" sql3 = " and (timezone('utc',now())) - interval '1"+ rec_d_type + "' * ($3)" sql = sql1 + sql2 + sql3 await pgClient.query(sql,paramvalues, async (err,result) => { diff --git a/src/services/auditTrail.js b/src/services/auditTrail.js index 7e0d645..ed78523 100644 --- a/src/services/auditTrail.js +++ b/src/services/auditTrail.js @@ -1,12 +1,12 @@ const config = require('config') -const pg = require('pg') +const pgpool = require('./db.js'); const logger = require('../common/logger') - -const pgOptions = config.get('POSTGRES') -const pgConnectionString = `postgresql://${pgOptions.user}:${pgOptions.password}@${pgOptions.host}:${pgOptions.port}/${pgOptions.database}` -let pgClient2 +//const pg = require('pg') +//const pgOptions = config.get('POSTGRES') +//const pgConnectionString = `postgresql://${pgOptions.user}:${pgOptions.password}@${pgOptions.host}:${pgOptions.port}/${pgOptions.database}` +//let pgClient2 //console.log(`"${pgConnectionString}"`); -async function setupPgClient2 () { +/*async function setupPgClient2 () { pgClient2 = new pg.Client(pgConnectionString) try { await pgClient2.connect() @@ -17,12 +17,12 @@ async function setupPgClient2 () { logger.logFullError(err) process.exit() } -} +}*/ async function auditTrail (data,sourcetype) { -if (!pgClient2) { +/*if (!pgClient2) { await setupPgClient2() -} +}*/ if (sourcetype === 'producer'){ sql0 = 'INSERT INTO common_oltp.pgifx_sync_audit(payloadseqid,processId,tablename,uniquecolumn,dboperation,syncstatus,retrycount,consumer_err,producer_err,payload,auditdatetime,topicname) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12)' sql1= ' on conflict (payloadseqid) DO UPDATE SET (syncstatus,producer_err) = ($6,$9) where pgifx_sync_audit.payloadseqid = $1'; @@ -31,13 +31,9 @@ if (sourcetype === 'producer'){ } else { sql0 = 'INSERT INTO common_oltp.pgifx_sync_audit(payloadseqid,processId,tablename,uniquecolumn,dboperation,syncstatus,retrycount,consumer_err,producer_err,payload,auditdatetime,topicname) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12)' sql1= ' on conflict (payloadseqid) DO UPDATE SET (syncstatus,consumer_err,retrycount) = ($6,$8,$7)'; - // where pgifx_sync_audit.payloadseqid = $1'; - //and pgifx_sync_audit.processId = $2'; sql = sql0 + sql1 - logger.debug(`--${1} ${3} 1 Audit Trail update consumer--`) - //logger.debug(`sql values "${sql}"`); } - return pgClient2.query(sql, data, (err, res) => { + /*return pgClient2.query(sql, data, (err, res) => { if (err) { logger.debug(`-- Audit Trail update error-- ${err.stack}`) //pgClient2.end() @@ -45,7 +41,27 @@ if (sourcetype === 'producer'){ // logger.debug(`--Audit Trail update success-- `) } }) -pgClient2.end() +pgClient2.end() */ + +pgpool.on('error', (err, client) => { + logger.debug(`Unexpected error on idle client : ${err}`) + process.exit(-1) + }) + +await pgpool.connect(async (err, client, release) => { + if (err) { + return logger.debug(`Error acquiring client : ${err.stack}`) + } + await client.query(sql, data, (err, res) => { + release() + if (err) { + return logger.debug(`Error executing Query : ${err.stack}`) + } + logger.debug(`Audit Trail update : ${res.rowCount}`) + }) + }) + + } diff --git a/src/services/db.js b/src/services/db.js new file mode 100644 index 0000000..40f7dec --- /dev/null +++ b/src/services/db.js @@ -0,0 +1,5 @@ +const config = require('config') +const pg = require('pg') + const pgOptions = config.get('POSTGRES') + var pool = new pg.Pool(pgOptions); +module.exports = pool; diff --git a/src/services/updateInformix.js b/src/services/updateInformix.js index 70c868d..8a798a8 100644 --- a/src/services/updateInformix.js +++ b/src/services/updateInformix.js @@ -2,42 +2,56 @@ const informix = require('../common/informixWrapper') const logger = require('../common/logger') -/** - * Updates informix database with insert/update/delete operation - * @param {Object} payload The DML trigger data - */ -async function updateInformix (payload) { - logger.debug('=====Starting to update informix with data:====') - //const operation = payload.operation.toLowerCase() - const operation = payload.payload.operation.toLowerCase() - console.log("level 1 informix ",operation) - let sql = null +String.prototype.escapeSpecialChars = function () { + return this.replace(/\n/g, "\\n"); +}; - const columns = payload.payload.data - const primaryKey = payload.payload.Uniquecolumn +async function updateInformix(payload) { + logger.debug(`Informix Received from consumer-kafka :${JSON.stringify(payload)}`) + const operation = payload.payload.operation.toLowerCase() + console.log("=====Informix DML Operation :==========", operation) + let sql = null + let t0 = [] + const columns = payload.payload.data + const primaryKey = payload.payload.Uniquecolumn // Build SQL query switch (operation) { case 'insert': { const columnNames = Object.keys(columns) - sql = `insert into ${payload.payload.schema}:${payload.payload.table} (${columnNames.join(', ')}) values (${columnNames.map((k) => `'${columns[k]}'`).join(', ')});` // "insert into : (col_1, col_2, ...) values (val_1, val_2, ...)" + sql = `insert into ${payload.payload.schema}:${payload.payload.table} (${columnNames.join(', ')}) values (${columnNames.map((k) => `?`).join(', ')});` + t0 = Object.keys(columns).map((key) => `{"value":"${columns[key]}"}`) } break case 'update': { - sql = `update ${payload.payload.schema}:${payload.payload.table} set ${Object.keys(columns).map((key) => `${key}='${columns[key]}'`).join(', ')} where ${primaryKey}=${columns[primaryKey]};` // "update :
set col_1=val_1, col_2=val_2, ... where primary_key_col=primary_key_val" + sql = `update ${payload.payload.schema}:${payload.payload.table} set ${Object.keys(columns).map((key) => `${key}= ?`).join(', ')} where ${primaryKey}= ?;` + t0 = Object.keys(columns).map((key) => `{"value":"${columns[key]}"}`) + t0.push(`{"value":"${columns[primaryKey]}"}`) //param value for appended for where clause } break case 'delete': { - sql = `delete from ${payload.payload.schema}:${payload.payload.table} where ${primaryKey}=${columns[primaryKey]};` // ""delete from :
where primary_key_col=primary_key_val" + sql = `delete from ${payload.payload.schema}:${payload.payload.table} where ${primaryKey}= ?;` + t0.push(`{"value":"${columns[primaryKey]}"}`) } break default: throw new Error(`Operation ${operation} is not supported`) } - const result = await informix.executeQuery(payload.payload.schema, sql, null) + //Preparedstatement for informix + t0.forEach((name, index) => t0[index] = `${name.escapeSpecialChars()}`); + //logger.debug(`Param values : ${t0}`); + let temp1 = "[" + `${t0}` + "]" + let finalparam = JSON.parse(temp1) + + /*console.log(`Typeof finalparam : ${typeof(finalparam)}`) + if (finalparam.constructor === Array ) console.log('isarray') + else console.log('finalparam not an array')*/ + logger.debug(`Final sql and param values are -- ${sql} ${JSON.stringify(finalparam)}`); + const result = await informix.executeQuery(payload.payload.schema, sql, finalparam) + logger.debug(`ifx execute query result : ${result}`) return result }