diff --git a/integrations/client/test_delphi_epidata.py b/integrations/client/test_delphi_epidata.py index 0c8c3e35d..f5ce462f2 100644 --- a/integrations/client/test_delphi_epidata.py +++ b/integrations/client/test_delphi_epidata.py @@ -218,7 +218,7 @@ def test_geo_value(self): # insert placeholder data: three counties, three MSAs N = 3 rows = [ - CovidcastTestRow.make_default_row(geo_type="fips", geo_value=FIPS[i], value=i) + CovidcastTestRow.make_default_row(geo_type="county", geo_value=FIPS[i], value=i) for i in range(N) ] + [ CovidcastTestRow.make_default_row(geo_type="msa", geo_value=MSA[i], value=i*10) @@ -245,7 +245,7 @@ def fetch(geo): self.assertEqual(request['epidata'], [counties[0]]) # test fetch a specific yet not existing region request = fetch('55555') - self.assertEqual(request['message'], 'Invalid geo_value(s) 55555 for the requested geo_type fips') + self.assertEqual(request['message'], 'Invalid geo_value(s) 55555 for the requested geo_type county') # test fetch a multiple regions request = fetch([FIPS[0], FIPS[1]]) self.assertEqual(request['message'], 'success') @@ -256,10 +256,10 @@ def fetch(geo): self.assertEqual(request['epidata'], [counties[0], counties[2]]) # test fetch a multiple regions but one is not existing request = fetch([FIPS[0], '55555']) - self.assertEqual(request['message'], 'Invalid geo_value(s) 55555 for the requested geo_type fips') + self.assertEqual(request['message'], 'Invalid geo_value(s) 55555 for the requested geo_type county') # test fetch a multiple regions but specify no region request = fetch([]) - self.assertEqual(request['message'], 'geo_value is empty for the requested geo_type fips!') + self.assertEqual(request['message'], 'geo_value is empty for the requested geo_type county!') # test fetch a region with no results request = fetch([FIPS[3]]) self.assertEqual(request['message'], 'no results') @@ -326,7 +326,7 @@ def test_async_epidata(self): # insert placeholder data: three counties, three MSAs N = 3 rows = [ - CovidcastTestRow.make_default_row(geo_type="fips", geo_value=FIPS[i-1], value=i) + CovidcastTestRow.make_default_row(geo_type="county", geo_value=FIPS[i-1], value=i) for i in range(N) ] + [ CovidcastTestRow.make_default_row(geo_type="msa", geo_value=MSA[i-1], value=i*10) diff --git a/src/server/_params.py b/src/server/_params.py index 41f5ce494..39c25ce1e 100644 --- a/src/server/_params.py +++ b/src/server/_params.py @@ -58,10 +58,21 @@ def __init__(self, geo_type: str, geo_values: Union[bool, Sequence[str]]): if not isinstance(geo_values, bool): if geo_values == ['']: raise ValidationFailedException(f"geo_value is empty for the requested geo_type {geo_type}!") - allowed_values = delphi_utils.geomap.GeoMapper().get_geo_values(geo_type) - invalid_values = set(geo_values) - set(allowed_values) - if invalid_values: - raise ValidationFailedException(f"Invalid geo_value(s) {', '.join(invalid_values)} for the requested geo_type {geo_type}") + # TODO: keep this translator in sync with CsvImporter.GEOGRAPHIC_RESOLUTIONS in acquisition/covidcast/ and with GeoMapper + geo_type_translator = { + "county": "fips", + "state": "state_id", + "zip": "zip", + "hrr": "hrr", + "hhs": "hhs", + "msa": "msa", + "nation": "nation" + } + if geo_type in geo_type_translator: # else geo_type is unknown to GeoMapper + allowed_values = delphi_utils.geomap.GeoMapper().get_geo_values(geo_type_translator[geo_type]) + invalid_values = set(geo_values) - set(allowed_values) + if invalid_values: + raise ValidationFailedException(f"Invalid geo_value(s) {', '.join(invalid_values)} for the requested geo_type {geo_type}") self.geo_type = geo_type self.geo_values = geo_values diff --git a/tests/server/test_params.py b/tests/server/test_params.py index 1a401efe2..6d6de4fdc 100644 --- a/tests/server/test_params.py +++ b/tests/server/test_params.py @@ -94,6 +94,10 @@ def test_parse_geo_arg(self): self.assertEqual(parse_geo_arg(), [GeoSet("fips", True)]) with app.test_request_context(f"/?geo=fips:{FIPS[0]}"): self.assertEqual(parse_geo_arg(), [GeoSet("fips", [FIPS[0]])]) + with self.subTest("covidcast"): + for geo_type in "county dma hhs hrr msa nation state".split(): + with app.test_request_context(f"/?geo={geo_type}:*"): + self.assertEqual(parse_geo_arg(), [GeoSet(geo_type, True)]) with self.subTest("single list"): with app.test_request_context(f"/?geo=fips:{FIPS[0]},{FIPS[1]}"): self.assertEqual(parse_geo_arg(), [GeoSet("fips", [FIPS[0], FIPS[1]])])