From 853643150abad23e8a619fb4ab62fbd3a429908c Mon Sep 17 00:00:00 2001
From: george haff
Date: Thu, 16 Nov 2023 10:37:37 -0500
Subject: [PATCH 1/6] use REPLACE INTO instead of INSERT INTO...UPDATE in
 covid_hosp acquisition

---
 src/acquisition/covid_hosp/common/database.py | 25 +++++++++++++++++++
 1 file changed, 25 insertions(+)

diff --git a/src/acquisition/covid_hosp/common/database.py b/src/acquisition/covid_hosp/common/database.py
index ec00f662a..d52d4d3eb 100644
--- a/src/acquisition/covid_hosp/common/database.py
+++ b/src/acquisition/covid_hosp/common/database.py
@@ -159,6 +159,26 @@ def insert_metadata(self, publication_date, revision, meta_json, logger=False):
         (%s, %s, %s, %s, %s, NOW())
     ''', (self.table_name, self.hhs_dataset_id, publication_date, revision, meta_json))
 
+  def remove_issues(self, issue_date):
+    # TODO: this is *VERY* incomplete! SQL statements are never even evaluated!
+    # delete from metadata table where issue date matches
+    a = f"DELETE FROM `covid_hosp_meta` WHERE dataset_name='{self.table_name}' AND publication_date='{issue_date}'"
+    if self.aggregate_key_cols:
+      # TODO: restrict this to just UNIQUE columns from aggregate keys table?
+      # create (empty) `some_temp_table` like `{self.table_name}_key`
+      b = f"CREATE TABLE some_temp_table AS SELECT {self.aggregate_key_cols} FROM `{self.table_name}_key` WHERE FALSE"
+      # save aggregate keys from what we are about to delete
+      c = f"SELECT {self.aggregate_key_cols} INTO some_temp_table FROM `{self.table_name}` WHERE `{self.publication_col_name}`={issue_date} GROUP BY {self.aggregate_key_cols}"
+      # TODO: combine two SQL queries above into one?
+    # delete from main data table where issue matches
+    d = f"DELETE FROM `{self.table_name}` WHERE `{self.publication_col_name}`={issue_date}"
+    if self.aggregate_key_cols:
+      # delete from saved aggregate keys where the key still exists
+      e = f"DELETE FROM some_temp_table JOIN `{self.table_name}` USING ({self.aggregate_key_cols})"
+      # delete from aggregate key table anything left in saved keys (which should be aggregate keys that only existed in the issue we deleted)
+      f = f"DELETE FROM `{self.table_name}_key` JOIN some_temp_table USING ({self.aggregate_key_cols})"
+      g = "DROP TABLE some_temp_table"
+
   def insert_dataset(self, publication_date, dataframe, logger=False):
     """Add a dataset to the database.
@@ -193,10 +213,12 @@ def nan_safe_dtype(dtype, value):
     sql = f'INSERT INTO `{self.table_name}` (`id`, `{self.publication_col_name}`, {columns}) ' \
           f'VALUES ({value_placeholders}) AS new_values ' \
           f'ON DUPLICATE KEY UPDATE {updates}'
+    sql = f'REPLACE INTO `{self.table_name}` (`id`, `{self.publication_col_name}`, {columns}) VALUES ({value_placeholders})'
     id_and_publication_date = (0, publication_date)
     if logger:
       logger.info('updating values', count=len(dataframe.index))
     n = 0
+    rows_affected = 0
     many_values = []
     with self.new_cursor() as cursor:
       for index, row in dataframe.iterrows():
@@ -212,6 +234,7 @@ def nan_safe_dtype(dtype, value):
         if n % 5_000 == 0:
           try:
             cursor.executemany(sql, many_values)
+            rows_affected += cursor.rowcount
             many_values = []
           except Exception as e:
             if logger:
@@ -220,6 +243,8 @@ def nan_safe_dtype(dtype, value):
       # insert final batch
       if many_values:
         cursor.executemany(sql, many_values)
+        rows_affected += cursor.rowcount
+        logger.info('rows affected', count=rows_affected)
 
     # deal with non/seldomly updated columns used like a fk table (if this database needs it)
     if hasattr(self, 'AGGREGATE_KEY_COLS'):

From e85a994ac9c28897949194619fb53a6987e66e38 Mon Sep 17 00:00:00 2001
From: george haff
Date: Thu, 16 Nov 2023 10:48:26 -0500
Subject: [PATCH 2/6] oops, forgot logger was optional

---
 src/acquisition/covid_hosp/common/database.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/acquisition/covid_hosp/common/database.py b/src/acquisition/covid_hosp/common/database.py
index d52d4d3eb..af36eba7c 100644
--- a/src/acquisition/covid_hosp/common/database.py
+++ b/src/acquisition/covid_hosp/common/database.py
@@ -244,7 +244,8 @@ def nan_safe_dtype(dtype, value):
       if many_values:
         cursor.executemany(sql, many_values)
         rows_affected += cursor.rowcount
-        logger.info('rows affected', count=rows_affected)
+        if logger:
+          logger.info('rows affected', count=rows_affected)
 
     # deal with non/seldomly updated columns used like a fk table (if this database needs it)
     if hasattr(self, 'AGGREGATE_KEY_COLS'):

From 3ccd57d3887346e773412f01881032aaa450eb17 Mon Sep 17 00:00:00 2001
From: george haff
Date: Thu, 16 Nov 2023 10:54:39 -0500
Subject: [PATCH 3/6] update tests for REPLACE INTO

---
 tests/acquisition/covid_hosp/common/test_database.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/acquisition/covid_hosp/common/test_database.py b/tests/acquisition/covid_hosp/common/test_database.py
index c070a00ae..a45953313 100644
--- a/tests/acquisition/covid_hosp/common/test_database.py
+++ b/tests/acquisition/covid_hosp/common/test_database.py
@@ -148,7 +148,7 @@ def test_insert_dataset(self):
 
     actual_sql = mock_cursor.executemany.call_args[0][0]
     self.assertIn(
-      'INSERT INTO `test_table` (`id`, `publication_date`, `sql_str_col`, `sql_int_col`, `sql_float_col`)',
+      'REPLACE INTO `test_table` (`id`, `publication_date`, `sql_str_col`, `sql_int_col`, `sql_float_col`)',
       actual_sql)
 
     expected_values = [
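Patches 1-3 above swap `INSERT ... ON DUPLICATE KEY UPDATE` for `REPLACE INTO`, which MySQL executes as a delete followed by a re-insert whenever the new row collides with an existing key. That changes what `cursor.rowcount` reports: 1 for a fresh insert, 2 for a replace. A minimal sketch of that behavior with the `mysql.connector` driver (the `scratch` schema, `demo` table, and credentials are placeholders, not anything from this repo):

    import mysql.connector

    # placeholder credentials and schema; point these at any scratch MySQL instance
    cnx = mysql.connector.connect(user='user', password='pass', database='scratch')
    cur = cnx.cursor()
    cur.execute('CREATE TABLE IF NOT EXISTS demo (id INT PRIMARY KEY, val INT)')
    cur.execute('DELETE FROM demo')  # start empty so the rowcounts below hold

    cur.execute('REPLACE INTO demo (id, val) VALUES (%s, %s)', (1, 10))
    print(cur.rowcount)  # 1: no row with id=1 existed, so this is a plain insert

    cur.execute('REPLACE INTO demo (id, val) VALUES (%s, %s)', (1, 11))
    print(cur.rowcount)  # 2: the old row is deleted, then the new row inserted

    cnx.commit()
    cnx.close()

Patch 4 below leans on exactly this 1-vs-2 distinction to report how many rows were updates rather than fresh inserts.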
From a930babe608e0b1bf73a5f3d5279000c6d5b95b9 Mon Sep 17 00:00:00 2001
From: george haff
Date: Tue, 5 Dec 2023 12:50:53 -0500
Subject: [PATCH 4/6] cleanup and better logging

---
 src/acquisition/covid_hosp/common/database.py | 19 ++++++++-----------
 1 file changed, 8 insertions(+), 11 deletions(-)

diff --git a/src/acquisition/covid_hosp/common/database.py b/src/acquisition/covid_hosp/common/database.py
index af36eba7c..d987834a8 100644
--- a/src/acquisition/covid_hosp/common/database.py
+++ b/src/acquisition/covid_hosp/common/database.py
@@ -204,19 +204,14 @@ def nan_safe_dtype(dtype, value):
     for csv_name in self.key_columns:
       dataframe.loc[:, csv_name] = dataframe[csv_name].map(self.columns_and_types[csv_name].dtype)
 
-    num_columns = 2 + len(dataframe_columns_and_types) + len(self.additional_fields)
-    value_placeholders = ', '.join(['%s'] * num_columns)
     col_names = [f'`{i.sql_name}`' for i in dataframe_columns_and_types + self.additional_fields]
-    columns = ', '.join(col_names)
-    updates = ', '.join(f'{c}=new_values.{c}' for c in col_names)
-    # NOTE: list in `updates` presumes `publication_col_name` is part of the unique key and thus not needed in UPDATE
-    sql = f'INSERT INTO `{self.table_name}` (`id`, `{self.publication_col_name}`, {columns}) ' \
-          f'VALUES ({value_placeholders}) AS new_values ' \
-          f'ON DUPLICATE KEY UPDATE {updates}'
-    sql = f'REPLACE INTO `{self.table_name}` (`id`, `{self.publication_col_name}`, {columns}) VALUES ({value_placeholders})'
+    value_placeholders = ', '.join(['%s'] * (2 + len(columns)))  # extra 2 for `id` and `self.publication_col_name` cols
+    columnstring = ', '.join(col_names)
+    sql = f'REPLACE INTO `{self.table_name}` (`id`, `{self.publication_col_name}`, {columnstring}) VALUES ({value_placeholders})'
     id_and_publication_date = (0, publication_date)
+    num_values = len(dataframe.index)
     if logger:
-      logger.info('updating values', count=len(dataframe.index))
+      logger.info('updating values', count=num_values)
     n = 0
     rows_affected = 0
     many_values = []
@@ -245,7 +240,9 @@ def nan_safe_dtype(dtype, value):
         cursor.executemany(sql, many_values)
         rows_affected += cursor.rowcount
         if logger:
-          logger.info('rows affected', count=rows_affected)
+          # NOTE: REPLACE INTO marks 2 rows affected for a "replace" (one for a delete and one for a re-insert)
+          # which allows us to count rows which were updated
+          logger.info('rows affected', total=rows_affected, updated=rows_affected-num_values)
 
     # deal with non/seldomly updated columns used like a fk table (if this database needs it)
     if hasattr(self, 'AGGREGATE_KEY_COLS'):

From 924b08ea266859bb1adac82b37199d4d6df56897 Mon Sep 17 00:00:00 2001
From: george haff
Date: Tue, 5 Dec 2023 13:10:46 -0500
Subject: [PATCH 5/6] variable name typo

---
 src/acquisition/covid_hosp/common/database.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/acquisition/covid_hosp/common/database.py b/src/acquisition/covid_hosp/common/database.py
index d987834a8..6c8d58523 100644
--- a/src/acquisition/covid_hosp/common/database.py
+++ b/src/acquisition/covid_hosp/common/database.py
@@ -205,7 +205,7 @@ def nan_safe_dtype(dtype, value):
       dataframe.loc[:, csv_name] = dataframe[csv_name].map(self.columns_and_types[csv_name].dtype)
 
     col_names = [f'`{i.sql_name}`' for i in dataframe_columns_and_types + self.additional_fields]
-    value_placeholders = ', '.join(['%s'] * (2 + len(columns)))  # extra 2 for `id` and `self.publication_col_name` cols
+    value_placeholders = ', '.join(['%s'] * (2 + len(col_names)))  # extra 2 for `id` and `self.publication_col_name` cols
     columnstring = ', '.join(col_names)
     sql = f'REPLACE INTO `{self.table_name}` (`id`, `{self.publication_col_name}`, {columnstring}) VALUES ({value_placeholders})'
     id_and_publication_date = (0, publication_date)
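The NOTE added in patch 4 above is worth spelling out: each fresh insert contributes 1 to `rows_affected` and each replace contributes 2, so when `num_values` rows are submitted, `rows_affected - num_values` is exactly the number of replaced (i.e. updated) rows. A worked example with hypothetical numbers:

    num_values = 1000                          # rows submitted in this dataset
    fresh, replaced = 800, 200                 # hypothetical split
    rows_affected = fresh * 1 + replaced * 2   # = 1200, as MySQL would report it
    updated = rows_affected - num_values       # = 200, the replaced rows

One caveat of REPLACE INTO, unchanged by this series: a replace rewrites the entire row, so any column omitted from the statement reverts to its default rather than keeping its old value.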
From a232e7b6fafe9010db5503f5d1e8460e080a3687 Mon Sep 17 00:00:00 2001
From: george haff
Date: Wed, 6 Dec 2023 14:33:03 -0500
Subject: [PATCH 6/6] removed w.i.p. method stub

---
 src/acquisition/covid_hosp/common/database.py | 20 -------------------
 1 file changed, 20 deletions(-)

diff --git a/src/acquisition/covid_hosp/common/database.py b/src/acquisition/covid_hosp/common/database.py
index 6c8d58523..efbdb6c45 100644
--- a/src/acquisition/covid_hosp/common/database.py
+++ b/src/acquisition/covid_hosp/common/database.py
@@ -159,26 +159,6 @@ def insert_metadata(self, publication_date, revision, meta_json, logger=False):
         (%s, %s, %s, %s, %s, NOW())
     ''', (self.table_name, self.hhs_dataset_id, publication_date, revision, meta_json))
 
-  def remove_issues(self, issue_date):
-    # TODO: this is *VERY* incomplete! SQL statements are never even evaluated!
-    # delete from metadata table where issue date matches
-    a = f"DELETE FROM `covid_hosp_meta` WHERE dataset_name='{self.table_name}' AND publication_date='{issue_date}'"
-    if self.aggregate_key_cols:
-      # TODO: restrict this to just UNIQUE columns from aggregate keys table?
-      # create (empty) `some_temp_table` like `{self.table_name}_key`
-      b = f"CREATE TABLE some_temp_table AS SELECT {self.aggregate_key_cols} FROM `{self.table_name}_key` WHERE FALSE"
-      # save aggregate keys from what we are about to delete
-      c = f"SELECT {self.aggregate_key_cols} INTO some_temp_table FROM `{self.table_name}` WHERE `{self.publication_col_name}`={issue_date} GROUP BY {self.aggregate_key_cols}"
-      # TODO: combine two SQL queries above into one?
-    # delete from main data table where issue matches
-    d = f"DELETE FROM `{self.table_name}` WHERE `{self.publication_col_name}`={issue_date}"
-    if self.aggregate_key_cols:
-      # delete from saved aggregate keys where the key still exists
-      e = f"DELETE FROM some_temp_table JOIN `{self.table_name}` USING ({self.aggregate_key_cols})"
-      # delete from aggregate key table anything left in saved keys (which should be aggregate keys that only existed in the issue we deleted)
-      f = f"DELETE FROM `{self.table_name}_key` JOIN some_temp_table USING ({self.aggregate_key_cols})"
-      g = "DROP TABLE some_temp_table"
-
   def insert_dataset(self, publication_date, dataframe, logger=False):
     """Add a dataset to the database.
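The stub removed above was never wired up: each statement was assigned to a throwaway variable and never executed, and at least two were not valid MySQL (`SELECT ... INTO <table>` is SQL Server syntax, and a multi-table DELETE must name its target table before FROM). If the method is ever revived, a hedged sketch of how those statements might actually be executed could look like the following; this is NOT the project's implementation, and it assumes `self.aggregate_key_cols` is a list of column names and reuses the `new_cursor()` context manager that `insert_dataset` already uses:

    def remove_issues(self, issue_date):
      # sketch only -- the original stub was removed from this series as work-in-progress
      cols = ', '.join(self.aggregate_key_cols) if self.aggregate_key_cols else None
      with self.new_cursor() as cursor:
        # delete from metadata table where issue date matches
        cursor.execute(
          "DELETE FROM `covid_hosp_meta` WHERE dataset_name=%s AND publication_date=%s",
          (self.table_name, issue_date))
        if cols:
          # save aggregate keys from what we are about to delete
          # (CREATE TABLE ... AS SELECT combines the stub's two queries, per its own TODO)
          cursor.execute(
            f"CREATE TABLE some_temp_table AS SELECT DISTINCT {cols} "
            f"FROM `{self.table_name}` WHERE `{self.publication_col_name}`=%s",
            (issue_date,))
        # delete from main data table where issue matches
        cursor.execute(
          f"DELETE FROM `{self.table_name}` WHERE `{self.publication_col_name}`=%s",
          (issue_date,))
        if cols:
          # keys still present in other issues are not orphans; drop them from the saved set
          cursor.execute(
            f"DELETE some_temp_table FROM some_temp_table "
            f"JOIN `{self.table_name}` USING ({cols})")
          # whatever remains existed only in the deleted issue; purge it from the key table
          cursor.execute(
            f"DELETE k FROM `{self.table_name}_key` AS k JOIN some_temp_table USING ({cols})")
          cursor.execute("DROP TABLE some_temp_table")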