diff --git a/.circleci/config.yml b/.circleci/config.yml
index da48833..ca3986b 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -41,16 +41,22 @@ build_steps: &build_steps
           command: |
             ./awsconfiguration.sh ${DEPLOY_ENV}
             source awsenvconf
-            ./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-consumer-deployvar
+
+            ./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-consumer-deployvar
             source buildenvvar
             ./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer
-
+
             echo "Running Masterscript - deploy postgres-ifx-processer producer"
             if [ -e ${LOGICAL_ENV}-${APP_NAME}-consumer-deployvar.json ]; then  sudo rm -vf ${LOGICAL_ENV}-${APP_NAME}-consumer-deployvar.json; fi
             ./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-producer-deployvar
             source buildenvvar
             ./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer
-
+
+            echo "Running Masterscript - deploy postgres-ifx-processer producer_dd"
+            if [ -e ${LOGICAL_ENV}-${APP_NAME}-producer-deployvar.json ]; then  sudo rm -vf ${LOGICAL_ENV}-${APP_NAME}-producer-deployvar.json; fi
+            ./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-producer_dd-deployvar
+            source buildenvvar
+            ./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer
 jobs:
   # Build & Deploy against development backend #
   "build-dev":
@@ -61,7 +67,16 @@ jobs:
       GLOBAL_ENV: "dev"
       APP_NAME: "postgres-ifx-processer"
     steps: *build_steps
-  # Build & Deploy against production backend
+  # Build & Deploy against test backend #
+  "build-test":
+    <<: *defaults
+    environment:
+      DEPLOY_ENV: "DEV"
+      LOGICAL_ENV: "TEST"
+      GLOBAL_ENV: "dev"
+      APP_NAME: "postgres-ifx-processer"
+    steps: *build_steps
+  # Build & Deploy against production backend
   "build-prod":
     <<: *defaults
     environment:
@@ -82,10 +97,16 @@ workflows:
               only:
                 - dev
                 - dev-retryfeature
+      - "build-test":
+          context : org-global
+          filters:
+            branches:
+              only:
+                - dev-test-pg
+                - dev-test-pg-rf
       - "build-prod":
           context : org-global
           filters:
             branches:
               only:
                 - master
-
diff --git a/config/default.js b/config/default.js
index 54a0793..729e68c 100644
--- a/config/default.js
+++ b/config/default.js
@@ -22,8 +22,8 @@ module.exports = {
     database: process.env.PG_DATABASE || 'postgres', // database must exist before running the tool
     password: process.env.PG_PASSWORD || 'password',
     port: parseInt(process.env.PG_PORT, 10) || 5432,
-    triggerFunctions: process.env.TRIGGER_FUNCTIONS || ['db_notifications'], // List of trigger functions to listen to
-    triggerTopics: process.env.TRIGGER_TOPICS || ['db.postgres.sync'], // Names of the topic in the trigger payload
+    triggerFunctions: process.env.TRIGGER_FUNCTIONS || ['dev_db_notifications'], // List of trigger functions to listen to
+    triggerTopics: process.env.TRIGGER_TOPICS || ['dev.db.postgres.sync'], // Names of the topic in the trigger payload
     triggerOriginators: process.env.TRIGGER_ORIGINATORS || ['tc-postgres-delta-processor'] // Names of the originator in the trigger payload
   },
   KAFKA: { // Kafka connection options
@@ -38,6 +38,20 @@ module.exports = {
     errorTopic: process.env.ERROR_TOPIC || 'db.scorecardtable.error',
     recipients: ['admin@abc.com'] // Kafka partitions to use
   },
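+  // Slack alerting used by src/services/posttoslack.js; alerts are sent only when SLACKNOTIFY is 'true'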
+  SLACK: {
+    URL: process.env.SLACKURL || 'us-east-1',
+    SLACKCHANNEL: process.env.SLACKCHANNEL || 'ifxpg-migrator',
+    SLACKNOTIFY: process.env.SLACKNOTIFY || 'false'
+  },
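+  // Reconciler window: src/reconsiler-audit.js re-checks audit rows between RECONSILER_START and RECONSILER_END duration units ago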
+  RECONSILER: {
+    RECONSILER_START: process.env.RECONSILER_START || 10,
+    RECONSILER_END: process.env.RECONSILER_END || 5,
+    RECONSILER_DURATION_TYPE: process.env.RECONSILER_DURATION_TYPE || 'm'
+  },
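+  // DynamoDB table used by the failover producer ("npm run producer_dd") to park payloads for reconciliation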
+  DYNAMODB: {
+    DYNAMODB_TABLE: process.env.DYNAMODB_TABLE || 'dev_pg_ifx_payload_sync'
+  },
 
 	AUTH0_URL: process.env.AUTH0_URL ,
 	AUTH0_AUDIENCE: process.env.AUTH0_AUDIENCE ,
diff --git a/config/test.js b/config/test.js
index 228861f..a198f88 100644
--- a/config/test.js
+++ b/config/test.js
@@ -7,9 +7,9 @@ module.exports = {
     host: process.env.INFORMIX_HOST || 'localhost',
     port: parseInt(process.env.INFORMIX_PORT, 10) || 2021,
     user: process.env.INFORMIX_USER || 'informix',
-    password: process.env.INFORMIX_PASSWORD || '1nf0rm1x',
-    database: process.env.INFORMIX_DATABASE || 'tcs_catalog',
-    server: process.env.INFORMIX_SERVER || 'informixoltp_tcp',
+    password: process.env.INFORMIX_PASSWORD || 'password',
+    database: process.env.INFORMIX_DATABASE || 'db',
+    server: process.env.INFORMIX_SERVER || 'informixp',
     minpool: parseInt(process.env.MINPOOL, 10) || 1,
     maxpool: parseInt(process.env.MAXPOOL, 10) || 60,
     maxsize: parseInt(process.env.MAXSIZE, 10) || 0,
diff --git a/package.json b/package.json
index 21b72b6..f026db9 100644
--- a/package.json
+++ b/package.json
@@ -8,11 +8,13 @@
     "lint:fix": "standard --env mocha --fix",
     "producer": "node ./src/producer.js",
     "consumer": "node ./src/consumer.js",
-    "start": "npm run producer & npm run consumer"
+    "producer_dd": "node ./src/producer.js failover",
+    "start": "npm run producer & npm run producer_dd & npm run consumer"
   },
   "author": "Topcoder",
   "license": "ISC",
   "dependencies": {
+    "aws-sdk": "*",
     "config": "^3.2.2",
     "informix-wrapper": "git+https://github.com/appirio-tech/informix-wrapper.git",
     "no-kafka": "^3.4.3",
diff --git a/pg-identity-func-trig-seq.sql b/pg-identity-func-trig-seq.sql
new file mode 100644
index 0000000..bc304c0
--- /dev/null
+++ b/pg-identity-func-trig-seq.sql
@@ -0,0 +1,343 @@
+SET search_path TO common_oltp;
+
+CREATE TABLE
+    pgifx_sync_audit
+    (
+        seq_id SERIAL NOT NULL,
+        payloadseqid CHARACTER VARYING,
+        processid INTEGER,
+        tablename CHARACTER VARYING(64),
+        uniquecolumn CHARACTER VARYING(64),
+        dboperation CHARACTER VARYING(32),
+        syncstatus CHARACTER VARYING(64),
+        retrycount CHARACTER VARYING(64),
+        consumer_err CHARACTER VARYING,
+        producer_err CHARACTER VARYING,
+        payload CHARACTER VARYING,
+        auditdatetime TIMESTAMP(6) WITHOUT TIME ZONE,
+        topicname CHARACTER VARYING(64),
+        UNIQUE (payloadseqid)
+    );
+
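+-- Generic row-change notifier: serializes the listed columns of the affected row to a JSON payload
+-- and emits it via pg_notify for the producer to pick up.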
+CREATE OR REPLACE FUNCTION "common_oltp"."notify_trigger_common_oltp" ()  RETURNS trigger
+  VOLATILE
+AS $body$
+DECLARE
+  rec RECORD;
+  payload TEXT;
+  column_name TEXT;
+  column_value TEXT;
+  payload_items TEXT[];
+  uniquecolumn TEXT;
+  logtime TEXT;
+  payloadseqid INTEGER;
+BEGIN
+  CASE TG_OP
+  WHEN 'INSERT', 'UPDATE' THEN
+     rec := NEW;
+  WHEN 'DELETE' THEN
+     rec := OLD;
+  ELSE
+     RAISE EXCEPTION 'Unknown TG_OP: "%". Should not occur!', TG_OP;
+  END CASE;
+ raise notice 'table name : %', TG_TABLE_NAME;
+   RAISE info 'hello world';
+  -- Get required fields
+  FOREACH column_name IN ARRAY TG_ARGV LOOP
+    EXECUTE format('SELECT $1.%I::TEXT', column_name)
+    INTO column_value
+    USING rec;
+   case 
+    when 
+    column_name = 'upload_document' then 
+         --  RAISE NOTICE 'upload_document boolean';
+         if column_value = 'false' then
+                column_value = '0';
+        else
+                column_value = '1';
+        end if;
+    when
+    column_name = 'upload_document_required' then
+         -- RAISE NOTICE 'upload_document_required boolean';
+        if column_value = 'false' then
+                column_value = '0';
+        else
+                column_value = '1';     
+        end if;
+     when
+        column_name = 'identify_email_enabled' then
+        if column_value = 'false' then
+                column_value = '0';
+        else
+                column_value = '1';     
+        end if;
+     when
+        column_name = 'identify_handle_enabled' then
+        if column_value = 'false' then
+                column_value = '0';
+        else
+                column_value = '1';     
+        end if;
+      when  
+     column_name = 'create_date' then 
+      column_value := (select to_char (column_value::timestamp, 'YYYY-MM-DD HH24:MI:SS.MS'));
+      when
+         column_name = 'modify_date' then 
+       column_value := (select to_char (column_value::timestamp, 'YYYY-MM-DD HH24:MI:SS.MS'));
+     -- when
+      --   column_name = 'achievement_date' then 
+      --column_value := (select to_date (column_value, 'MM/DD/YYYY'));
+      --column_value := (select to_date (column_value));
+       --when
+         --column_name = 'password' then 
+         --column_value := regexp_replace(column_value, '\s', '', 'g');
+         --column_value := regexp_replace(column_value, E'[\\n\\r]+', '\n\r', 'g');  
+           else
+    -- RAISE NOTICE ' not boolean';
+    end case;
+    payload_items := array_append(payload_items, '"' || replace(column_name, '"', '\"') || '":"' || replace(column_value, '"', '\"') || '"');
+  END LOOP;
+  --logtime := (select date_display_tz());
+  logtime := (SELECT to_char (now()::timestamptz at time zone 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS"Z"'));
+  payloadseqid := (select nextval('payloadsequence'::regclass));
+
+  uniquecolumn := (SELECT c.column_name
+        FROM information_schema.key_column_usage AS c
+        LEFT JOIN information_schema.table_constraints AS t
+        ON t.constraint_name = c.constraint_name
+        WHERE t.table_name = TG_TABLE_NAME AND t.constraint_type = 'PRIMARY KEY' LIMIT 1);
+        
+        if (uniquecolumn = '') IS NOT FALSE then
+         uniquecolumn := 'Not-Available';
+         end if;
+
+  -- Build the payload
+  payload := ''
+               || '{'
+              || '"topic":"' || 'test.db.postgres.sync' || '",'
+              || '"originator":"' || 'tc-postgres-delta-processor' || '",'  
+            || '"timestamp":"' || logtime  || '",'
+              || '"mime-type":"' || 'application/json'                   || '",'
+              || '"payload": {'      
+              || '"payloadseqid":"' || payloadseqid                   || '",'
+              || '"Uniquecolumn":"' || uniquecolumn                   || '",'
+              || '"operation":"' || TG_OP                                || '",'
+              || '"schema":"'    || TG_TABLE_SCHEMA                      || '",'
+              || '"table":"'     || TG_TABLE_NAME                        || '",'
+              || '"data": {'      || array_to_string(payload_items, ',')  || '}'
+              || '}}';
+
+  -- Notify the channel
+  PERFORM pg_notify('test_db_notifications', payload);
+  
+  RETURN rec;
+END;
+$body$ LANGUAGE plpgsql;
+
+CREATE TRIGGER "pg_email_trigger"
+  AFTER INSERT OR DELETE OR UPDATE ON email
+  FOR EACH ROW
+EXECUTE PROCEDURE notify_trigger_common_oltp('user_id', 'email_id', 'email_type_id', 'address', 'primary_ind', 'status_id');
+
+CREATE TRIGGER "pg_security_user_trigger"
+  AFTER INSERT OR DELETE OR UPDATE ON security_user
+  FOR EACH ROW
+EXECUTE PROCEDURE notify_trigger_common_oltp('login_id', 'user_id', 'password', 'create_user_id');
+
+
+CREATE TRIGGER "pg_user_achievement_trigger"
+  AFTER INSERT OR DELETE OR UPDATE ON user_achievement
+  FOR EACH ROW
+EXECUTE PROCEDURE notify_trigger_common_oltp('user_id', 'achievement_date', 'achievement_type_id', 'description', 'create_date');
+
+CREATE TRIGGER "pg_user_group_xref_trigger"
+  AFTER INSERT OR DELETE OR UPDATE ON user_group_xref
+  FOR EACH ROW
+EXECUTE PROCEDURE notify_trigger_common_oltp('user_group_id', 'login_id', 'group_id', 'create_user_id', 'security_status_id');
+
+CREATE TRIGGER "pg_user_trigger"
+  AFTER INSERT OR DELETE OR UPDATE ON "user"
+  FOR EACH ROW
+EXECUTE PROCEDURE notify_trigger_common_oltp('user_id', 'first_name', 'last_name', 'handle', 'status', 'activation_code', 'reg_source', 'utm_source', 'utm_medium', 'utm_campaign');
+                                  
+ CREATE TRIGGER "pg_user_sso_login_trigger"
+  AFTER INSERT OR DELETE OR UPDATE ON user_sso_login
+  FOR EACH ROW
+EXECUTE PROCEDURE notify_trigger_common_oltp('user_id', 'sso_user_id', 'sso_user_name', 'provider_id', 'email');
+                                  
+CREATE TRIGGER "pg_user_social_login_trigger"
+  AFTER INSERT OR DELETE OR UPDATE ON user_social_login
+  FOR EACH ROW
+EXECUTE PROCEDURE notify_trigger_common_oltp('social_user_id', 'user_id', 'social_login_provider_id', 'social_user_name', 'social_email', 'social_email_verified');
+                                  
+CREATE TRIGGER "pg_security_groups_trigger"
+  AFTER INSERT OR DELETE OR UPDATE ON security_groups
+  FOR EACH ROW
+EXECUTE PROCEDURE notify_trigger_common_oltp('group_id', 'description', 'challenge_group_ind', 'create_user_id');
+
+
+
+CREATE SEQUENCE payloadsequence INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807
+START WITH 1 NO CYCLE;
+
+--drop sequence sequence_user_seq;                              
+CREATE SEQUENCE sequence_user_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH
+110100000 NO CYCLE;
+                                  
+--drop SEQUENCE sequence_user_group_seq;
+CREATE SEQUENCE sequence_user_group_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START
+WITH 601000000 NO CYCLE;
+
+
+--drop SEQUENCE sequence_email_seq;
+CREATE SEQUENCE sequence_email_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START
+WITH 70100000 NO CYCLE;
+
+-- COUNTRY TABLE: additional ISO columns
+ALTER TABLE country
+  ADD COLUMN iso_name VARCHAR(128),
+  ADD COLUMN iso_alpha2_code VARCHAR(2),
+  ADD COLUMN iso_alpha3_code VARCHAR(3);
+  --migrate directly from dev/prod database (using ecs run migrator).
+                                  
+ --migrate directly from dev/prod database (using ecs run migrator).
+ALTER TABLE sso_login_provider
+  ADD COLUMN identify_email_enabled BOOLEAN NOT NULL DEFAULT true,
+  ADD COLUMN identify_handle_enabled BOOLEAN NOT NULL DEFAULT true;
+
+SET search_path TO informixoltp;
+
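+-- Same notifier, duplicated for the informixoltp schema.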
+CREATE OR REPLACE FUNCTION "informixoltp"."notify_trigger_informixoltp" ()  RETURNS trigger
+  VOLATILE
+AS $body$
+DECLARE
+  rec RECORD;
+  payload TEXT;
+  column_name TEXT;
+  column_value TEXT;
+  payload_items TEXT[];
+  uniquecolumn TEXT;
+  logtime TEXT;
+  payloadseqid INTEGER;
+BEGIN
+  CASE TG_OP
+  WHEN 'INSERT', 'UPDATE' THEN
+     rec := NEW;
+  WHEN 'DELETE' THEN
+     rec := OLD;
+  ELSE
+     RAISE EXCEPTION 'Unknown TG_OP: "%". Should not occur!', TG_OP;
+  END CASE;
+ raise notice 'table name : %', TG_TABLE_NAME;
+   RAISE info 'hello world';
+  -- Get required fields
+  FOREACH column_name IN ARRAY TG_ARGV LOOP
+    EXECUTE format('SELECT $1.%I::TEXT', column_name)
+    INTO column_value
+    USING rec;
+   case 
+    when 
+    column_name = 'upload_document' then 
+         --  RAISE NOTICE 'upload_document boolean';
+         if column_value = 'false' then
+                column_value = '0';
+        else
+                column_value = '1';
+        end if;
+    when
+    column_name = 'upload_document_required' then
+         -- RAISE NOTICE 'upload_document_required boolean';
+        if column_value = 'false' then
+                column_value = '0';
+        else
+                column_value = '1';     
+        end if;
+     when
+        column_name = 'identify_email_enabled' then
+        if column_value = 'false' then
+                column_value = '0';
+        else
+                column_value = '1';     
+        end if;
+     when
+        column_name = 'identify_handle_enabled' then
+        if column_value = 'false' then
+                column_value = '0';
+        else
+                column_value = '1';     
+        end if;
+      when  
+     column_name = 'create_date' then 
+      column_value := (select to_char (column_value::timestamp, 'YYYY-MM-DD HH24:MI:SS.MS'));
+      when
+         column_name = 'modify_date' then 
+       column_value := (select to_char (column_value::timestamp, 'YYYY-MM-DD HH24:MI:SS.MS'));
+     -- when
+      --   column_name = 'achievement_date' then 
+      --column_value := (select to_date (column_value, 'MM/DD/YYYY'));
+      --column_value := (select to_date (column_value));
+       --when
+         --column_name = 'password' then 
+         --column_value := regexp_replace(column_value, '\s', '', 'g');
+         --column_value := regexp_replace(column_value, E'[\\n\\r]+', '\n\r', 'g');  
+           else
+    -- RAISE NOTICE ' not boolean';
+    end case;
+    payload_items := array_append(payload_items, '"' || replace(column_name, '"', '\"') || '":"' || replace(column_value, '"', '\"') || '"');
+  END LOOP;
+  --logtime := (select date_display_tz());
+  logtime := (SELECT to_char (now()::timestamptz at time zone 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS"Z"'));
+  payloadseqid := (select nextval('payloadsequence'::regclass));
+
+  uniquecolumn := (SELECT c.column_name
+        FROM information_schema.key_column_usage AS c
+        LEFT JOIN information_schema.table_constraints AS t
+        ON t.constraint_name = c.constraint_name
+        WHERE t.table_name = TG_TABLE_NAME AND t.constraint_type = 'PRIMARY KEY' LIMIT 1);
+        
+        if (uniquecolumn = '') IS NOT FALSE then
+         uniquecolumn := 'Not-Available';
+         end if;
+
+  -- Build the payload
+  payload := ''
+               || '{'
+              || '"topic":"' || 'test.db.postgres.sync' || '",'
+              || '"originator":"' || 'tc-postgres-delta-processor' || '",'  
+            || '"timestamp":"' || logtime  || '",'
+              || '"mime-type":"' || 'application/json'                   || '",'
+              || '"payload": {'      
+              || '"payloadseqid":"' || payloadseqid                   || '",'
+              || '"Uniquecolumn":"' || uniquecolumn                   || '",'
+              || '"operation":"' || TG_OP                                || '",'
+              || '"schema":"'    || TG_TABLE_SCHEMA                      || '",'
+              || '"table":"'     || TG_TABLE_NAME                        || '",'
+              || '"data": {'      || array_to_string(payload_items, ',')  || '}'
+              || '}}';
+
+  -- Notify the channel
+  PERFORM pg_notify('test_db_notifications', payload);
+  
+  RETURN rec;
+END;
+$body$ LANGUAGE plpgsql;
+
+
+CREATE TRIGGER "pg_algo_rating"
+  AFTER INSERT OR DELETE OR UPDATE ON algo_rating
+  FOR EACH ROW
+EXECUTE PROCEDURE notify_trigger_informixoltp('coder_id', 'rating', 'vol', 'round_id', 'num_ratings', 'algo_rating_type_id', 'modify_date');
+
+CREATE TRIGGER "pg_coder"
+  AFTER INSERT OR DELETE OR UPDATE ON coder
+  FOR EACH ROW
+EXECUTE PROCEDURE notify_trigger_informixoltp('coder_id', 'quote', 'coder_type_id', 'comp_country_code', 'display_quote', 'quote_location', 'quote_color', 'display_banner', 'banner_style');
+
+GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA common_oltp,informixoltp, corporate_oltp,tcs_catalog, time_oltp TO coder;
+GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA common_oltp,informixoltp, corporate_oltp,tcs_catalog, time_oltp TO coder;
+
+GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA common_oltp,informixoltp, corporate_oltp,tcs_catalog, time_oltp TO pgsyncuser;
+GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA common_oltp,informixoltp, corporate_oltp,tcs_catalog, time_oltp TO pgsyncuser;
+GRANT USAGE ON SCHEMA common_oltp,informixoltp, corporate_oltp,tcs_catalog, time_oltp TO pgsyncuser;
+GRANT USAGE ON SCHEMA common_oltp,informixoltp, corporate_oltp,tcs_catalog, time_oltp TO coder;
diff --git a/src/consumer.js b/src/consumer.js
index 9794446..c5d1e9b 100644
--- a/src/consumer.js
+++ b/src/consumer.js
@@ -9,6 +9,7 @@ const pushToKafka = require('./services/pushToKafka')
 const healthcheck = require('topcoder-healthcheck-dropin');
 const auditTrail = require('./services/auditTrail');
 const kafkaOptions = config.get('KAFKA')
+const postMessage = require('./services/posttoslack')
 const isSslEnabled = kafkaOptions.SSL && kafkaOptions.SSL.cert && kafkaOptions.SSL.key
 const consumer = new Kafka.SimpleConsumer({
   connectionString: kafkaOptions.brokers_url,
@@ -20,7 +21,6 @@ const consumer = new Kafka.SimpleConsumer({
   })
 })
 
-
 const check = function () {
   if (!consumer.client.initialBrokers && !consumer.client.initialBrokers.length) {
     return false;
@@ -33,7 +33,7 @@ const check = function () {
   return connected;
 };
 
-
+let cs_processId;
 const terminate = () => process.exit()
 /**
  *
@@ -41,49 +41,99 @@ const terminate = () => process.exit()
  * @param {String} topic The name of the message topic
  * @param {Number} partition The kafka partition to which messages are written
  */
+var retryvar = "";
+//let cs_payloadseqid;
 async function dataHandler(messageSet, topic, partition) {
-  for (const m of messageSet) { // Process messages sequentially
+  let cs_payloadseqid
+  for (const m of messageSet) { // Process messages sequentially
     let message
     try {
       message = JSON.parse(m.message.value)
       logger.debug('Received message from kafka:')
-      logger.debug(JSON.stringify(message))
-      await updateInformix(message)
+      if (message.payload.payloadseqid) cs_payloadseqid = message.payload.payloadseqid;
+      logger.debug(`consumer : ${message.payload.payloadseqid} ${message.payload.table} ${message.payload.Uniquecolumn} ${message.payload.operation} ${message.timestamp} `);
+      await updateInformix(message)
       await consumer.commitOffset({ topic, partition, offset: m.offset }) // Commit offset only on success
-      await auditTrail([message.payload.payloadseqid,'scorecard_consumer',message.payload.table,message.payload.Uniquecolumn,
-             message.payload.operation,1,0,"",message.timestamp,new Date(),message.payload.data],'consumer')
+      if (message.payload['retryCount']) retryvar = message.payload.retryCount;
+      auditTrail([cs_payloadseqid, cs_processId, message.payload.table, message.payload.Uniquecolumn,
+        message.payload.operation, "Informix-updated", retryvar, "", "", message.payload.data, message.timestamp, message.topic], 'consumer')
     } catch (err) {
-      logger.error('Could not process kafka message')
+      const errmsg2 = `Could not process kafka message or informix DB error: "${err.message}"`  
+      logger.error(errmsg2)
+      //await callposttoslack(errmsg2)
       //logger.logFullError(err)
+      logger.debug(`error-sync: consumer "${err.message}"`)
+      if (!cs_payloadseqid) {
+        cs_payloadseqid = 'err-' + (new Date()).getTime().toString(36) + Math.random().toString(36).slice(2);
+      }
+
+      await auditTrail([cs_payloadseqid, 3333, 'message.payload.table', 'message.payload.Uniquecolumn',
+        'message.payload.operation', "Error-Consumer", "", err.message, "", 'message.payload.data', new Date(), 'message.topic'], 'consumer')
       try {
+        //var retryvar
+        if (message.payload['retryCount']) retryvar = message.payload.retryCount;
         await consumer.commitOffset({ topic, partition, offset: m.offset }) // Commit success as will re-publish
-        logger.debug('Trying to push same message after adding retryCounter')
+    //    await auditTrail([cs_payloadseqid,3333,'message.payload.table','message.payload.Uniquecolumn',
+      //   'message.payload.operation',"Informix-Updated1",retryvar,"","",'message.payload.data',new Date(),'message.topic'],'consumer')
+        //await callposttoslack(`Retry for Kafka push : retrycount : "${retryvar}"`)
+        logger.debug(`Trying to push same message after adding retryCounter`)
         if (!message.payload.retryCount) {
           message.payload.retryCount = 0
           logger.debug('setting retry counter to 0 and max try count is : ', config.KAFKA.maxRetry);
         }
         if (message.payload.retryCount >= config.KAFKA.maxRetry) {
           logger.debug('Recached at max retry counter, sending it to error queue: ', config.KAFKA.errorTopic);
-
-          let notifiyMessage = Object.assign({}, message, { topic: config.KAFKA.errorTopic })
+          logger.debug(`error-sync: consumer max-retry-limit reached`)
+          // push to slack - alertIt("slack message")
+          await callposttoslack(`error-sync: postgres-ifx-processor : consumer max-retry-limit reached: "${message.payload.table}": payloadseqid : "${cs_payloadseqid}"`)
+          let notifiyMessage = Object.assign({}, message, { topic: config.KAFKA.errorTopic })
           notifiyMessage.payload['recipients'] = config.KAFKA.recipients
           logger.debug('pushing following message on kafka error alert queue:')
-          logger.debug(notifiyMessage)
-          await pushToKafka(notifiyMessage)
+          //logger.debug(notifiyMessage)
+          await pushToKafka(notifiyMessage)
           return
         }
         message.payload['retryCount'] = message.payload.retryCount + 1;
         await pushToKafka(message)
-        logger.debug('pushed same message after adding retryCount')
+        var errmsg9 = `Retry for Kafka push : retrycount : "${message.payload.retryCount}" : "${cs_payloadseqid}"`
+        logger.debug(errmsg9)
+        //await callposttoslack(errmsg9)
       } catch (err) {
-	 //await auditTrail([payload.payload.payloadseqid,'scorecard_consumer',payload.payload.table,payload.payload.Uniquecolumn,
-           //  payload.payload.operation,0,message.payload.retryCount,"re-publish kafka err",payload.timestamp,new Date(),""],'consumer')
-        logger.error("Error occured in re-publishing kafka message", err)
+        await auditTrail([cs_payloadseqid, cs_processId, message.payload.table, message.payload.Uniquecolumn,
+          message.payload.operation, "Error-republishing", message.payload['retryCount'], err.message, "", message.payload.data, message.timestamp, message.topic], 'consumer')
+        const errmsg1 = `postgres-ifx-processor: consumer : Error-republishing: "${err.message}"`
+        logger.error(errmsg1)
+        logger.debug(`error-sync: consumer re-publishing "${err.message}"`)
+        // push to slack - alertIt("slack message")
+        await callposttoslack(errmsg1)
       }
     }
   }
 }
 
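+// Post an alert message to Slack via src/services/posttoslack.js; does nothing unless SLACK.SLACKNOTIFY is 'true'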
+async function callposttoslack(slackmessage) {
+  if (config.SLACK.SLACKNOTIFY === 'true') {
+    return new Promise(function (resolve, reject) {
+      postMessage(slackmessage, (response) => {
+        console.log(`response : ${response}`)
+        if (response.statusCode < 400) {
+          logger.debug('Message posted successfully');
+          //callback(null);
+        } else if (response.statusCode < 500) {
+          const errmsg1 = `Slack Error: posting message to Slack API: ${response.statusCode} - ${response.statusMessage}`
+          logger.debug(`error-sync: ${errmsg1}`)
+        } else {
+          logger.debug(`Server error when processing message: ${response.statusCode} - ${response.statusMessage}`);
+          //callback(`Server error when processing message: ${response.statusCode} - ${response.statusMessage}`);
+        }
+        resolve("done")
+      });
+    }) //end
+  }
+}
 
 /**
  * Initialize kafka consumer
@@ -99,6 +149,7 @@ async function setupKafkaConsumer() {
   } catch (err) {
     logger.error('Could not setup kafka consumer')
     logger.logFullError(err)
+    logger.debug(`error-sync: consumer kafka-setup "${err.message}"`) 
     terminate()
   }
 }
diff --git a/src/producer.js b/src/producer.js
index ce59982..983de78 100644
--- a/src/producer.js
+++ b/src/producer.js
@@ -5,56 +5,125 @@ const config = require('config')
 const pg = require('pg')
 const logger = require('./common/logger')
 const pushToKafka = require('./services/pushToKafka')
+const pushToDynamoDb = require('./services/pushToDynamoDb')
 const pgOptions = config.get('POSTGRES')
+const postMessage = require('./services/posttoslack')
 const pgConnectionString = `postgresql://${pgOptions.user}:${pgOptions.password}@${pgOptions.host}:${pgOptions.port}/${pgOptions.database}`
 const pgClient = new pg.Client(pgConnectionString)
 const auditTrail = require('./services/auditTrail');
 const express = require('express')
 const app = express()
 const port = 3000
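+// "failover" as the first CLI argument (used by the "producer_dd" npm script) routes payloads to DynamoDB instead of Kafka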
+const isFailover = process.argv[2] === 'failover'
 
-
-async function setupPgClient () {
-  try {
+async function setupPgClient() {
+  var payloadcopy
+  try {
     await pgClient.connect()
     for (const triggerFunction of pgOptions.triggerFunctions) {
       await pgClient.query(`LISTEN ${triggerFunction}`)
     }
     pgClient.on('notification', async (message) => {
       try {
+        payloadcopy = ""
         const payload = JSON.parse(message.payload)
-	const validTopicAndOriginator = (pgOptions.triggerTopics.includes(payload.topic)) && (pgOptions.triggerOriginators.includes(payload.originator)) // Check if valid topic and originator
+        payloadcopy = message
+        const validTopicAndOriginator = (pgOptions.triggerTopics.includes(payload.topic)) && (pgOptions.triggerOriginators.includes(payload.originator)) // Check if valid topic and originator
         if (validTopicAndOriginator) {
-     console.log(`${payload.topic} ${payload.payload.table} ${payload.payload.operation} ${payload.timestamp}`);
-	await pushToKafka(payload)
-	} else {
+          if (!isFailover) {
+            //logger.info('trying to push on kafka topic')
+            await pushToKafka(payload)
+            logger.info('Push to kafka and added for audit trail')
+          } else {
+            logger.info('Push to dynamodb for reconciliation')
+            await pushToDynamoDb(payload)
+          }
+          audit(message)
+        } else {
           logger.debug('Ignoring message with incorrect topic or originator')
+          // push to slack - alertIt("slack message")
         }
-     await auditTrail([payload.payload.payloadseqid,'scorecard_producer',1,payload.topic,payload.payload.table,payload.payload.Uniquecolumn,
-	     payload.payload.operation,"",payload.timestamp,new Date(),JSON.stringify(payload.payload)],'producer')
       } catch (error) {
         logger.error('Could not parse message payload')
-     await auditTrail([payload.payload.payloadseqid,'scorecard_producer',0,payload.topic,payload.payload.table,payload.payload.Uniquecolumn,
-	     payload.payload.operation,"error",payload.timestamp,new Date(),JSON.stringify(payload.payload)],'producer')
+        logger.debug(`error-sync: producer parse message : "${error.message}"`)
+        const errmsg1 = `postgres-ifx-processor: producer or dd : Error Parse or payload : "${error.message}" \n payload : "${payloadcopy.payload}"`
 	logger.logFullError(error)
+        audit(error)
+        // push to slack - alertIt("slack message")
+        await callposttoslack(errmsg1)
       }
     })
-    logger.info('Listening to notifications')
+    logger.info('pg-ifx-sync producer: Listening to pg-trigger channel.')
   } catch (err) {
-    logger.error('Could not setup postgres client')
+    const errmsg = `postgres-ifx-processor: producer or dd : Error in setting up postgres client: "${err.message}"`
+    logger.error(errmsg)
     logger.logFullError(err)
+    // push to slack - alertIt("slack message")
+    await callposttoslack(errmsg)
     terminate()
   }
 }
 
-async function run () {
+const terminate = () => process.exit()
+
+async function run() {
+  logger.debug("Initialising producer setup...")
   await setupPgClient()
 }
 
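+// Post an alert message to Slack via src/services/posttoslack.js; does nothing unless SLACK.SLACKNOTIFY is 'true'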
+async function callposttoslack(slackmessage) {
+  if (config.SLACK.SLACKNOTIFY === 'true') {
+    return new Promise(function (resolve, reject) {
+      postMessage(slackmessage, (response) => {
+        console.log(`response : ${response}`)
+        if (response.statusCode < 400) {
+          logger.debug('Message posted successfully');
+          //callback(null);
+        } else if (response.statusCode < 500) {
+          const errmsg1 = `Slack Error: posting message to Slack API: ${response.statusCode} - ${response.statusMessage}`
+          logger.debug(`error-sync: ${errmsg1}`)
+        } else {
+          logger.debug(`Server error when processing message: ${response.statusCode} - ${response.statusMessage}`);
+          //callback(`Server error when processing message: ${response.statusCode} - ${response.statusMessage}`);
+        }
+        resolve("done")
+      });
+    }) //end
+  }
+}
+
+// execute  
 run()
 
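+// Audit each notification via auditTrail (pgifx_sync_audit); notifications without a processId are recorded as producer errors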
+async function audit(message) {
+  const pl_processid = message.processId
+  if (typeof pl_processid !== 'undefined') {
+    const payload = JSON.parse(message.payload)
+    const pl_seqid = payload.payload.payloadseqid
+    const pl_topic = payload.topic // TODO can move in config ? 
+    const pl_table = payload.payload.table
+    const pl_uniquecolumn = payload.payload.Uniquecolumn
+    const pl_operation = payload.payload.operation
+    const pl_timestamp = payload.timestamp
+    const pl_payload = JSON.stringify(payload.payload)
+    const logMessage = `${pl_seqid} ${pl_processid} ${pl_table} ${pl_uniquecolumn} ${pl_operation} ${payload.timestamp}`
+    if (!isFailover) {
+      logger.debug(`producer : ${logMessage}`);
+    } else {
+      logger.debug(`Producer DynamoDb : ${logMessage}`);
+    }
+    auditTrail([pl_seqid, pl_processid, pl_table, pl_uniquecolumn, pl_operation, "push-to-kafka", "", "", "", JSON.stringify(message), pl_timestamp, pl_topic], 'producer')
+  } else {
+    const pl_randonseq = 'err-' + (new Date()).getTime().toString(36) + Math.random().toString(36).slice(2)
+    if (!isFailover) {
+      await auditTrail([pl_randonseq, 1111, "", "", "", "error-producer", "", "", message.message, "", new Date(), ""], 'producer')
+    }
+  }
+}
+
 app.get('/health', (req, res) => {
-        res.send('health ok')
+  res.send('health ok')
 })
 app.listen(port, () => console.log(`app listening on port ${port}!`))
 
diff --git a/src/reconsiler-audit.js b/src/reconsiler-audit.js
new file mode 100644
index 0000000..0c44570
--- /dev/null
+++ b/src/reconsiler-audit.js
@@ -0,0 +1,126 @@
+const config = require('config')
+const pg = require('pg')
+const logger = require('./common/logger')
+const pushToKafka = require('./services/pushToKafka')
+const pgOptions = config.get('POSTGRES')
+const postMessage = require('./services/posttoslack')
+const pgConnectionString = `postgresql://${pgOptions.user}:${pgOptions.password}@${pgOptions.host}:${pgOptions.port}/${pgOptions.database}`
+const pgClient = new pg.Client(pgConnectionString)
+const auditTrail = require('./services/auditTrail');
+const port = 3000
+
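+// Reconciler: re-reads audit rows whose syncstatus is still 'push-to-kafka' inside the configured window and re-publishes them to Kafka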
+async function setupPgClient() {
+  var payloadcopy
+  try {
+    await pgClient.connect()
+    //sql1= "select * from pgifx_sync_audit where syncstatus in ('Informix-updated') 
+    //and auditdatetime >= (now()::date - interval '10m') and auditdatetime <= (now()::date - interval '5m') ";
+    //>= 12.53 and  <= 12.48
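+    // Select audit rows still in 'push-to-kafka' status between RECONSILER_START and RECONSILER_END duration units ago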
+    rec_d_start = config.RECONSILER.RECONSILER_START
+    rec_d_end = config.RECONSILER.RECONSILER_END
+    rec_d_type = config.RECONSILER.RECONSILER_DURATION_TYPE
+    var paramvalues = ['push-to-kafka', rec_d_start, rec_d_end];
+    //var paramvalues = ['push-to-kafka','60002027'];
+    //sql1 = 'select * from common_oltp.pgifx_sync_audit where pgifx_sync_audit.syncstatus =($1)'
+    sql1 = " select seq_id, payloadseqid, auditdatetime at time zone 'utc' at time zone 'Asia/Calcutta', syncstatus, payload from common_oltp.pgifx_sync_audit where pgifx_sync_audit.syncstatus = ($1)"
+    sql2 = " and pgifx_sync_audit.auditdatetime >= DATE(NOW()) - INTERVAL '1" + rec_d_type + "' * ($2)"
+    sql3 = " and pgifx_sync_audit.auditdatetime <= DATE(NOW()) - INTERVAL '1" + rec_d_type + "' * ($3)"
+    sql = sql1 + sql2 + sql3
+    console.log('sql ', sql)
+   // sql3 = ' select * from common_oltp.pgifx_sync_audit where pgifx_sync_audit.syncstatus =($1)'
+   // sql4 = " and pgifx_sync_audit.payloadseqid = ($2)"
+    
+    //sql = sql3 + sql4
+    //const result = await pgClient.query(sql1, (err, res) => {
+    await pgClient.query(sql,paramvalues, async (err,result) => {
+      if (err) {
+        var errmsg0 = `error-sync: Audit reconsiler query  "${err.message}"`
+        logger.debug (errmsg0)
+        // await callposttoslack(errmsg0)
+    }
+      else{
+        console.log("Reconsiler Rowcount = ", result.rows.length)    
+        for (var i = 0; i < result.rows.length; i++) {
+            for(var columnName in result.rows[i]) {
+                // console.log('column "%s" has a value of "%j"', columnName, result.rows[i][columnName]);
+                //if ((columnName === 'seq_id') || (columnName === 'payload')){
+                if ((columnName === 'payload')){
+                var reconsiler_payload = result.rows[i][columnName]
+                }
+              }//column for loop
+          try {  
+              var s_payload =  reconsiler_payload
+              payload = JSON.parse(s_payload)
+              payload1 = payload.payload
+              await pushToKafka(payload1)
+              logger.info('Reconsiler : Push to kafka and added for audit trail')
+              audit(s_payload)
+            } catch (error) {
+              logger.error('Reconsiler: Could not parse message payload')
+              logger.debug(`error-sync: Reconsiler parse message : "${error.message}"`)
+              const errmsg1 = `postgres-ifx-processor: Reconsiler : Error Parse or payload : "${error.message}" \n payload : "${payloadcopy.payload}"`
+              logger.logFullError(error)
+              audit(error)
+             await callposttoslack(errmsg1)
+            } 
+        }//result for loop
+      }
+       pgClient.end()
+    })  
+  }catch (err) {
+    const errmsg = `postgres-ifx-processor: Reconsiler : Error in setting up postgres client: "${err.message}"`
+    logger.error(errmsg)
+    logger.logFullError(err)
+    await callposttoslack(errmsg)
+    terminate()
+  }
+}
+
+const terminate = () => process.exit()
+
+async function run() {
+  logger.debug("Initialising Reconsiler setup...")
+  await setupPgClient()
+}
+
+async function callposttoslack(slackmessage) {
+  if (config.SLACK.SLACKNOTIFY === 'true') {
+    return new Promise(function (resolve, reject) {
+      postMessage(slackmessage, (response) => {
+        console.log(`response : ${response}`)
+        if (response.statusCode < 400) {
+          logger.debug('Message posted successfully');
+          //callback(null);
+        } else if (response.statusCode < 500) {
+          const errmsg1 = `Slack Error: posting message to Slack API: ${response.statusCode} - ${response.statusMessage}`
+          logger.debug(`error-sync: ${errmsg1}`)
+        } else {
+          logger.debug(`Server error when processing message: ${response.statusCode} - ${response.statusMessage}`);
+          //callback(`Server error when processing message: ${response.statusCode} - ${response.statusMessage}`);
+        }
+        resolve("done")
+      });
+    }) //end
+  }
+
+}
+// execute  
+run()
+
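+// Parse the stored audit payload and log what the reconciler re-published (the auditTrail write below is left commented out)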
+async function audit(message) {
+    const pl_processid = 5555
+    const jsonpayload = JSON.parse(message)
+    payload = JSON.parse(jsonpayload.payload)
+    payload1 = payload.payload
+    const pl_seqid = payload1.payloadseqid
+    const pl_topic = payload1.topic // TODO can move in config ? 
+    const pl_table = payload1.table
+    const pl_uniquecolumn = payload1.Uniquecolumn
+    const pl_operation = payload1.operation
+    const pl_timestamp = payload1.timestamp
+    //const pl_payload = JSON.stringify(payload.payload)
+    const logMessage = `${pl_seqid} ${pl_processid} ${pl_table} ${pl_uniquecolumn} ${pl_operation} ${payload.timestamp}`
+    logger.debug(`reconsiler : ${logMessage}`);
+   //await auditTrail([pl_seqid, pl_processid, pl_table, pl_uniquecolumn, pl_operation, "push-to-kafka-reconsiler", "", "reconsiler", "", "", new Date(), ""], 'producer')
+}
+
diff --git a/src/services/auditTrail.js b/src/services/auditTrail.js
index fcd26a5..3f45f95 100644
--- a/src/services/auditTrail.js
+++ b/src/services/auditTrail.js
@@ -5,7 +5,7 @@ const logger = require('../common/logger')
 const pgOptions = config.get('POSTGRES')
 const pgConnectionString = `postgresql://${pgOptions.user}:${pgOptions.password}@${pgOptions.host}:${pgOptions.port}/${pgOptions.database}`
 let pgClient2
-
+//console.log(`"${pgConnectionString}"`);
 async function setupPgClient2 () {
   pgClient2 = new pg.Client(pgConnectionString)
   try {
@@ -24,20 +24,28 @@ if (!pgClient2) {
 	await setupPgClient2()
 }
 if (sourcetype === 'producer'){
- sql = 'INSERT INTO tcs_catalog.producer_scorecard_audit(payloadseqid,origin_source,kafka_post_status,topic_name,table_name,Uniquecolumn,operationtype,errormessage,payloadtime,auditdatetime,payload) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11)'
-logger.debug(`--Audit Trail update producer--`)
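+        // Upsert into common_oltp.pgifx_sync_audit: insert a new row, or update the sync status when the payloadseqid already exists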
+        sql0 = 'INSERT INTO common_oltp.pgifx_sync_audit(payloadseqid,processId,tablename,uniquecolumn,dboperation,syncstatus,retrycount,consumer_err,producer_err,payload,auditdatetime,topicname) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12)'
+        sql1 = ' on conflict (payloadseqid) DO UPDATE SET (syncstatus) = ($6) where pgifx_sync_audit.payloadseqid = $1';
+        sql = sql0 + sql1
+        logger.debug(`--Audit Trail update producer--`)
 } else {
-sql = 'INSERT INTO tcs_catalog.consumer_scorecard_audit(payloadseqid,origin_source,table_name,Uniquecolumn,operationtype,dest_db_status, dest_retry_count,errormessage,payloadtime,auditdatetime,dest_operationquery) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11)'
-logger.debug(`--Audit Trail  update consumer--`)
+        sql0 = 'INSERT INTO common_oltp.pgifx_sync_audit(payloadseqid,processId,tablename,uniquecolumn,dboperation,syncstatus,retrycount,consumer_err,producer_err,payload,auditdatetime,topicname) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12)'
+        sql1 = ' on conflict (payloadseqid) DO UPDATE SET (syncstatus,consumer_err,retrycount) = ($6,$8,$7)';
+        // where pgifx_sync_audit.payloadseqid = $1';
+        //and pgifx_sync_audit.processId = $2';
+        sql = sql0 + sql1
+        logger.debug(`--Audit Trail update consumer--`)
+        //logger.debug(`sql values "${sql}"`);
 }
   return pgClient2.query(sql, data, (err, res) => {
   if (err) {
   logger.debug(`--Audit Trail  update error-- ${err.stack}`)
   //pgClient2.end()
   } else {
-    logger.debug(`--Audit Trail update success-- `)
+  //  logger.debug(`--Audit Trail update success-- `)
   }
 })
+pgClient2.end()
 }
 
 
diff --git a/src/services/posttoslack.js b/src/services/posttoslack.js
new file mode 100644
index 0000000..f5cf15a
--- /dev/null
+++ b/src/services/posttoslack.js
@@ -0,0 +1,42 @@
+const config = require('config');
+const zlib = require('zlib');
+const url = require('url');
+const https = require('https');
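+// Minimal Slack incoming-webhook client: POSTs { channel, text } to the configured hook URL and hands the HTTP status back to the callback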
+const hookUrl = config.SLACK.URL
+const slackChannel = config.SLACK.SLACKCHANNEL
+async function postMessage(message, callback) {
+    var slackMessage = {
+        channel: `${slackChannel}`,
+        text: `${message}`,
+    }
+    const body = JSON.stringify(slackMessage);
+    const options = url.parse(hookUrl);
+    options.method = 'POST';
+    options.headers = {
+        'Content-Type': 'application/json',
+        'Content-Length': Buffer.byteLength(body),
+    };
+    return new Promise(function (resolve, reject) {	
+    const postReq = https.request(options, (res) => {
+        const chunks = [];
+        res.setEncoding('utf8');
+        res.on('data', (chunk) => chunks.push(chunk));
+        res.on('end', () => {
+            if (callback) {
+                callback({
+                    body: chunks.join(''),
+                    statusCode: res.statusCode,
+                    statusMessage: res.statusMessage,
+                });
+            }
+         resolve(res)		
+        });
+       // return res;
+    });
+
+    postReq.write(body);
+    postReq.end();
+    }) 
+}
+
+module.exports = postMessage
diff --git a/src/services/pushToDynamoDb.js b/src/services/pushToDynamoDb.js
new file mode 100644
index 0000000..78d995d
--- /dev/null
+++ b/src/services/pushToDynamoDb.js
@@ -0,0 +1,30 @@
+const config = require('config')
+const logger = require('../common/logger')
+const _ = require('lodash')
+var AWS = require("aws-sdk");
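+// Failover path: store the notification payload in the configured DynamoDB table for later reconciliation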
+async function pushToDynamoDb(payload) {
+  try {
+    console.log('----Push To DynamoDB-------');
+    var params = {
+      TableName: config.DYNAMODB.DYNAMODB_TABLE,
+      Item: {
+        payloadseqid: payload.payload.payloadseqid,
+        pl_document: payload,
+        pl_table: payload.payload.table,
+        pl_uniquecolumn: payload.payload.Uniquecolumn,
+        pl_operation: payload.payload.operation,
+        pl_time: payload.timestamp,
+        timestamp: Date.now()
+      }
+    }
+    var docClient = new AWS.DynamoDB.DocumentClient({ region: 'us-east-1', convertEmptyValues: true });
+    docClient.put(params, function (err, data) {
+      if (err) logger.error(err);
+      else logger.info(data);
+    });
+
+  } catch (e) {
+          logger.error(`error-sync: Error at PushToDynamoDB "${e}"`)
+  }
+}
+
+module.exports = pushToDynamoDb
diff --git a/src/services/updateInformix.js b/src/services/updateInformix.js
index 5098162..70c868d 100644
--- a/src/services/updateInformix.js
+++ b/src/services/updateInformix.js
@@ -28,11 +28,11 @@ async function updateInformix (payload) {
 	  sql = `update ${payload.payload.schema}:${payload.payload.table} set ${Object.keys(columns).map((key) => `${key}='${columns[key]}'`).join(', ')} where ${primaryKey}=${columns[primaryKey]};` // "update <schema>:<table> set col_1=val_1, col_2=val_2, ... where primary_key_col=primary_key_val"
       }
       break
-   // case 'delete':
-   //   {
-   //     sql = `delete from ${payload.payload.schema}:${payload.payload.table} where ${primaryKey}=${columns[primaryKey]};` // ""delete from <schema>:<table> where primary_key_col=primary_key_val"
-   //   }
-   //   break
+    case 'delete':
+      {
+        sql = `delete from ${payload.payload.schema}:${payload.payload.table} where ${primaryKey}=${columns[primaryKey]};` // "delete from <schema>:<table> where primary_key_col=primary_key_val"
+      }
+      break
     default:
       throw new Error(`Operation ${operation} is not supported`)
   }