diff --git a/.circleci/config.yml b/.circleci/config.yml index f856f53..30adcee 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -6,10 +6,11 @@ defaults: &defaults install_dependency: &install_dependency name: Installation of build and deployment dependencies. command: | + sudo apt-get update sudo apt install jq sudo pip install awscli --upgrade sudo pip install docker-compose - sudo apt-get install default-jdk + sudo apt-get install default-jdk --fix-missing install_deploysuite: &install_deploysuite name: Installation of install_deploysuite. #Git Clone -change back to v1.3 or latest once counter var is generalized. @@ -39,31 +40,51 @@ build_steps: &build_steps - deploy: name: Running MasterScript. command: | + ./awsconfiguration.sh ${DEPLOY_ENV} source awsenvconf + #scorecard test consumer remove later + #./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-consumer_scorecard-deployvar + #source buildenvvar + #./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer + + #scorecard test producer remove later + #echo "Running Masterscript - deploy postgres-ifx-processer producer" + #if [ -e ${LOGICAL_ENV}-${APP_NAME}-consumer-deployvar.json ]; then sudo rm -vf ${LOGICAL_ENV}-${APP_NAME}-consumer-deployvar.json; fi + #./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-producer_scorecard-deployvar + #source buildenvvar + #./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer # ./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-consumer-deployvar #source buildenvvar #./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer + echo "Running Masterscript - deploy postgres-ifx-processer producer" - #if [ -e ${LOGICAL_ENV}-${APP_NAME}-consumer-deployvar.json ]; then sudo rm -vf ${LOGICAL_ENV}-${APP_NAME}-consumer-deployvar.json; fi + if [ -e ${LOGICAL_ENV}-${APP_NAME}-consumer-deployvar.json ]; then sudo rm -vf ${LOGICAL_ENV}-${APP_NAME}-consumer-deployvar.json; fi ./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-producer-deployvar source buildenvvar ./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer - #echo "Running Masterscript - deploy postgres-ifx-processer producer_dd" - #if [ -e ${LOGICAL_ENV}-${APP_NAME}-producer-deployvar.json ]; then sudo rm -vf ${LOGICAL_ENV}-${APP_NAME}-producer-deployvar.json; fi - #./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-producer_dd-deployvar - #source buildenvvar - #./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer + echo "Running Masterscript - deploy postgres-ifx-processer producer_dd" + if [ -e ${LOGICAL_ENV}-${APP_NAME}-producer-deployvar.json ]; then sudo rm -vf ${LOGICAL_ENV}-${APP_NAME}-producer-deployvar.json; fi + ./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-producer_dd-deployvar + source buildenvvar + ./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer + #echo "Running Masterscript - deploy postgres-ifx-processer reconsiler1" #if [ -e ${LOGICAL_ENV}-${APP_NAME}-reconsiler1-deployvar.json ]; then sudo rm -vf 
${LOGICAL_ENV}-${APP_NAME}-reconsiler1-deployvar.json; fi #./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-reconsiler1-deployvar #source buildenvvar #./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer - + + #echo "Running Masterscript - deploy postgres-ifx-processer reconsiler2" + #if [ -e ${LOGICAL_ENV}-${APP_NAME}-reconsiler2-deployvar.json ]; then sudo rm -vf ${LOGICAL_ENV}-${APP_NAME}-reconsiler2-deployvar.json; fi + #./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-reconsiler2-deployvar + #source buildenvvar + #./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer + jobs: # Build & Deploy against development backend # "build-dev": @@ -103,13 +124,12 @@ workflows: branches: only: - dev - - dev-retryfeature - "build-test": context : org-global filters: branches: - only: - - dev-test-pg + only: + - dev-test-pg + - dev-test-pg-rf - "build-prod": context : org-global diff --git a/config/default.js b/config/default.js index dbca65a..5f835b3 100644 --- a/config/default.js +++ b/config/default.js @@ -22,8 +22,9 @@ module.exports = { database: process.env.PG_DATABASE || 'postgres', // database must exist before running the tool password: process.env.PG_PASSWORD || 'password', port: parseInt(process.env.PG_PORT, 10) || 5432, - triggerFunctions: process.env.TRIGGER_FUNCTIONS || ['prod_db_notifications'], // List of trigger functions to listen to - triggerTopics: process.env.TRIGGER_TOPICS || ['prod.db.postgres.sync'], // Names of the topic in the trigger payload + + triggerFunctions: process.env.TRIGGER_FUNCTIONS || 'dev_db_notifications', // Comma-separated list of trigger functions to listen to + triggerTopics: process.env.TRIGGER_TOPICS || ['dev.db.postgres.sync'], // Names of the topic in the trigger payload triggerOriginators: process.env.TRIGGER_ORIGINATORS || ['tc-postgres-delta-processor'] // Names of the originator in the trigger payload }, KAFKA: { // Kafka connection options @@ -34,32 +35,36 @@ module.exports = { }, topic: process.env.KAFKA_TOPIC || 'db.topic.sync', // Kafka topic to push and receive messages partition: process.env.partition || [0], // Kafka partitions to use - maxRetry: process.env.MAX_RETRY || 3, + maxRetry: process.env.MAX_RETRY || 10, errorTopic: process.env.ERROR_TOPIC || 'db.scorecardtable.error', - recipients: ['admin@abc.com'] // Kafka partitions to use + recipients: ['admin@abc.com'], // Error alert recipients + KAFKA_URL: process.env.KAFKA_URL, + KAFKA_GROUP_ID: process.env.KAFKA_GROUP_ID || 'dev-postgres-ifx-consumer', + KAFKA_CLIENT_CERT: process.env.KAFKA_CLIENT_CERT ? process.env.KAFKA_CLIENT_CERT.replace('\\n', '\n') : null, + KAFKA_CLIENT_CERT_KEY: process.env.KAFKA_CLIENT_CERT_KEY ?
process.env.KAFKA_CLIENT_CERT_KEY.replace('\\n', '\n') : null, }, SLACK: { URL: process.env.SLACKURL || 'us-east-1', SLACKCHANNEL: process.env.SLACKCHANNEL || 'ifxpg-migrator', - SLACKNOTIFY: process.env.SLACKNOTIFY || 'false' + SLACKNOTIFY: process.env.SLACKNOTIFY || 'false' }, - RECONSILER:{ + RECONSILER: { RECONSILER_START: process.env.RECONSILER_START || 5, RECONSILER_END: process.env.RECONSILER_END || 1, RECONSILER_DURATION_TYPE: process.env.RECONSILER_DURATION_TYPE || 'm' }, DYNAMODB: - { - DYNAMODB_TABLE: process.env.DYNAMODB_TABLE || 'dev_pg_ifx_payload_sync', - DD_ElapsedTime: process.env.DD_ElapsedTime || 600000 - }, + { + DYNAMODB_TABLE: process.env.DYNAMODB_TABLE || 'dev_pg_ifx_payload_sync', + DD_ElapsedTime: process.env.DD_ElapsedTime || 600000 + }, - AUTH0_URL: process.env.AUTH0_URL , - AUTH0_AUDIENCE: process.env.AUTH0_AUDIENCE , - TOKEN_CACHE_TIME: process.env.TOKEN_CACHE_TIME , - AUTH0_CLIENT_ID: process.env.AUTH0_CLIENT_ID , - AUTH0_CLIENT_SECRET: process.env.AUTH0_CLIENT_SECRET , - BUSAPI_URL : process.env.BUSAPI_URL , - KAFKA_ERROR_TOPIC : process.env.KAFKA_ERROR_TOPIC , - AUTH0_PROXY_SERVER_URL: process.env.AUTH0_PROXY_SERVER_URL + AUTH0_URL: process.env.AUTH0_URL, + AUTH0_AUDIENCE: process.env.AUTH0_AUDIENCE, + TOKEN_CACHE_TIME: process.env.TOKEN_CACHE_TIME, + AUTH0_CLIENT_ID: process.env.AUTH0_CLIENT_ID, + AUTH0_CLIENT_SECRET: process.env.AUTH0_CLIENT_SECRET, + BUSAPI_URL: process.env.BUSAPI_URL, + KAFKA_ERROR_TOPIC: process.env.KAFKA_ERROR_TOPIC, + AUTH0_PROXY_SERVER_URL: process.env.AUTH0_PROXY_SERVER_URL } diff --git a/informix-identity-trigger-proc.sql b/informix-identity-trigger-proc.sql index 3dd4897..ddcd3d9 100644 --- a/informix-identity-trigger-proc.sql +++ b/informix-identity-trigger-proc.sql @@ -1,6 +1,8 @@ + database common_oltp; DROP PROCEDURE proc_user_update(varchar,decimal); +DROP PROCEDURE proc_user_update; CREATE PROCEDURE informix.proc_user_update( new_handle varchar(50), user_id decimal(10,0)) @@ -8,8 +10,7 @@ user_id decimal(10,0)) UPDATE user SET handle_lower = lower(new_handle), modify_date = current WHERE user.user_id = user_id; End if; end procedure; - -DROP PROCEDURE proc_user_update; + create procedure "informix".proc_user_update( user_id DECIMAL(10,0), old_first_name VARCHAR(64), @@ -27,8 +28,9 @@ new_middle_name VARCHAR(64), old_timezone_id decimal(5,0), new_timezone_id decimal(5,0) ) - if (USER != 'ifxsyncuser') then - + + if (USER != 'ifxsyncuser') then + if ((old_first_name != new_first_name) or (old_last_name != new_last_name ) or (old_middle_name != new_middle_name )) then insert into audit_user (column_name, old_value, new_value, user_id) @@ -60,7 +62,6 @@ user_id) user_id) values ('TIMEZONE_ID', old_timezone_id, new_timezone_id, user_id); End If; - UPDATE user SET handle_lower = lower(new_handle), modify_date = current WHERE user.user_id = user_id; End if; end procedure; @@ -79,7 +80,7 @@ old_status_id DECIMAL(3,0), new_status_id DECIMAL(3,0) ) - if (USER != 'ifxsyncuser') then + if (USER != 'ifxsyncuser') then if (old_email_type_id != new_email_type_id) then insert into audit_user (column_name, old_value, new_value, user_id) @@ -100,7 +101,7 @@ new_status_id DECIMAL(3,0) insert into audit_user (column_name, old_value, new_value, user_id) values ('EMAIL_PRIMARY_IND', old_primary_ind, new_primary_ind, user_id); End If; - + update email set modify_date = current where email.email_id = email_id; End if; end procedure; @@ -206,7 +207,119 @@ CREATE TRIGGER informix.ifxpgsync_user_group_xref_update update on "informix".us ( execute 
procedure "informix".do_auditing2(USER )); +DROP PROCEDURE informix.proc_user_last_login; +CREATE PROCEDURE informix.proc_user_last_login (user_id DECIMAL(10,0), o_last_login DATETIME YEAR TO FRACTION, +n_last_login DATETIME YEAR TO FRACTION) + + if (o_last_login != n_last_login) then + if (USER != 'ifxsyncuser') then + insert into corona_event (corona_event_type_id,user_id, corona_event_timestamp) values (1, user_id, n_last_login); + end if; + End if; +end procedure; + +DROP PROCEDURE informix.proc_phone_update; +CREATE PROCEDURE informix.proc_phone_update( +phone_id decimal(10,0), +user_id DECIMAL(10,0), +old_phone_type_id DECIMAL(5,0), +new_phone_type_id DECIMAL(5,0), +old_number VARCHAR(64), +new_number VARCHAR(64), +old_primary_ind DECIMAL(1,0), +new_primary_ind DECIMAL(1,0) +) + if (USER != 'ifxsyncuser') then + if (old_phone_type_id != new_phone_type_id) then + insert into audit_user (column_name, old_value, new_value, user_id) + values ('PHONE_TYPE', old_phone_type_id, new_phone_type_id, user_id); + End If; + + if (old_number != new_number) then + insert into audit_user (column_name, old_value, new_value, user_id) + values ('PHONE_NUMBER', old_number, new_number, user_id); + End If; + + if (old_primary_ind != new_primary_ind) then + insert into audit_user (column_name, old_value, new_value, user_id) + values ('PHONE_PRIMARY_IND', old_primary_ind, new_primary_ind, user_id); + End If; +update phone set modify_date = current where phone.phone_id = phone_id; +End if; +end procedure; + +DROP PROCEDURE informix.proc_address_update; +CREATE PROCEDURE informix.proc_address_update( + address_id DECIMAL(10,0), + old_address_type_id DECIMAL(5,0), + new_address_type_id DECIMAL(5,0), + old_address1 VARCHAR(254), + new_address1 VARCHAR(254), + old_address2 VARCHAR(254), + new_address2 VARCHAR(254), + old_address3 VARCHAR(254), + new_address3 VARCHAR(254), + old_city VARCHAR(64), + new_city VARCHAR(64), + old_state_code CHAR(2), + new_state_code CHAR(2), + old_province VARCHAR(64), + new_province VARCHAR(64), + old_zip VARCHAR(15), + new_zip VARCHAR(15), + old_country_code CHAR(3), + new_country_code CHAR(3) +) + define user_id DECIMAL(10,0); + let user_id = NVL((select min(x.user_id) from user_address_xref x where x.address_id = address_id), -1); + if (USER != 'ifxsyncuser') then + if (user_id > 0 and old_address1 != new_address1) then + insert into audit_user (column_name, old_value, new_value, +user_id) + values ('ADDRESS1', old_address1, new_address1, user_id); + End If; + if (user_id > 0 and old_address2 != new_address2) then + insert into audit_user (column_name, old_value, new_value, +user_id) + values ('ADDRESS2', old_address2, new_address2, user_id); + End If; + if (user_id > 0 and old_address3 != new_address3) then + insert into audit_user (column_name, old_value, new_value, +user_id) + values ('ADDRESS3', old_address3, new_address3, user_id); + End If; + if (user_id > 0 and old_city != new_city) then + insert into audit_user (column_name, old_value, new_value, +user_id) + values ('ADDRESS_CITY', old_city, new_city, user_id); + End If; + if (user_id > 0 and old_state_code != new_state_code) then + insert into audit_user (column_name, old_value, new_value, +user_id) + values ('ADDRESS_STATE', old_state_code, new_state_code, user_id); + End If; + if (user_id > 0 and old_province != new_province) then + insert into audit_user (column_name, old_value, new_value, +user_id) + values ('ADDRESS_PROVINCE', old_province, new_province, user_id); + End If; + if (user_id > 0 and old_zip != new_zip) 
then + insert into audit_user (column_name, old_value, new_value, +user_id) + values ('ADDRESS_ZIP', old_zip, new_zip, user_id); + End If; + if (user_id > 0 and old_country_code != new_country_code) then + insert into audit_user (column_name, old_value, new_value, +user_id) + values ('ADDRESS_COUNTRY', old_country_code, new_country_code, user_id); + End If; + update address set modify_date = current where address.address_id = address_id; + End if; +end procedure; + +database informixoltp; +DROP PROCEDURE informix.proc_coder_update; CREATE TRIGGER informix.ifxpgsync_insert_user_sso_login insert on user_sso_login for each row ( execute procedure "informix".do_auditing2(USER )); @@ -284,6 +397,7 @@ create trigger informix.ifxpgsync_corona_event_insert insert on informix.corona_ database informixoltp; drop PROCEDURE proc_coder_update; + CREATE PROCEDURE informix.proc_coder_update( v_oldcoder_id decimal(10,0), v_oldquote varchar(255),v_newquote varchar (255), @@ -291,7 +405,8 @@ v_oldlanguage_id decimal(3,0), v_newlanguage_id decimal(3,0), v_oldcoder_type_id decimal(3,0), v_newcoder_type_id decimal(3,0), v_oldcomp_country_code varchar(3), v_newcomp_country_code varchar(3) ) - if (USER != 'ifxsyncuser') then + + if (USER != 'ifxsyncuser') then if (v_oldquote != v_newquote) then insert into audit_coder (column_name, old_value, new_value, user_id) values ('QUOTE', v_oldquote , v_newquote, v_oldcoder_id); @@ -311,6 +426,53 @@ v_oldcomp_country_code varchar(3), v_newcomp_country_code varchar(3) insert into audit_coder (column_name, old_value, new_value, user_id) values ('COMP_COUNTRY', v_oldcomp_country_code , v_newcomp_country_code, v_oldcoder_id); End if; + update coder set modify_date = current where coder_id = v_oldcoder_id; + End if; +end procedure; + +database tcs_catalog; +DROP PROCEDURE proc_reliability_update; +CREATE PROCEDURE informix.proc_reliability_update( +p_user_id DECIMAL(10,0), +p_phase_id decimal(3,0), +old_rating decimal(5,4), +new_rating decimal(5,4) +) + + if (USER != 'ifxsyncuser') then + if (old_rating != new_rating) then + insert into user_reliability_audit (column_name, old_value, new_value, user_id, phase_id) + values ('RATING', old_rating, new_rating, p_user_id, p_phase_id); + End If; + + update user_reliability set modify_date = current where user_id = p_user_id and phase_id = p_phase_id; +End if; +end procedure; + +DROP PROCEDURE proc_rating_update; +CREATE PROCEDURE informix.proc_rating_update( +p_user_id DECIMAL(10,0), +p_phase_id decimal(3,0), +old_rating decimal(10,0), +new_rating decimal(10,0), +old_vol decimal(10,0), +new_vol decimal(10,0), +old_num_ratings decimal(5,0), +new_num_ratings decimal(5,0), +old_last_rated_project_id decimal(12,0), +new_last_rated_project_id decimal(12,0) +) + if (USER != 'ifxsyncuser') then + if (old_rating != new_rating) then + insert into user_rating_audit (column_name, old_value, new_value, user_id, phase_id) + values ('RATING', old_rating, new_rating, p_user_id, p_phase_id); + End If; + + if (old_vol != new_vol) then + insert into user_rating_audit (column_name, old_value, new_value, user_id, phase_id) + values ('VOL', old_vol, new_vol, p_user_id, p_phase_id); + End If; + update coder set modify_date = current where coder_id = v_oldcoder_id; End if; @@ -321,6 +483,7 @@ CREATE PROCEDURE informix.do_auditing2(sessionusername LVARCHAR) EXTERNAL NAME "$INFORMIXDIR/extend/auditing/auditing.bld(do_auditing2)" LANGUAGE C; + CREATE TRIGGER informix.ifxpgsync_insert_coder insert on "informix".coder for each row ( execute procedure
"informix".do_auditing2(USER )); diff --git a/package.json b/package.json index 5f68789..772e77e 100644 --- a/package.json +++ b/package.json @@ -10,14 +10,15 @@ "consumer": "node ./src/consumer.js", "producer_dd": "node ./src/producer.js failover", "reconsiler1": "node ./src/reconsiler-audit.js", - "start": "npm run producer & npm run producer_dd & npm run consumer & npm run reconsiler1" + "reconsiler2": "node ./src/reconsiler-dd.js", + "start": "npm run producer & npm run producer_dd & npm run consumer & npm run reconsiler1 & npm run reconsiler2" }, "author": "Topcoder", "license": "ISC", "dependencies": { "aws-sdk": "*", "config": "^3.2.2", - "informix-wrapper": "git+https://github.com/appirio-tech/informix-wrapper.git", + "informix-wrapper": "git+https://github.com/appirio-tech/informix-wrapper.git#prepare_stmt_fix", "no-kafka": "^3.4.3", "pg": "^7.12.1", "sleep": "^6.1.0", diff --git a/pg-identity-func-trig-seq2.sql b/pg-identity-func-trig-seq2.sql new file mode 100644 index 0000000..31c6e89 --- /dev/null +++ b/pg-identity-func-trig-seq2.sql @@ -0,0 +1,753 @@ +SET search_path TO common_oltp; + +CREATE INDEX IF NOT EXISTS email_address_idx ON common_oltp.email + ( + address + ); + +CREATE INDEX IF NOT EXISTS user_activ_code_idx ON common_oltp.user + ( + activation_code + ); + +CREATE INDEX IF NOT EXISTS user_open_id_idx ON common_oltp.user + ( + open_id + ); + +CREATE INDEX IF NOT EXISTS user_status_idx ON common_oltp.user + ( + status + ); + + +CREATE TABLE sync_test_id +( + uniqid INTEGER NOT NULL, + description varchar(200), + created_at TIMESTAMP(6) WITH TIME ZONE DEFAULT now(), + PRIMARY KEY (uniqid) + ); +CREATE TRIGGER "pg_sync_test_id_trigger" + AFTER INSERT OR DELETE OR UPDATE ON sync_test_id + FOR EACH ROW +EXECUTE PROCEDURE common_oltp.notify_trigger_common_oltp('uniqid', 'description', 'created_at'); + +ALTER TABLE "common_oltp"."sync_test_id" disable TRIGGER "pg_sync_test_id_trigger" + +CREATE OR REPLACE FUNCTION "common_oltp"."notify_trigger_common_oltp" () RETURNS trigger + VOLATILE +AS $body$ +DECLARE + rec RECORD; + payload TEXT; + column_name TEXT; + column_value TEXT; + pguserval TEXT; + --payload_items TEXT[]; + payload_items JSONB; + uniquecolumn TEXT; + logtime TEXT; + payloadseqid INTEGER; +BEGIN + +pguserval := (SELECT current_user); + if pguserval = 'pgsyncuser' then + RAISE notice 'pgsyncuser name : %', pguserval; + + CASE TG_OP + WHEN 'INSERT', 'UPDATE' THEN + rec := NEW; + WHEN 'DELETE' THEN + rec := OLD; + ELSE + RAISE EXCEPTION 'Unknown TG_OP: "%". Should not occur!', TG_OP; + END CASE; + return rec; + -- else + end if; + + + CASE TG_OP + WHEN 'INSERT', 'UPDATE' THEN + rec := NEW; + WHEN 'DELETE' THEN + rec := OLD; + ELSE + RAISE EXCEPTION 'Unknown TG_OP: "%". 
Should not occur!', TG_OP; + END CASE; + raise notice 'table name : %', TG_TABLE_NAME; + RAISE info 'hello world'; + -- Get required fields + FOREACH column_name IN ARRAY TG_ARGV LOOP + EXECUTE format('SELECT $1.%I::TEXT', column_name) + INTO column_value + USING rec; + case + when + column_name = 'upload_document' then + -- RAISE NOTICE 'upload_document boolean'; + if column_value = 'false' then + column_value = '0'; + else + column_value = '1'; + end if; + when + column_name = 'upload_document_required' then + -- RAISE NOTICE 'upload_document_required boolean'; + if column_value = 'false' then + column_value = '0'; + else + column_value = '1'; + end if; + when + column_name = 'identify_email_enabled' then + if column_value = 'false' then + column_value = '0'; + else + column_value = '1'; + end if; + when + column_name = 'identify_handle_enabled' then + if column_value = 'false' then + column_value = '0'; + else + column_value = '1'; + end if; + when + column_name = 'social_email_verified' then + if column_value = 'false' then + column_value = 'f'; + else + column_value = 't'; + end if; + when + column_name = 'create_date' then + column_value := (select to_char (column_value::timestamp, 'YYYY-MM-DD HH24:MI:SS.MS')); + when + column_name = 'modify_date' then + column_value := (select to_char (column_value::timestamp, 'YYYY-MM-DD HH24:MI:SS.MS')); + when + column_name = 'last_login' then + column_value := (select to_char (column_value::timestamp, 'YYYY-MM-DD HH24:MI:SS.MS')); + when + column_name = 'last_site_hit_date' then + column_value := (select to_char (column_value::timestamp, 'YYYY-MM-DD HH24:MI:SS.MS')); + when + column_name = 'corona_event_timestamp' then + column_value := (select to_char (column_value::timestamp, 'YYYY-MM-DD HH24:MI:SS.MS')); + when + column_name = 'created_at' then + column_value := (select to_char (column_value::timestamp, 'YYYY-MM-DD HH24:MI:SS.MS')); + else + -- RAISE NOTICE ' not boolean'; + end case; + --payload_items := coalesce(payload_items,'{}')::jsonb || json_build_object(column_name,column_value)::jsonb; + payload_items := coalesce(payload_items,'{}')::jsonb || json_build_object(column_name,replace(column_value,'"','\"'))::jsonb; + -- payload_items := array_append(payload_items, '"' || replace(column_name, '"', '\"') || '":"' || replace(column_value, '"', '\"') || '"'); + END LOOP; + --logtime := (select date_display_tz()); + logtime := (SELECT to_char (now()::timestamptz at time zone 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS"Z"')); + payloadseqid := (select nextval('common_oltp.payloadsequence'::regclass)); + + uniquecolumn := (SELECT c.column_name + FROM information_schema.key_column_usage AS c + LEFT JOIN information_schema.table_constraints AS t + ON t.constraint_name = c.constraint_name + WHERE t.table_name = TG_TABLE_NAME AND t.constraint_type = 'PRIMARY KEY' LIMIT 1); + + if (uniquecolumn = '') IS NOT FALSE then + uniquecolumn := 'Not-Available'; + end if; + + -- exclude any null value columns. 
+ payload_items := jsonb_strip_nulls(payload_items); + + -- Build the payload + payload := '' + || '{' + || '"topic":"' || 'dev.db.postgres.sync' || '",' + || '"originator":"' || 'tc-postgres-delta-processor' || '",' + || '"timestamp":"' || logtime || '",' + || '"mime-type":"' || 'application/json' || '",' + || '"payload": {' + || '"payloadseqid":"' || payloadseqid || '",' + || '"Uniquecolumn":"' || uniquecolumn || '",' + || '"operation":"' || TG_OP || '",' + || '"schema":"' || TG_TABLE_SCHEMA || '",' + || '"table":"' || TG_TABLE_NAME || '",' + || '"data": ' || payload_items + || '}}'; + -- Notify the channel + PERFORM pg_notify('dev_db_notifications', payload); + RETURN rec; +END; +$body$ LANGUAGE plpgsql + +CREATE OR REPLACE FUNCTION "common_oltp"."proc_email_update" () RETURNS trigger + VOLATILE +AS $body$ +DECLARE +pguserval TEXT; +BEGIN + pguserval := (SELECT current_user); + if pguserval != 'pgsyncuser' then + if (OLD.email_type_id != NEW.email_type_id) then + insert into common_oltp.audit_user (column_name, old_value, new_value, user_id) + values ('EMAIL_TYPE', OLD.email_type_id, NEW.email_type_id, OLD.user_id); + End If; + + if (OLD.status_id != NEW.status_id) then + insert into common_oltp.audit_user (column_name, old_value, new_value, user_id) + values ('EMAIL_STATUS', OLD.status_id, NEW.status_id, OLD.user_id); + End If; + + if (OLD.address != NEW.address) then + insert into common_oltp.audit_user (column_name, old_value, new_value, user_id) + values ('EMAIL_ADDRESS', OLD.address, NEW.address, OLD.user_id); + End If; + + if (OLD.primary_ind != NEW.primary_ind) then + insert into common_oltp.audit_user (column_name, old_value, new_value, user_id) + values ('EMAIL_PRIMARY_IND', OLD.primary_ind, NEW.primary_ind, OLD.user_id); + End If; + + -- if pguserval != 'pgsyncuser' then + NEW.modify_date = current_timestamp; + end if; + + + RETURN NEW; +END; +$body$ LANGUAGE plpgsql; + +CREATE OR REPLACE FUNCTION "common_oltp"."proc_phone_update" () RETURNS trigger + VOLATILE +AS $body$ +DECLARE +pguserval TEXT; +BEGIN +pguserval := (SELECT current_user); +if pguserval != 'pgsyncuser' then + if (OLD.phone_type_id != NEW.phone_type_id) then + insert into audit_user (column_name, old_value, new_value, user_id) + values ('PHONE_TYPE', OLD.phone_type_id, NEW.phone_type_id, OLD.user_id); + End If; + + if (OLD.phone_number != NEW.phone_number) then + insert into audit_user (column_name, old_value, new_value, user_id) + values ('PHONE_NUMBER', OLD.phone_number, NEW.phone_number, OLD.user_id); + End If; + + if (OLD.primary_ind != NEW.primary_ind) then + insert into audit_user (column_name, old_value, new_value, user_id) + values ('PHONE_PRIMARY_IND', OLD.primary_ind, NEW.primary_ind, OLD.user_id); + End If; + + NEW.modify_date = current_timestamp; + end if; + RETURN NEW; +END; +$body$ LANGUAGE plpgsql + +CREATE OR REPLACE FUNCTION "common_oltp"."proc_user_update" () RETURNS trigger + VOLATILE +AS $body$ +DECLARE +pguserval TEXT; +BEGIN +pguserval := (SELECT current_user); +if pguserval != 'pgsyncuser' then + IF (TG_OP = 'UPDATE') THEN + if ((OLD.first_name != NEW.first_name) or (OLD.last_name != NEW.last_name ) or (OLD.middle_name != NEW.middle_name )) then + insert into common_oltp.audit_user (column_name, old_value, new_value, user_id) + values ('NAME', NULLIF(OLD.first_name, '') || ' ' || NULLIF(OLD.middle_name, '') || ' ' || NULLIF(OLD.last_name, ''), + NULLIF(NEW.first_name, '') || ' ' || NULLIF(NEW.middle_name, '') || ' ' || NULLIF(NEW.last_name, ''), OLD.user_id); + End if; + + if (OLD.handle 
!= NEW.handle) then + insert into common_oltp.audit_user (column_name, old_value, new_value, user_id) + values ('HANDLE', OLD.handle, NEW.handle, OLD.user_id); + End If; + + if (OLD.status != NEW.status) then + insert into common_oltp.audit_user (column_name, old_value, new_value, user_id) + values ('STATUS', OLD.status, NEW.status, OLD.user_id); + End If; + + if (OLD.activation_code != NEW.activation_code) then + insert into common_oltp.audit_user (column_name, old_value, new_value, user_id) + values ('ACTIVATION_CODE', OLD.activation_code, NEW.activation_code, OLD.user_id); + End If; + + if (OLD.timezone_id != NEW.timezone_id) then + insert into common_oltp.audit_user (column_name, old_value, new_value, user_id) + values ('TIMEZONE_ID', OLD.timezone_id, NEW.timezone_id, OLD.user_id); + End If; + + + NEW.modify_date = current_timestamp; + end if; + + + END IF; + + NEW.handle_lower = lower(NEW.handle); + + RETURN NEW; +END; +$body$ LANGUAGE plpgsql + +CREATE OR REPLACE FUNCTION "common_oltp"."proc_address_update" () RETURNS trigger + VOLATILE +AS $body$ +DECLARE +pguserval TEXT; + user_id DECIMAL(10,0); +BEGIN + user_id := NULLIF((select min(x.user_id) from user_address_xref x where x.address_id = OLD.address_id), -1); + pguserval := (SELECT current_user); + if pguserval != 'pgsyncuser' then + if (user_id > 0 and OLD.address1 != NEW.address1) then + insert into audit_user (column_name, old_value, new_value, user_id) + values ('ADDRESS1', OLD.address1, NEW.address1, user_id); + End If; + + if (user_id > 0 and OLD.address2 != NEW.address2) then + insert into audit_user (column_name, old_value, new_value, user_id) + values ('ADDRESS2', OLD.address2, NEW.address2, user_id); + End If; + + if (user_id > 0 and OLD.address3 != NEW.address3) then + insert into audit_user (column_name, old_value, new_value, user_id) + values ('ADDRESS3', OLD.address3, NEW.address3, user_id); + End If; + + if (user_id > 0 and OLD.city != NEW.city) then + insert into audit_user (column_name, old_value, new_value, user_id) + values ('ADDRESS_CITY', OLD.city, NEW.city, user_id); + End If; + + if (user_id > 0 and OLD.state_code != NEW.state_code) then + insert into audit_user (column_name, old_value, new_value, user_id) + values ('ADDRESS_STATE', OLD.state_code, NEW.state_code, user_id); + End If; + + if (user_id > 0 and OLD.province != NEW.province) then + insert into audit_user (column_name, old_value, new_value, user_id) + values ('ADDRESS_PROVINCE', OLD.province, NEW.province, user_id); + End If; + + if (user_id > 0 and OLD.zip != NEW.zip) then + insert into audit_user (column_name, old_value, new_value, user_id) + values ('ADDRESS_ZIP', OLD.zip, NEW.zip, user_id); + End If; + + if (user_id > 0 and OLD.country_code != NEW.country_code) then + insert into audit_user (column_name, old_value, new_value, user_id) + values ('ADDRESS_COUNTRY', OLD.country_code, NEW.country_code, user_id); + End If; + + NEW.modify_date = current_timestamp; + end if; + RETURN NEW; +END; +$body$ LANGUAGE plpgsql + + +CREATE OR REPLACE FUNCTION "common_oltp"."proc_user_last_login" () RETURNS trigger + VOLATILE +AS $body$ +DECLARE +pguserval TEXT; +BEGIN +pguserval := (SELECT current_user); + if pguserval != 'pgsyncuser' then + if (OLD.last_login != NEW.last_login) then + insert into common_oltp.corona_event(corona_event_type_id, user_id, corona_event_timestamp) + values (1, OLD.user_id, NEW.last_login); + end if; + end if; + + RETURN NULL; +END; +$body$ LANGUAGE plpgsql + + +CREATE TRIGGER "pg_security_groups_trigger" + AFTER INSERT OR DELETE 
OR UPDATE ON security_groups + FOR EACH ROW +EXECUTE PROCEDURE notify_trigger_common_oltp('group_id', 'description', 'challenge_group_ind', 'create_user_id'); + + CREATE TRIGGER "pg_social_login_provider_trigger" +AFTER INSERT OR DELETE OR UPDATE ON social_login_provider +FOR EACH ROW +EXECUTE PROCEDURE notify_trigger_common_oltp('social_login_provider_id', 'name'); + + +CREATE TRIGGER "pg_sso_login_provider_trigger" +AFTER INSERT OR DELETE OR UPDATE ON sso_login_provider +FOR EACH ROW +EXECUTE PROCEDURE notify_trigger_common_oltp('sso_login_provider_id', 'name','type','identify_email_enabled','identify_handle_enabled'); + +CREATE TRIGGER "pg_Country_trigger" +AFTER INSERT OR DELETE OR UPDATE ON Country +FOR EACH ROW +EXECUTE PROCEDURE notify_trigger_common_oltp('country_code', 'country_name','modify_date','participating','default_taxform_id','longitude','latitude','region','iso_name','iso_alpha2_code','iso_alpha3_code'); + +CREATE TRIGGER "pg_invalid_handles_trigger" +AFTER INSERT OR DELETE OR UPDATE ON invalid_handles +FOR EACH ROW +EXECUTE PROCEDURE notify_trigger_common_oltp('invalid_handle_id', 'invalid_handle'); + +CREATE TRIGGER "pg_achievement_type_lu_trigger" +AFTER INSERT OR DELETE OR UPDATE ON achievement_type_lu +FOR EACH ROW +EXECUTE PROCEDURE notify_trigger_common_oltp('achievement_type_id','achievement_type_desc'); + + + +ALTER TABLE "user" DISABLE TRIGGER pg_user_trigger; +ALTER TABLE email DISABLE TRIGGER pg_email_trigger; +ALTER TABLE security_user DISABLE TRIGGER pg_security_user_trigger; +ALTER TABLE user_sso_login DISABLE TRIGGER pg_user_sso_login_trigger; +ALTER TABLE user_achievement DISABLE TRIGGER pg_user_achievement_trigger; +ALTER TABLE user_group_xref DISABLE TRIGGER pg_user_group_xref_trigger; +ALTER TABLE security_groups DISABLE TRIGGER pg_security_groups_trigger; +ALTER TABLE user_social_login DISABLE TRIGGER pg_user_social_login_trigger; +ALTER TABLE social_login_provider DISABLE TRIGGER pg_social_login_provider_trigger; +ALTER TABLE sso_login_provider DISABLE TRIGGER pg_sso_login_provider_trigger; +ALTER TABLE country DISABLE TRIGGER pg_country_trigger; +ALTER TABLE invalid_handles DISABLE TRIGGER pg_invalid_handles_trigger; +ALTER TABLE achievement_type_lu DISABLE TRIGGER pg_achievement_type_lu_trigger; +ALTER TABLE corona_event DISABLE TRIGGER pg_corona_event_trigger; +ALTER TABLE audit_user DISABLE TRIGGER pg_audit_user_trigger; + +DROP sequence "common_oltp"."sequence_user_group_seq"; +CREATE SEQUENCE sequence_user_group_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 +START WITH 951000000 NO CYCLE; + +DROP sequence "common_oltp"."sequence_user_seq"; +CREATE SEQUENCE sequence_user_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH +488770000 NO CYCLE; + +ALTER SEQUENCE corona_event_corona_event_id_seq RESTART WITH 577770000; + +SET search_path TO informixoltp; + +CREATE OR REPLACE FUNCTION "informixoltp"."notify_trigger_informixoltp" () RETURNS trigger + VOLATILE +AS $body$ +DECLARE + rec RECORD; + payload TEXT; + column_name TEXT; + column_value TEXT; + -- payload_items TEXT[]; + payload_items JSONB; + pguserval TEXT; + uniquecolumn TEXT; + logtime TEXT; + payloadseqid INTEGER; +BEGIN + +pguserval := (SELECT current_user); + if pguserval = 'pgsyncuser' then + RAISE notice 'pgsyncuser name : %', pguserval; + + CASE TG_OP + WHEN 'INSERT', 'UPDATE' THEN + rec := NEW; + WHEN 'DELETE' THEN + rec := OLD; + ELSE + RAISE EXCEPTION 'Unknown TG_OP: "%".
Should not occur!', TG_OP; + END CASE; + return rec; + -- else + end if; + + CASE TG_OP + WHEN 'INSERT', 'UPDATE' THEN + rec := NEW; + WHEN 'DELETE' THEN + rec := OLD; + ELSE + RAISE EXCEPTION 'Unknown TG_OP: "%". Should not occur!', TG_OP; + END CASE; + raise notice 'table name : %', TG_TABLE_NAME; + -- RAISE info 'hello world'; + -- Get required fields + FOREACH column_name IN ARRAY TG_ARGV LOOP + EXECUTE format('SELECT $1.%I::TEXT', column_name) + INTO column_value + USING rec; + case + when + column_name = 'upload_document' then + -- RAISE NOTICE 'upload_document boolean'; + if column_value = 'false' then + column_value = '0'; + else + column_value = '1'; + end if; + when + column_name = 'upload_document_required' then + -- RAISE NOTICE 'upload_document_required boolean'; + if column_value = 'false' then + column_value = '0'; + else + column_value = '1'; + end if; + when + column_name = 'identify_email_enabled' then + if column_value = 'false' then + column_value = '0'; + else + column_value = '1'; + end if; + when + column_name = 'identify_handle_enabled' then + if column_value = 'false' then + column_value = '0'; + else + column_value = '1'; + end if; + when + column_name = 'create_date' then + column_value := (select to_char (column_value::timestamp, 'YYYY-MM-DD HH24:MI:SS.MS')); + when + column_name = 'modify_date' then + column_value := (select to_char (column_value::timestamp, 'YYYY-MM-DD HH24:MI:SS.MS')); + when + column_name = 'member_since' then + column_value := (select to_char (column_value::timestamp, 'YYYY-MM-DD HH24:MI:SS.MS')); + else + -- RAISE NOTICE ' not boolean'; + end case; + -- payload_items := array_append(payload_items, '"' || replace(column_name, '"', '\"') || '":"' || replace(column_value, '"', '\"') || '"'); + --payload_items := coalesce(payload_items,'{}')::jsonb || json_build_object(column_name,column_value)::jsonb; + payload_items := coalesce(payload_items,'{}')::jsonb || json_build_object(column_name,replace(column_value,'"','\"'))::jsonb; + END LOOP; + logtime := (SELECT to_char (now()::timestamptz at time zone 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS"Z"')); + payloadseqid := (select nextval('common_oltp.payloadsequence'::regclass)); + + uniquecolumn := (SELECT c.column_name + FROM information_schema.key_column_usage AS c + LEFT JOIN information_schema.table_constraints AS t + ON t.constraint_name = c.constraint_name + WHERE t.table_name = TG_TABLE_NAME AND t.constraint_type = 'PRIMARY KEY' LIMIT 1); + + if (uniquecolumn = '') IS NOT FALSE then + uniquecolumn := 'Not-Available'; + end if; + + -- exclude any null value columns. 
+ payload_items := jsonb_strip_nulls(payload_items); + + -- Build the payload + payload := '' + || '{' + || '"topic":"' || 'dev.db.postgres.sync' || '",' + || '"originator":"' || 'tc-postgres-delta-processor' || '",' + || '"timestamp":"' || logtime || '",' + || '"mime-type":"' || 'application/json' || '",' + || '"payload": {' + || '"payloadseqid":"' || payloadseqid || '",' + || '"Uniquecolumn":"' || uniquecolumn || '",' + || '"operation":"' || TG_OP || '",' + || '"schema":"' || TG_TABLE_SCHEMA || '",' + || '"table":"' || TG_TABLE_NAME || '",' + || '"data": ' || payload_items + || '}}'; + + -- Notify the channel + PERFORM pg_notify('dev_db_notifications', payload); + + RETURN rec; +END; +$body$ LANGUAGE plpgsql + +CREATE OR REPLACE FUNCTION "informixoltp"."proc_coder_update" () RETURNS trigger + VOLATILE +AS $body$ +DECLARE + pguserval TEXT; +begin + if (OLD.quote != NEW.quote) then + insert into audit_coder (column_name, old_value, new_value, user_id) + values ('QUOTE', OLD.quote , NEW.quote, OLD.coder_id); + end if; + + if (OLD.coder_type_id != NEW.coder_type_id) then + insert into audit_coder (column_name, old_value, new_value, user_id) + values ('CODER_TYPE', OLD.coder_type_id , NEW.coder_type_id, OLD.coder_id); + end if; + if (OLD.language_id != NEW.language_id) then + insert into audit_coder (column_name, old_value, new_value, user_id) + values ('LANGUAGE', OLD.language_id , NEW.language_id, OLD.coder_id); + end if; + if (OLD.comp_country_code != NEW.comp_country_code) then + insert into audit_coder (column_name, old_value, new_value, user_id) + values ('COMP_COUNTRY', OLD.comp_country_code , NEW.comp_country_code, OLD.coder_id); + end if; + pguserval := (SELECT current_user); + if pguserval != 'pgsyncuser' then + -- RAISE info 'current_user'; + -- raise notice 'inside current_user : %', current_user; + --update coder set modify_date = current_timestamp where coder_id = OLD.coder_id; + NEW.modify_date = current_timestamp; + end if; + + return NEW; +end ; +$body$ LANGUAGE plpgsql + + +CREATE TRIGGER "pg_coder_referral_trigger" +AFTER INSERT OR DELETE OR UPDATE ON coder_referral +FOR EACH ROW +EXECUTE PROCEDURE notify_trigger_informixoltp('coder_id', 'referral_id','reference_id','other'); + +ALTER TABLE coder DISABLE TRIGGER pg_coder; +ALTER TABLE algo_rating DISABLE TRIGGER pg_algo_rating; +ALTER TABLE coder_referral DISABLE TRIGGER pg_coder_referral_trigger; + +SET search_path TO tcs_catalog; + +CREATE OR REPLACE FUNCTION "tcs_catalog"."notify_trigger" () RETURNS trigger + VOLATILE +AS $body$ +DECLARE + rec RECORD; + payload TEXT; + column_name TEXT; + column_value TEXT; + --payload_items TEXT[]; + payload_items JSONB; + pguserval TEXT; + uniquecolumn TEXT; + logtime TEXT; + payloadseqid INTEGER; +BEGIN + +pguserval := (SELECT current_user); + if pguserval = 'pgsyncuser' then + RAISE notice 'pgsyncuser name : %', pguserval; + + CASE TG_OP + WHEN 'INSERT', 'UPDATE' THEN + rec := NEW; + WHEN 'DELETE' THEN + rec := OLD; + ELSE + RAISE EXCEPTION 'Unknown TG_OP: "%". Should not occur!', TG_OP; + END CASE; + return rec; + -- else + end if; + + -- Set record row depending on operation + CASE TG_OP + WHEN 'INSERT', 'UPDATE' THEN + rec := NEW; + WHEN 'DELETE' THEN + rec := OLD; + ELSE + RAISE EXCEPTION 'Unknown TG_OP: "%". 
Should not occur!', TG_OP; + END CASE; + -- Get required fields + FOREACH column_name IN ARRAY TG_ARGV LOOP + EXECUTE format('SELECT $1.%I::TEXT', column_name) + INTO column_value + USING rec; + case + when + column_name = 'upload_document' then + if column_value = 'false' then + column_value = '0'; + else + column_value = '1'; + end if; + when + column_name = 'upload_document_required' then + if column_value = 'false' then + column_value = '0'; + else + column_value = '1'; + end if; + else + -- RAISE NOTICE ' not boolean'; + end case; + --payload_items := array_append(payload_items, '"' || replace(column_name, '"', '\"') || '":"' || replace(column_value, '"', '\"') || '"'); + --payload_items := coalesce(payload_items,'{}')::jsonb || json_build_object(column_name,column_value)::jsonb; + payload_items := coalesce(payload_items,'{}')::jsonb || json_build_object(column_name,replace(column_value,'"','\"'))::jsonb; + + END LOOP; + logtime := (SELECT to_char (now()::timestamptz at time zone 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS"Z"')); + payloadseqid := (select nextval('payloadsequence'::regclass)); + + uniquecolumn := (SELECT c.column_name + FROM information_schema.key_column_usage AS c + LEFT JOIN information_schema.table_constraints AS t + ON t.constraint_name = c.constraint_name + WHERE t.table_name = TG_TABLE_NAME AND t.constraint_type = 'PRIMARY KEY' limit 1); + + if (uniquecolumn = '') IS NOT FALSE then + uniquecolumn := 'Not-Available'; + end if; + -- exclude any null value columns. + payload_items := jsonb_strip_nulls(payload_items); + + RAISE Notice ' payload val: "%"', payload; + -- Build the payload + --payload := '' + -- || '{' + -- || '"topic":"' || 'dev.db.postgres.sync' || '",' + -- || '"originator":"' || 'tc-postgres-delta-processor' || '",' + -- || '"timestamp":"' || logtime || '",' + -- || '"mime-type":"' || 'application/json' || '",' + -- || '"payload": {' + -- || '"payloadseqid":"' || payloadseqid || '",' + -- || '"Uniquecolumn":"' || uniquecolumn || '",' + -- || '"operation":"' || TG_OP || '",' + -- || '"schema":"' || TG_TABLE_SCHEMA || '",' + -- || '"table":"' || TG_TABLE_NAME || '",' + -- || '"data": {' || array_to_string(payload_items, ',') || '}' + -- || '}}'; + + payload := '' + || '{' + || '"topic":"' || 'dev.db.postgres.sync' || '",' + || '"originator":"' || 'tc-postgres-delta-processor' || '",' + || '"timestamp":"' || logtime || '",' + || '"mime-type":"' || 'application/json' || '",' + || '"payload": {' + || '"payloadseqid":"' || payloadseqid || '",' + || '"Uniquecolumn":"' || uniquecolumn || '",' + || '"operation":"' || TG_OP || '",' + || '"schema":"' || TG_TABLE_SCHEMA || '",' + || '"table":"' || TG_TABLE_NAME || '",' + || '"data":' || payload_items + || '}}'; + + -- Notify the channel + PERFORM pg_notify('dev_db_notifications', payload); + + RETURN rec; +END; +$body$ LANGUAGE plpgsql; + + +GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA common_oltp,informixoltp,tcs_catalog TO pgsyncuser; +GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA common_oltp,informixoltp,tcs_catalog TO pgsyncuser; + +GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA common_oltp,informixoltp,tcs_catalog TO coder; +GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA common_oltp,informixoltp,tcs_catalog TO coder; + +GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA common_oltp,informixoltp,tcs_catalog TO postgres; +GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA common_oltp,informixoltp,tcs_catalog TO postgres; + +grant USAGE ON SCHEMA common_oltp,informixoltp,tcs_catalog To pgsyncuser; +grant USAGE ON SCHEMA 
common_oltp,informixoltp,tcs_catalog To coder; +grant USAGE ON SCHEMA common_oltp,informixoltp,tcs_catalog To postgres; diff --git a/src/consumer.js b/src/consumer.js index 9531776..fd74484 100644 --- a/src/consumer.js +++ b/src/consumer.js @@ -10,16 +10,17 @@ const healthcheck = require('topcoder-healthcheck-dropin'); const auditTrail = require('./services/auditTrail'); const kafkaOptions = config.get('KAFKA') const postMessage = require('./services/posttoslack') -const isSslEnabled = kafkaOptions.SSL && kafkaOptions.SSL.cert && kafkaOptions.SSL.key -const consumer = new Kafka.SimpleConsumer({ - connectionString: kafkaOptions.brokers_url, - ...(isSslEnabled && { // Include ssl options if present - ssl: { - cert: kafkaOptions.SSL.cert, - key: kafkaOptions.SSL.key - } - }) -}) +//const isSslEnabled = kafkaOptions.SSL && kafkaOptions.SSL.cert && kafkaOptions.SSL.key + +const options = { + groupId: kafkaOptions.KAFKA_GROUP_ID, + connectionString: kafkaOptions.KAFKA_URL, + ssl: { + cert: kafkaOptions.KAFKA_CLIENT_CERT, + key: kafkaOptions.KAFKA_CLIENT_CERT_KEY + } +}; +const consumer = new Kafka.GroupConsumer(options); const check = function () { if (!consumer.client.initialBrokers && !consumer.client.initialBrokers.length) { @@ -35,100 +36,114 @@ const check = function () { let cs_processId; const terminate = () => process.exit() + /** * * @param {Array} messageSet List of messages from kafka * @param {String} topic The name of the message topic * @param {Number} partition The kafka partition to which messages are written */ -var retryvar=""; +var retryvar = ""; //let cs_payloadseqid; async function dataHandler(messageSet, topic, partition) { -let cs_payloadseqid - for (const m of messageSet) { // Process messages sequentially + for (const m of messageSet) { // Process messages sequentially let message + let ifxstatus = 0 try { + // let ifxstatus = 0 + let cs_payloadseqid; message = JSON.parse(m.message.value) - logger.debug('Received message from kafka:') + //logger.debug(`Consumer Received from kafka :${JSON.stringify(message)}`) if (message.payload.payloadseqid) cs_payloadseqid = message.payload.payloadseqid; logger.debug(`consumer : ${message.payload.payloadseqid} ${message.payload.table} ${message.payload.Uniquecolumn} ${message.payload.operation} ${message.timestamp} `); - await updateInformix(message) - await consumer.commitOffset({ topic, partition, offset: m.offset }) // Commit offset only on success - if (message.payload['retryCount']) retryvar = message.payload.retryCount; - auditTrail([cs_payloadseqid,cs_processId,message.payload.table,message.payload.Uniquecolumn, - message.payload.operation,"Informix-updated",retryvar,"","",JSON.stringify(message), message.timestamp,message.topic],'consumer') + //await updateInformix(message) + ifxstatus = await updateInformix(message) + // if (ifxstatus === 0 && `${message.payload.operation}` === 'INSERT') { + // logger.debug(`operation : ${message.payload.operation}`) + // logger.debug(`Consumer :informixt status for ${message.payload.table} ${message.payload.payloadseqid} : ${ifxstatus} - Retrying`) + // auditTrail([cs_payloadseqid, cs_processId, message.payload.table, message.payload.Uniquecolumn, + // message.payload.operation, "push-to-kafka", retryvar, "", "", JSON.stringify(message), new Date(), message.topic], 'consumer') + // await retrypushtokakfa(message, topic, m, partition) + //} else { + logger.debug(`Consumer :informix status for ${message.payload.table} ${message.payload.payloadseqid} : ${ifxstatus}`) + if 
(message.payload['retryCount']) retryvar = message.payload.retryCount; + await auditTrail([cs_payloadseqid, cs_processId, message.payload.table, message.payload.Uniquecolumn, + message.payload.operation, "Informix-updated", retryvar, "", "", JSON.stringify(message), new Date(), message.topic], 'consumer') + //} } catch (err) { - const errmsg2 = `error-sync: Could not process kafka message or informix DB error: "${err.message}"` + logger.debug(`Consumer:ifx return status error for ${message.payload.table} ${message.payload.payloadseqid} : ${ifxstatus}`) + const errmsg2 = `error-sync: Could not process kafka message or informix DB error: "${err.message}"` logger.error(errmsg2) - logger.debug(`error-sync: consumer "${err.message}"`) - if (!cs_payloadseqid){ - cs_payloadseqid= 'err-'+(new Date()).getTime().toString(36) + Math.random().toString(36).slice(2);} -/* await auditTrail([cs_payloadseqid,3333,'message.payload.table','message.payload.Uniquecolumn', - 'message.payload.operation',"Error-Consumer","",err.message,"",'message.payload.data',new Date(),'message.topic'],'consumer') - }else{ - auditTrail([cs_payloadseqid,4444,message.payload.table,message.payload.Uniquecolumn, - message.payload.operation,"Informix-updated",retryvar,"consumer2","",JSON.stringify(message), message.timestamp,message.topic],'consumer') - }*/ - - try { - if (message.payload['retryCount']) retryvar = message.payload.retryCount; - await consumer.commitOffset({ topic, partition, offset: m.offset }) // Commit success as will re-publish - logger.debug(`Trying to push same message after adding retryCounter`) - if (!message.payload.retryCount) { - message.payload.retryCount = 0 - logger.debug('setting retry counter to 0 and max try count is : ', config.KAFKA.maxRetry); - } - if (message.payload.retryCount >= config.KAFKA.maxRetry) { - logger.debug('Recached at max retry counter, sending it to error queue: ', config.KAFKA.errorTopic); - logger.debug(`error-sync: consumer max-retry-limit reached`) - // push to slack - alertIt("slack message" - await callposttoslack(`error-sync: postgres-ifx-processor : consumer max-retry-limit reached: "${message.payload.table}": payloadseqid : "${cs_payloadseqid}"`) - let notifiyMessage = Object.assign({}, message, { topic: config.KAFKA.errorTopic }) - notifiyMessage.payload['recipients'] = config.KAFKA.recipients - logger.debug('pushing following message on kafka error alert queue:') - //logger.debug(notifiyMessage) - await pushToKafka(notifiyMessage) - return - } - message.payload['retryCount'] = message.payload.retryCount + 1; - await pushToKafka(message) - var errmsg9 = `error-sync: Retry for Kafka push : retrycount : "${message.payload.retryCount}" : "${cs_payloadseqid}"` - logger.debug(errmsg9) - //await callposttoslack(errmsg9) - } catch (err) { - - await auditTrail([cs_payloadseqid,cs_processId,message.payload.table,message.payload.Uniquecolumn, - message.payload.operation,"Error-republishing",message.payload['retryCount'],err.message,"",message.payload.data, message.timestamp,message.topic],'consumer') - const errmsg1 = `error-sync: postgres-ifx-processor: consumer : Error-republishing: "${err.message}"` - logger.error(errmsg1) - logger.debug(`error-sync: consumer re-publishing "${err.message}"`) - // push to slack - alertIt("slack message" - await callposttoslack(errmsg1) - } + logger.debug(`error-sync: consumer "${err.message}"`) + await retrypushtokakfa(message, topic, m, partition) + } finally { + await consumer.commitOffset({ topic, partition, offset: m.offset }) // Commit 
offset regardless of outcome; failed messages have already been re-published with a retry counter } } } +async function retrypushtokakfa(message, topic, m, partition) { + let cs_payloadseqid + if (message.payload.payloadseqid) cs_payloadseqid = message.payload.payloadseqid; + logger.debug(`Consumer : At retry function`) + if (!cs_payloadseqid) { + cs_payloadseqid = 'err-' + (new Date()).getTime().toString(36) + Math.random().toString(36).slice(2); + } + try { + if (message.payload['retryCount']) retryvar = message.payload.retryCount; + logger.debug(`Trying to push same message after adding retryCounter`) + if (!message.payload.retryCount) { + message.payload.retryCount = 0 + logger.debug('setting retry counter to 0 and max try count is : ', config.KAFKA.maxRetry); + } + if (message.payload.retryCount >= config.KAFKA.maxRetry) { + logger.debug('Reached at max retry counter, sending it to error queue: ', config.KAFKA.errorTopic); + logger.debug(`error-sync: consumer max-retry-limit reached`) + //await callposttoslack(`error-sync: postgres-ifx-processor : consumer max-retry-limit reached: "${message.payload.table}": payloadseqid : "${cs_payloadseqid}"`) + let notifiyMessage = Object.assign({}, message, { topic: config.KAFKA.errorTopic }) + notifiyMessage.payload['recipients'] = config.KAFKA.recipients + logger.debug('pushing following message on kafka error alert queue:') + //retry push to error topic kafka again + await pushToKafka(notifiyMessage) + return + } + message.payload['retryCount'] = message.payload.retryCount + 1; + await pushToKafka(message) + var errmsg9 = `consumer : Retry for Kafka push : retrycount : "${message.payload.retryCount}" : "${cs_payloadseqid}"` + logger.debug(errmsg9) + } + catch (err) { + await auditTrail([cs_payloadseqid, cs_processId, message.payload.table, message.payload.Uniquecolumn, + message.payload.operation, "Error-republishing", message.payload['retryCount'], err.message, "", message.payload.data, new Date(), message.topic], 'consumer') + const errmsg1 = `error-sync: postgres-ifx-processor: consumer : Error-republishing: "${err.message}"` + logger.error(errmsg1) + logger.debug(`error-sync: consumer re-publishing "${err.message}"`) + // await callposttoslack(errmsg1) + } finally { + await consumer.commitOffset({ topic, partition, offset: m.offset }) // Commit the offset; the message has been re-published or routed to the error topic + } +} + +async function callposttoslack(slackmessage) { -if(config.SLACK.SLACKNOTIFY === 'true') { + if (config.SLACK.SLACKNOTIFY === 'true') { return new Promise(function (resolve, reject) { - postMessage(slackmessage, (response) => { - console.log(`respnse : ${response}`) - if (response.statusCode < 400) { - logger.debug('Message posted successfully'); - //callback(null); - } else if (response.statusCode < 500) { - const errmsg1 =`Slack Error: posting message to Slack API: ${response.statusCode} - ${response.statusMessage}` - logger.debug(`error-sync: ${errmsg1}`) - } - else { - logger.debug(`Server error when processing message: ${response.statusCode} - ${response.statusMessage}`); - //callback(`Server error when processing message: ${response.statusCode} - ${response.statusMessage}`); - } - resolve("done") - }); + postMessage(slackmessage, (response) => { + console.log(`response : ${response}`) + if (response.statusCode < 400) { + logger.debug('Message posted successfully'); + //callback(null); + } else if (response.statusCode < 500) { + const errmsg1 = `Slack Error: posting message to Slack API: ${response.statusCode} - ${response.statusMessage}` + logger.debug(`error-sync: ${errmsg1}`) + } + else { + logger.debug(`Server error when processing
message: ${response.statusCode} - ${response.statusMessage}`); + //callback(`Server error when processing message: ${response.statusCode} - ${response.statusMessage}`); + } + resolve("done") + }); }) //end -} + } } @@ -137,16 +152,17 @@ if(config.SLACK.SLACKNOTIFY === 'true') { */ async function setupKafkaConsumer() { try { - await consumer.init() - //await consumer.subscribe(kafkaOptions.topic, kafkaOptions.partition, { time: Kafka.LATEST_OFFSET }, dataHandler) - await consumer.subscribe(kafkaOptions.topic, dataHandler) - + const strategies = [{ + subscriptions: [kafkaOptions.topic], + handler: dataHandler + }]; + await consumer.init(strategies) logger.info('Initialized kafka consumer') healthcheck.init([check]) } catch (err) { logger.error('Could not setup kafka consumer') logger.logFullError(err) - logger.debug(`error-sync: consumer kafka-setup "${err.message}"`) + logger.debug(`error-sync: consumer kafka-setup "${err.message}"`) terminate() } } diff --git a/src/producer.js b/src/producer.js index 6a18df1..9f8a975 100644 --- a/src/producer.js +++ b/src/producer.js @@ -20,19 +20,16 @@ async function setupPgClient() { var payloadcopy try { await pgClient.connect() - for (const triggerFunction of pgOptions.triggerFunctions) { + //for (const triggerFunction of pgOptions.triggerFunctions) { + for (const triggerFunction of pgOptions.triggerFunctions.split(',')) { await pgClient.query(`LISTEN ${triggerFunction}`) } pgClient.on('notification', async (message) => { try { payloadcopy = "" - logger.debug('Entering producer 1') - logger.debug(message.toString()) - logger.debug('Entering producer 2') + logger.debug('Entering producer 2') logger.debug(message) - logger.debug('Entering producer 3') - logger.debug(JSON.stringify(message.payload)) - const payload = JSON.parse(message.payload) + const payload = JSON.parse(message.payload) payloadcopy = message const validTopicAndOriginator = (pgOptions.triggerTopics.includes(payload.topic)) && (pgOptions.triggerOriginators.includes(payload.originator)) // Check if valid topic and originator if (validTopicAndOriginator) { @@ -40,11 +37,12 @@ try { //logger.info('trying to push on kafka topic') await pushToKafka(payload) logger.info('Push to kafka and added for audit trail') + audit(message) } else { logger.info('Push to dynamodb for reconciliation') await pushToDynamoDb(payload) } - audit(message) + } else { logger.debug('Ignoring message with incorrect topic or originator') // push to slack - alertIt("slack message") @@ -119,7 +117,7 @@ async function audit(message) { } else { logger.debug(`Producer DynamoDb : ${logMessage}`); } - auditTrail([pl_seqid, pl_processid, pl_table, pl_uniquecolumn, pl_operation, "push-to-kafka", "", "", "", JSON.stringify(message), pl_timestamp, pl_topic], 'producer') + await auditTrail([pl_seqid, pl_processid, pl_table, pl_uniquecolumn, pl_operation, "push-to-kafka", "", "", "", JSON.stringify(message), pl_timestamp, pl_topic], 'producer') } else { const pl_randonseq = 'err-' + (new Date()).getTime().toString(36) + Math.random().toString(36).slice(2) if (!isFailover) { diff --git a/src/reconsiler-audit.js b/src/reconsiler-audit.js index 4491fd1..c75b3dd 100644 --- a/src/reconsiler-audit.js +++ b/src/reconsiler-audit.js @@ -24,9 +24,10 @@ async function setupPgClient() { rec_d_type = config.RECONSILER.RECONSILER_DURATION_TYPE var paramvalues = ['push-to-kafka',rec_d_start,rec_d_end]; sql1 = "select pgifx_sync_audit.seq_id, pgifx_sync_audit.payloadseqid,pgifx_sync_audit.auditdatetime ,pgifx_sync_audit.syncstatus, 
diff --git a/src/reconsiler-audit.js b/src/reconsiler-audit.js
index 4491fd1..c75b3dd 100644
--- a/src/reconsiler-audit.js
+++ b/src/reconsiler-audit.js
@@ -24,9 +24,10 @@ async function setupPgClient() {
     rec_d_type = config.RECONSILER.RECONSILER_DURATION_TYPE
     var paramvalues = ['push-to-kafka',rec_d_start,rec_d_end];
     sql1 = "select pgifx_sync_audit.seq_id, pgifx_sync_audit.payloadseqid,pgifx_sync_audit.auditdatetime ,pgifx_sync_audit.syncstatus, pgifx_sync_audit.payload from common_oltp.pgifx_sync_audit where pgifx_sync_audit.syncstatus =($1)"
-    sql2 = " and pgifx_sync_audit.producer_err <> 'Reconsiler1' and pgifx_sync_audit.auditdatetime between (timezone('utc',now())) - interval '1"+ rec_d_type + "' * ($2)"
+    sql2 = " and pgifx_sync_audit.tablename not in ('sync_test_id') and pgifx_sync_audit.producer_err <> 'Reconsiler1' and pgifx_sync_audit.auditdatetime between (timezone('utc',now())) - interval '1"+ rec_d_type + "' * ($2)"
     sql3 = " and (timezone('utc',now())) - interval '1"+ rec_d_type + "' * ($3)"
     sql = sql1 + sql2 + sql3
+    logger.info(`${sql}`)
     await pgClient.query(sql,paramvalues, async (err,result) => {
       if (err) {
         var errmsg0 = `error-sync: Audit Reconsiler1 query "${err.message}"`
@@ -37,21 +38,43 @@ async function setupPgClient() {
       console.log("Reconsiler1 : Rowcount = ", result.rows.length)
       for (var i = 0; i < result.rows.length; i++) {
         for(var columnName in result.rows[i]) {
-          // console.log('column "%s" has a value of "%j"', columnName, result.rows[i][columnName]);
-          //if ((columnName === 'seq_id') || (columnName === 'payload')){
+          logger.debug(`reconsiler record details : ${result.rows[i][columnName]}`)
           if ((columnName === 'payload')){
             var reconsiler_payload = result.rows[i][columnName]
           }
         }//column for loop
         try {
-          //console.log("reconsiler_payload====",reconsiler_payload);
-          if (reconsiler_payload != ""){
-            var s_payload = reconsiler_payload
-            payload = JSON.parse(s_payload)
-            payload1 = payload.payload
-            await pushToKafka(payload1)
+          if (reconsiler_payload != ""){
+            /* s_payload = reconsiler_payload. //original code
+            payload = JSON.parse(s_payload)
+            payload1 = payload.payload
+            await pushToKafka(payload1) */
+
+            let s_payload = reconsiler_payload
+            let s_payload1 = JSON.stringify(s_payload)
+            let payload1
+            let payload
+            if (s_payload1.includes("processId"))
+            {
+              console.log("here1")
+              payload = JSON.parse(s_payload)
+              //payload1 = JSON.parse(payload.payload)
+              payload1 = payload.payload
+              console.log(payload1)
+            } else
+            {console.log("here2")
+              payload = JSON.parse(s_payload1)
+              payload1 = payload
+              console.log(payload1)
+            }
+            //s_payload = JSON.stringify(s_payload)
+            //let payload = JSON.parse(s_payload)
+            //payload1 = payload.payload
+            await pushToKafka(payload1)
+            logger.info('Reconsiler1 Push to kafka and added for audit trail')
             await audit(s_payload,0) //0 flag means reconsiler 1. 1 flag reconsiler 2 i,e dynamodb
+            // }
           }
         }catch (error) {
           logger.error('Reconsiler1 : Could not parse message payload')
           logger.debug(`error-sync: Reconsiler1 parse message : "${error.message}"`)
@@ -165,13 +188,16 @@ function onScan(err, data) {
           //console.log(item.payloadseqid);
           var retval = await verify_pg_record_exists(item.payloadseqid)
           //console.log("retval", retval);
-          if (retval === false){
-            var s_payload = (item.pl_document)
+          var s_payload = (item.pl_document)
           payload = s_payload
           payload1 = (payload.payload)
+          if (retval === false && `${payload1.table}` !== 'sync_test_id'){
+            /* var s_payload = (item.pl_document)
+            payload = s_payload
+            payload1 = (payload.payload)*/
             await pushToKafka(item.pl_document)
             await audit(s_payload,1) //0 flag means reconsiler 1. 1 flag reconsiler 2 i,e dynamodb
-            logger.info(`Reconsiler2 : ${item.payloadseqid} posted to kafka: Total Kafka Count : ${total_pushtokafka}`)
+            logger.info(`Reconsiler2 : ${payload1.table} ${item.payloadseqid} posted to kafka: Total Kafka Count : ${total_pushtokafka}`)
             total_pushtokafka += 1
           }
           total_dd_records += 1
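Reconsiler1 now has to accept audit rows whose payload column is either a JSON string (an envelope carrying processId) or an already-parsed object, which is what the includes("processId") branching above handles. A simplified restatement of that normalisation (the row and envelope shapes are assumptions, not the repo's exact logic):

// Sketch only: normalise an audit-row payload into the object that gets pushed to Kafka.
function normalisePayload (rowPayload) {
  if (typeof rowPayload === 'string') {
    const parsed = JSON.parse(rowPayload)
    // An envelope with processId carries the real payload one level down.
    return parsed.processId ? parsed.payload : parsed
  }
  return rowPayload.payload || rowPayload
}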
diff --git a/src/reconsiler-dd.js b/src/reconsiler-dd.js
new file mode 100644
index 0000000..2d403d8
--- /dev/null
+++ b/src/reconsiler-dd.js
@@ -0,0 +1,128 @@
+
+
+const config = require('config')
+const pg = require('pg')
+var AWS = require("aws-sdk");
+const logger = require('./common/logger')
+const pushToKafka = require('./services/pushToKafka')
+const postMessage = require('./services/posttoslack')
+const auditTrail = require('./services/auditTrail');
+const port = 3000
+//===============RECONSILER2 DYNAMODB CODE STARTS HERE ==========================
+
+async function callReconsiler2()
+{console.log("inside 2");
+  docClient.scan(params, onScan);
+}
+
+var docClient = new AWS.DynamoDB.DocumentClient({
+  region: 'us-east-1',
+  convertEmptyValues: true
+});
+//ElapsedTime = 094600000
+ElapsedTime = config.DYNAMODB.DD_ElapsedTime
+var params = {
+  TableName: config.DYNAMODB.DYNAMODB_TABLE,
+  FilterExpression: "#timestamp between :time_1 and :time_2",
+  ExpressionAttributeNames: {
+    "#timestamp": "timestamp",
+  },
+  ExpressionAttributeValues: {
+    ":time_1": Date.now() - ElapsedTime,
+    ":time_2": Date.now()
+  }
+}
+
+function onScan(err, data) {
+  if (err) {
+    logger.error("Unable to scan the table. Error JSON:", JSON.stringify(err, null, 2));
+    terminate()
+  } else {
+    try
+    {
+      console.log("Scan succeeded.");
+      let total_dd_records = 0;
+      let total_pushtokafka = 0;
+      data.Items.forEach(async function(item) {
+        //console.log(item.payloadseqid);
+        var retval = await verify_pg_record_exists(item.payloadseqid)
+        //console.log("retval", retval);
+        var s_payload = (item.pl_document)
+        payload = s_payload
+        payload1 = (payload.payload)
+        if (retval === false && `${payload1.table}` !== 'sync_test_id'){
+          /* var s_payload = (item.pl_document)
+          payload = s_payload
+          payload1 = (payload.payload)*/
+          await pushToKafka(item.pl_document)
+          await audit(s_payload,1) //0 flag means reconsiler 1. 1 flag reconsiler 2 i.e. dynamodb
+          logger.info(`Reconsiler2 : ${payload1.table} ${item.payloadseqid} posted to kafka: Total Kafka Count : ${total_pushtokafka}`)
+          total_pushtokafka += 1
+        }
+        total_dd_records += 1
+      });
+      logger.info(`Reconsiler2 : count of total_dd_records ${total_dd_records}`);
+      if (typeof data.LastEvaluatedKey != "undefined") {
+        console.log("Scanning for more...");
+        params.ExclusiveStartKey = data.LastEvaluatedKey;
+        docClient.scan(params, onScan);
+      }
+    }
+    catch (err) {
+      const errmsg = `error-sync: Reconsiler2 : Error during dynamodb scan/kafka push: "${err.message}"`
+      logger.error(errmsg)
+      logger.logFullError(err)
+      callposttoslack(errmsg)
+      //terminate()
+    }
+  }
+  //terminate()
+}
+
+async function verify_pg_record_exists(seqid)
+{
+  try {
+    const pgClient = new pg.Client(pgConnectionString)
+    if (!pgClient.connect()) {await pgClient.connect()}
+    var paramvalues = [seqid]
+    sql = 'select * from common_oltp.pgifx_sync_audit where pgifx_sync_audit.payloadseqid = ($1)'
+    return new Promise(function (resolve, reject) {
+      pgClient.query(sql, paramvalues, async (err, result) => {
+        if (err) {
+          var errmsg0 = `error-sync: Audit reconsiler2 query "${err.message}"`
+          console.log(errmsg0)
+        }
+        else {
+          if (result.rows.length > 0) {
+            //console.log("row length > 0 ")
+            resolve(true);
+          }
+          else {
+            //console.log("0")
+            resolve(false);
+          }
+        }
+        pgClient.end()
+      })
+    })}
+  catch (err) {
+    const errmsg = `error-sync: Reconsiler2 : Error in setting up postgres client: "${err.message}"`
+    logger.error(errmsg)
+    logger.logFullError(err)
+    await callposttoslack(errmsg)
+    terminate()
+  }
+}
+
+//=================BEGIN HERE =======================
+const terminate = () => process.exit()
+
+async function run() {
+  //logger.debug("Initialising Reconsiler1 setup...")
+  //await setupPgClient()
+  logger.debug("Initialising Reconsiler2 setup...")
+  callReconsiler2()
+  // terminate()
+}
+//execute
+run()
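Reconsiler2 works off a single DynamoDB scan whose FilterExpression keeps only items whose timestamp falls inside the DD_ElapsedTime window, and it follows LastEvaluatedKey to page through larger tables. The pattern, reduced to its core (aws-sdk v2 DocumentClient as above; the table name and callback wiring are placeholders):

// Sketch only: time-windowed DynamoDB scan that pages via LastEvaluatedKey.
const AWS = require('aws-sdk')
const docClient = new AWS.DynamoDB.DocumentClient({ region: 'us-east-1' })

function scanWindow (tableName, elapsedMs, onItem) {
  const params = {
    TableName: tableName,
    FilterExpression: '#timestamp between :from and :to',
    ExpressionAttributeNames: { '#timestamp': 'timestamp' },
    ExpressionAttributeValues: { ':from': Date.now() - elapsedMs, ':to': Date.now() }
  }
  docClient.scan(params, function onPage (err, data) {
    if (err) { console.error('scan failed', err); return }
    data.Items.forEach(onItem)
    if (data.LastEvaluatedKey) {                // more pages remain
      params.ExclusiveStartKey = data.LastEvaluatedKey
      docClient.scan(params, onPage)
    }
  })
}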
diff --git a/src/services/auditTrail.js b/src/services/auditTrail.js
index 7e0d645..ed78523 100644
--- a/src/services/auditTrail.js
+++ b/src/services/auditTrail.js
@@ -1,12 +1,12 @@
 const config = require('config')
-const pg = require('pg')
+const pgpool = require('./db.js');
 const logger = require('../common/logger')
-
-const pgOptions = config.get('POSTGRES')
-const pgConnectionString = `postgresql://${pgOptions.user}:${pgOptions.password}@${pgOptions.host}:${pgOptions.port}/${pgOptions.database}`
-let pgClient2
+//const pg = require('pg')
+//const pgOptions = config.get('POSTGRES')
+//const pgConnectionString = `postgresql://${pgOptions.user}:${pgOptions.password}@${pgOptions.host}:${pgOptions.port}/${pgOptions.database}`
+//let pgClient2
 //console.log(`"${pgConnectionString}"`);
-async function setupPgClient2 () {
+/*async function setupPgClient2 () {
   pgClient2 = new pg.Client(pgConnectionString)
   try {
     await pgClient2.connect()
@@ -17,12 +17,12 @@ async function setupPgClient2 () {
     logger.logFullError(err)
     process.exit()
   }
-}
+}*/
 async function auditTrail (data,sourcetype) {
-if (!pgClient2) {
+/*if (!pgClient2) {
   await setupPgClient2()
-}
+}*/
 if (sourcetype === 'producer'){
   sql0 = 'INSERT INTO common_oltp.pgifx_sync_audit(payloadseqid,processId,tablename,uniquecolumn,dboperation,syncstatus,retrycount,consumer_err,producer_err,payload,auditdatetime,topicname) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12)'
   sql1= ' on conflict (payloadseqid) DO UPDATE SET (syncstatus,producer_err) = ($6,$9) where pgifx_sync_audit.payloadseqid = $1';
@@ -31,13 +31,9 @@ if (sourcetype === 'producer'){
 } else {
   sql0 = 'INSERT INTO common_oltp.pgifx_sync_audit(payloadseqid,processId,tablename,uniquecolumn,dboperation,syncstatus,retrycount,consumer_err,producer_err,payload,auditdatetime,topicname) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12)'
   sql1= ' on conflict (payloadseqid) DO UPDATE SET (syncstatus,consumer_err,retrycount) = ($6,$8,$7)';
-  // where pgifx_sync_audit.payloadseqid = $1';
-  //and pgifx_sync_audit.processId = $2';
   sql = sql0 + sql1
-  logger.debug(`--${1} ${3} 1 Audit Trail update consumer--`)
-  //logger.debug(`sql values "${sql}"`);
 }
-  return pgClient2.query(sql, data, (err, res) => {
+  /*return pgClient2.query(sql, data, (err, res) => {
     if (err) {
       logger.debug(`-- Audit Trail update error-- ${err.stack}`)
       //pgClient2.end()
@@ -45,7 +41,27 @@ if (sourcetype === 'producer'){
       // logger.debug(`--Audit Trail update success-- `)
     }
   })
-pgClient2.end()
+pgClient2.end() */
+
+pgpool.on('error', (err, client) => {
+  logger.debug(`Unexpected error on idle client : ${err}`)
+  process.exit(-1)
+})
+
+await pgpool.connect(async (err, client, release) => {
+  if (err) {
+    return logger.debug(`Error acquiring client : ${err.stack}`)
+  }
+  await client.query(sql, data, (err, res) => {
+    release()
+    if (err) {
+      return logger.debug(`Error executing Query : ${err.stack}`)
+    }
+    logger.debug(`Audit Trail update : ${res.rowCount}`)
+  })
+})
+
+
 }
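auditTrail now checks a client out of the shared pg.Pool exported by src/services/db.js (added below) and releases it once the upsert returns, instead of keeping a dedicated long-lived client. The checkout pattern, shown here with the promise API rather than the callback style used above (pool options and SQL are placeholders):

// Sketch only: run one parameterised query on a pooled client, then release it.
const { Pool } = require('pg')
const pool = new Pool({ host: 'localhost', database: 'postgres' }) // placeholder options

async function writeAuditRow (sql, values) {
  const client = await pool.connect()
  try {
    const res = await client.query(sql, values)
    console.log('rows written:', res.rowCount)
  } finally {
    client.release()                            // always hand the client back to the pool
  }
}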
diff --git a/src/services/db.js b/src/services/db.js
new file mode 100644
index 0000000..40f7dec
--- /dev/null
+++ b/src/services/db.js
@@ -0,0 +1,5 @@
+const config = require('config')
+const pg = require('pg')
+const pgOptions = config.get('POSTGRES')
+var pool = new pg.Pool(pgOptions);
+module.exports = pool;
diff --git a/src/services/updateInformix.js b/src/services/updateInformix.js
index 70c868d..8a798a8 100644
--- a/src/services/updateInformix.js
+++ b/src/services/updateInformix.js
@@ -2,42 +2,56 @@
 const informix = require('../common/informixWrapper')
 const logger = require('../common/logger')
-/**
- * Updates informix database with insert/update/delete operation
- * @param {Object} payload The DML trigger data
- */
-async function updateInformix (payload) {
-  logger.debug('=====Starting to update informix with data:====')
-  //const operation = payload.operation.toLowerCase()
-  const operation = payload.payload.operation.toLowerCase()
-  console.log("level 1 informix ",operation)
-  let sql = null
+String.prototype.escapeSpecialChars = function () {
+  return this.replace(/\n/g, "\\n");
+};
-  const columns = payload.payload.data
-  const primaryKey = payload.payload.Uniquecolumn
+async function updateInformix(payload) {
+  logger.debug(`Informix Received from consumer-kafka :${JSON.stringify(payload)}`)
+  const operation = payload.payload.operation.toLowerCase()
+  console.log("=====Informix DML Operation :==========", operation)
+  let sql = null
+  let t0 = []
+  const columns = payload.payload.data
+  const primaryKey = payload.payload.Uniquecolumn
   // Build SQL query
   switch (operation) {
     case 'insert': {
       const columnNames = Object.keys(columns)
-      sql = `insert into ${payload.payload.schema}:${payload.payload.table} (${columnNames.join(', ')}) values (${columnNames.map((k) => `'${columns[k]}'`).join(', ')});` // "insert into <schema>:<table> (col_1, col_2, ...) values (val_1, val_2, ...)"
+      sql = `insert into ${payload.payload.schema}:${payload.payload.table} (${columnNames.join(', ')}) values (${columnNames.map((k) => `?`).join(', ')});`
+      t0 = Object.keys(columns).map((key) => `{"value":"${columns[key]}"}`)
     }
       break
     case 'update': {
-      sql = `update ${payload.payload.schema}:${payload.payload.table} set ${Object.keys(columns).map((key) => `${key}='${columns[key]}'`).join(', ')} where ${primaryKey}=${columns[primaryKey]};` // "update <schema>:<table> set col_1=val_1, col_2=val_2, ... where primary_key_col=primary_key_val"
+      sql = `update ${payload.payload.schema}:${payload.payload.table} set ${Object.keys(columns).map((key) => `${key}= ?`).join(', ')} where ${primaryKey}= ?;`
+      t0 = Object.keys(columns).map((key) => `{"value":"${columns[key]}"}`)
+      t0.push(`{"value":"${columns[primaryKey]}"}`) //param value appended for the where clause
     }
       break
    case 'delete': {
-      sql = `delete from ${payload.payload.schema}:${payload.payload.table} where ${primaryKey}=${columns[primaryKey]};` // "delete from <schema>:<table> where primary_key_col=primary_key_val"
+      sql = `delete from ${payload.payload.schema}:${payload.payload.table} where ${primaryKey}= ?;`
+      t0.push(`{"value":"${columns[primaryKey]}"}`)
    }
      break
    default:
      throw new Error(`Operation ${operation} is not supported`)
  }
-  const result = await informix.executeQuery(payload.payload.schema, sql, null)
+  // Prepared statement for informix
+  t0.forEach((name, index) => t0[index] = `${name.escapeSpecialChars()}`);
+  //logger.debug(`Param values : ${t0}`);
+  let temp1 = "[" + `${t0}` + "]"
+  let finalparam = JSON.parse(temp1)
+
+  /*console.log(`Typeof finalparam : ${typeof(finalparam)}`)
+  if (finalparam.constructor === Array ) console.log('isarray')
+  else console.log('finalparam not an array')*/
+  logger.debug(`Final sql and param values are -- ${sql} ${JSON.stringify(finalparam)}`);
+  const result = await informix.executeQuery(payload.payload.schema, sql, finalparam)
+  logger.debug(`ifx execute query result : ${result}`)
   return result
 }
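The rewritten updateInformix emits one '?' placeholder per column and a parallel array of {"value": ...} parameter objects, so informixWrapper.executeQuery receives a prepared statement instead of values spliced into the SQL text. A small sketch of that pairing for an insert (the helper name, table and data are made up; only the placeholder and parameter construction mirrors the code above):

// Sketch only: build a parameterised insert plus its {"value": ...} parameter list.
function buildInsert (schema, table, columns) {
  const names = Object.keys(columns)
  const sql = `insert into ${schema}:${table} (${names.join(', ')}) values (${names.map(() => '?').join(', ')});`
  const params = names.map((k) => ({ value: String(columns[k]).replace(/\n/g, '\\n') }))
  return { sql, params }
}
// e.g. buildInsert('informixoltp', 'coder', { coder_id: 123, handle: 'tester' })
// -> sql:    insert into informixoltp:coder (coder_id, handle) values (?, ?);
// -> params: [ { value: '123' }, { value: 'tester' } ]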