Skip to content

Commit 4b29389

Browse files
authored
Merge pull request #8 from cmu-delphi/sgratzl/meta
feat: add covidcast meta signal model
2 parents 6062b25 + 6c79a28 commit 4b29389

File tree

8 files changed

+435
-41
lines changed

8 files changed

+435
-41
lines changed

.pylintrc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ min-public-methods=1
55

66
[MESSAGES CONTROL]
77

8-
disable=R0801, C0330, E1101, E0611, C0114, C0116, C0103, R0913, R0914, W0702, too-many-public-methods
8+
disable=R0801, C0330, E1101, E0611, C0114, C0116, C0103, R0913, R0914, W0702, too-many-public-methods, too-many-instance-attributes
99

1010
[FORMAT]
1111
max-line-length=120

delphi_epidata/__init__.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,5 +13,14 @@
1313
EpiDataFormatType,
1414
AEpiDataCall,
1515
)
16+
from ._covidcast import (
17+
DataSignal,
18+
DataSource,
19+
WebLink,
20+
DataSignalGeoStatistics,
21+
CovidcastDataSources,
22+
GeoType,
23+
TimeType,
24+
)
1625

1726
__author__ = "Delphi Group"

delphi_epidata/_covidcast.py

Lines changed: 319 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,319 @@
1+
from dataclasses import Field, InitVar, dataclass, field, fields
2+
from typing import (
3+
Any,
4+
Callable,
5+
Dict,
6+
Generic,
7+
Iterable,
8+
List,
9+
Literal,
10+
Mapping,
11+
Optional,
12+
OrderedDict,
13+
Sequence,
14+
Tuple,
15+
Union,
16+
overload,
17+
get_args,
18+
)
19+
from functools import cached_property
20+
from pandas import DataFrame
21+
from ._model import (
22+
EpiRangeLike,
23+
CALL_TYPE,
24+
EpidataFieldInfo,
25+
EpidataFieldType,
26+
EpiRangeParam,
27+
InvalidArgumentException,
28+
)
29+
30+
31+
GeoType = Literal["nation", "msa", "hrr", "hhs", "state", "county"]
32+
TimeType = Literal["day", "week"]
33+
34+
35+
@dataclass
class WebLink:
    """
    A web link, stored as an ``alt`` string and an ``href`` string.
    """

    alt: str
    href: str
43+
44+
45+
@dataclass
class DataSignalGeoStatistics:
    """
    Summary statistics (min, max, mean, stdev) of a COVIDcast signal.
    """

    min: float
    max: float
    mean: float
    stdev: float
55+
56+
57+
def _limit_fields(data: Dict[str, Any], class_fields: Tuple[Field, ...]) -> Dict[str, Any]:
    """
    Return a new dict holding only the entries of ``data`` whose key is the
    name of one of ``class_fields``; ``data`` itself is left untouched.
    """
    allowed = set()
    for class_field in class_fields:
        allowed.add(class_field.name)

    kept: Dict[str, Any] = {}
    for key, value in data.items():
        if key in allowed:
            kept[key] = value
    return kept
60+
61+
62+
def define_covidcast_fields() -> List[EpidataFieldInfo]:
    """
    Build the list of field descriptions for a COVIDcast row.

    A fresh list is created on every call. The categorical ``geo_type`` and
    ``time_type`` fields take their categories from the ``GeoType`` and
    ``TimeType`` literal types.
    """
    kinds = EpidataFieldType
    geo_categories = list(get_args(GeoType))
    time_categories = list(get_args(TimeType))

    columns: List[EpidataFieldInfo] = [
        EpidataFieldInfo("source", kinds.text),
        EpidataFieldInfo("signal", kinds.text),
        EpidataFieldInfo("geo_type", kinds.categorical, categories=geo_categories),
        EpidataFieldInfo("geo_value", kinds.text),
        EpidataFieldInfo("time_type", kinds.categorical, categories=time_categories),
        EpidataFieldInfo("time_value", kinds.date),
        EpidataFieldInfo("issue", kinds.date),
        EpidataFieldInfo("lag", kinds.int),
        EpidataFieldInfo("value", kinds.float),
        EpidataFieldInfo("stderr", kinds.float),
        EpidataFieldInfo("sample_size", kinds.int),
        EpidataFieldInfo("direction", kinds.float),
        EpidataFieldInfo("missing_value", kinds.int),
        EpidataFieldInfo("missing_stderr", kinds.int),
        EpidataFieldInfo("missing_sample_size", kinds.int),
    ]
    return columns
84+
85+
86+
@dataclass
class DataSignal(Generic[CALL_TYPE]):
    """
    represents a COVIDcast data signal

    Instances are callable: ``signal(geo_type, geo_values, time_values, ...)``
    forwards to :meth:`call`.
    """

    # factory that receives the parameter mapping assembled by `call` and returns the call object
    _create_call: Callable[[Mapping[str, Union[None, EpiRangeLike, Iterable[EpiRangeLike]]]], CALL_TYPE]

    source: str
    signal: str
    signal_basename: str
    name: str
    active: bool
    short_description: str
    description: str
    time_label: str
    value_label: str
    format: Literal["per100k", "percent", "fraction", "count", "raw"] = "raw"
    category: Literal["early", "public", "late", "other"] = "other"
    high_values_are: Literal["good", "bad", "neutral"] = "neutral"
    is_smoothed: bool = False
    is_weighted: bool = False
    is_cumulative: bool = False
    has_stderr: bool = False
    has_sample_size: bool = False
    link: Sequence[WebLink] = field(default_factory=list)
    compute_from_base: bool = False
    time_type: TimeType = "day"

    geo_types: Dict[GeoType, DataSignalGeoStatistics] = field(default_factory=dict)

    def __post_init__(self) -> None:
        """
        Normalize ``link`` and ``geo_types`` after construction.

        Plain dict entries are converted to ``WebLink`` / ``DataSignalGeoStatistics``
        instances; entries that are not dicts are kept as they are.
        """
        self.link = [
            WebLink(alt=entry["alt"], href=entry["href"]) if isinstance(entry, dict) else entry
            for entry in self.link
        ]
        stats_fields = fields(DataSignalGeoStatistics)
        self.geo_types = {
            # keys that are not DataSignalGeoStatistics fields are dropped before construction
            geo: DataSignalGeoStatistics(**_limit_fields(stats, stats_fields)) if isinstance(stats, dict) else stats
            for geo, stats in self.geo_types.items()
        }

    @staticmethod
    def to_df(signals: Iterable["DataSignal"]) -> DataFrame:
        """
        Build a DataFrame with one row per signal, indexed by ``(source, signal)``.

        Besides a fixed selection of signal attributes, the frame has a
        ``geo_types`` column holding the comma-joined keys of each signal's
        ``geo_types`` mapping.
        """
        # The rows are traversed twice (frame construction and the geo_types column),
        # so materialize first: a one-shot iterable such as a generator would
        # otherwise be exhausted after the first pass.
        signal_list = list(signals)
        df = DataFrame(
            signal_list,
            columns=[
                "source",
                "signal",
                "name",
                "active",
                "short_description",
                "description",
                "time_type",
                "time_label",
                "value_label",
                "format",
                "category",
                "high_values_are",
                "is_smoothed",
                "is_weighted",
                "is_cumulative",
                "has_stderr",
                "has_sample_size",
            ],
        )
        # position 6 places the column directly before "time_type"
        df.insert(6, "geo_types", [",".join(s.geo_types) for s in signal_list])
        return df.set_index(["source", "signal"])

    @property
    def key(self) -> Tuple[str, str]:
        """The ``(source, signal)`` pair of this signal."""
        return (self.source, self.signal)

    def call(
        self,
        geo_type: GeoType,
        geo_values: Union[int, str, Iterable[Union[int, str]]],
        time_values: EpiRangeParam,
        as_of: Union[None, str, int] = None,
        issues: Optional[EpiRangeParam] = None,
        lag: Optional[int] = None,
    ) -> CALL_TYPE:
        """
        Fetch Delphi's COVID-19 Surveillance Streams

        The signal's own ``source``, ``signal`` and ``time_type`` are combined
        with the given arguments into one parameter mapping that is handed to
        the ``_create_call`` factory; its result is returned.

        Raises:
            InvalidArgumentException: if ``geo_type``, ``geo_values`` or
                ``time_values`` is None, or if both ``issues`` and ``lag`` are given.
        """
        if any(v is None for v in (geo_type, geo_values, time_values)):
            raise InvalidArgumentException("`geo_type`, `time_values`, and `geo_values` are all required")
        if issues is not None and lag is not None:
            raise InvalidArgumentException("`issues` and `lag` are mutually exclusive")

        return self._create_call(
            dict(
                data_source=self.source,
                signals=self.signal,
                time_type=self.time_type,
                time_values=time_values,
                geo_type=geo_type,
                geo_values=geo_values,
                as_of=as_of,
                issues=issues,
                lag=lag,
            )
        )

    def __call__(
        self,
        geo_type: GeoType,
        geo_values: Union[int, str, Iterable[Union[int, str]]],
        time_values: EpiRangeParam,
        as_of: Union[None, str, int] = None,
        issues: Optional[EpiRangeParam] = None,
        lag: Optional[int] = None,
    ) -> CALL_TYPE:
        """Fetch Delphi's COVID-19 Surveillance Streams"""
        return self.call(geo_type, geo_values, time_values, as_of, issues, lag)
196+
197+
198+
@dataclass
class DataSource(Generic[CALL_TYPE]):
    """
    represents a COVIDcast data source
    """

    # init-only: passed to __post_init__ to build DataSignal objects, not stored as a field
    _create_call: InitVar[Callable[[Mapping[str, Union[None, EpiRangeLike, Iterable[EpiRangeLike]]]], CALL_TYPE]]

    source: str
    db_source: str
    name: str
    description: str
    reference_signal: str
    license: Optional[str] = None
    link: Sequence[WebLink] = field(default_factory=list)
    dua: Optional[str] = None

    signals: Sequence[DataSignal] = field(default_factory=list)

    def __post_init__(
        self, _create_call: Callable[[Mapping[str, Union[None, EpiRangeLike, Iterable[EpiRangeLike]]]], CALL_TYPE]
    ) -> None:
        """
        Normalize ``link`` and ``signals`` after construction.

        Plain dict entries are converted to ``WebLink`` / ``DataSignal``
        instances (signals receive ``_create_call``); entries that are not
        dicts are kept as they are.
        """
        self.link = [
            WebLink(alt=entry["alt"], href=entry["href"]) if isinstance(entry, dict) else entry
            for entry in self.link
        ]
        signal_fields = fields(DataSignal)
        self.signals = [
            # keys that are not DataSignal fields are dropped before construction
            DataSignal(_create_call=_create_call, **_limit_fields(s, signal_fields)) if isinstance(s, dict) else s
            for s in self.signals
        ]

    @staticmethod
    def to_df(sources: Iterable["DataSource"]) -> DataFrame:
        """
        Build a DataFrame with one row per source, indexed by ``source``.

        The ``signals`` column holds the comma-joined ``signal`` names of each
        source's signals.
        """
        # The rows are traversed twice (frame construction and the signals column),
        # so materialize first: a one-shot iterable such as a generator would
        # otherwise be exhausted after the first pass.
        source_list = list(sources)
        df = DataFrame(
            source_list,
            columns=["source", "name", "description", "reference_signal", "license", "dua"],
        )
        df["signals"] = [",".join(ss.signal for ss in s.signals) for s in source_list]
        return df.set_index("source")

    def get_signal(self, signal: str) -> Optional[DataSignal]:
        """Return the first signal whose ``signal`` attribute equals ``signal``, or None."""
        return next((s for s in self.signals if s.signal == signal), None)

    @cached_property
    def signal_df(self) -> DataFrame:
        """DataFrame of this source's signals; computed on first access and then cached."""
        return DataSignal.to_df(self.signals)
242+
243+
244+
@dataclass
class CovidcastDataSources(Generic[CALL_TYPE]):
    """
    COVIDcast data source helper

    Wraps a sequence of data sources and indexes them by source name and
    their signals by ``(source, signal)`` key. Supports iteration over the
    sources and lookup via ``obj["source"]`` or ``obj["source", "signal"]``.
    """

    sources: Sequence[DataSource[CALL_TYPE]]
    # lookup tables, filled in __post_init__ (not constructor arguments)
    _source_by_name: Dict[str, DataSource[CALL_TYPE]] = field(init=False, default_factory=dict)
    _signals_by_key: OrderedDict[Tuple[str, str], DataSignal[CALL_TYPE]] = field(
        init=False, default_factory=OrderedDict
    )

    # call factory; `create` passes the same one it hands to every DataSource
    _create_call: Callable[[Mapping[str, Union[None, EpiRangeLike, Iterable[EpiRangeLike]]]], CALL_TYPE]

    def __post_init__(self) -> None:
        """Fill the by-name and by-key lookup tables from ``sources``."""
        self._source_by_name = {s.source: s for s in self.sources}
        for source in self.sources:
            for signal in source.signals:
                self._signals_by_key[signal.key] = signal

    def get_source(self, source: str) -> Optional[DataSource[CALL_TYPE]]:
        """Return the data source named ``source``, or None if there is none."""
        return self._source_by_name.get(source)

    @property
    def source_names(self) -> Iterable[str]:
        """The ``source`` name of every data source, in ``sources`` order."""
        return (s.source for s in self.sources)

    @cached_property
    def source_df(self) -> DataFrame:
        """DataFrame of all sources; computed on first access and then cached."""
        return DataSource.to_df(self.sources)

    @property
    def signals(self) -> Iterable[DataSignal[CALL_TYPE]]:
        """All indexed signals."""
        return self._signals_by_key.values()

    @cached_property
    def signal_df(self) -> DataFrame:
        """DataFrame of all signals; computed on first access and then cached."""
        return DataSignal.to_df(self.signals)

    def get_signal(self, source: str, signal: str) -> Optional[DataSignal[CALL_TYPE]]:
        """Return the signal stored under ``(source, signal)``, or None if there is none."""
        return self._signals_by_key.get((source, signal))

    @property
    def signal_names(self) -> Iterable[Tuple[str, str]]:
        """The ``(source, signal)`` key of every indexed signal."""
        return self._signals_by_key.keys()

    def __iter__(self) -> Iterable[DataSource[CALL_TYPE]]:
        """Iterate over the data sources."""
        return iter(self.sources)

    @overload
    def __getitem__(self, source: str) -> DataSource[CALL_TYPE]:
        ...

    @overload
    def __getitem__(self, source_signal: Tuple[str, str]) -> DataSignal[CALL_TYPE]:
        ...

    def __getitem__(
        self, source_signal: Union[str, Tuple[str, str]]
    ) -> Union[DataSource[CALL_TYPE], DataSignal[CALL_TYPE]]:
        """
        Look up a data source by name, or a signal by ``(source, signal)``.

        Raises:
            KeyError: if nothing is stored under the given key.
        """
        # An explicit raise instead of `assert`: asserts are stripped under `python -O`,
        # which would make a failed lookup silently return None.
        if isinstance(source_signal, str):
            found_source = self.get_source(source_signal)
            if found_source is None:
                raise KeyError(source_signal)
            return found_source
        found_signal = self.get_signal(source_signal[0], source_signal[1])
        if found_signal is None:
            raise KeyError(source_signal)
        return found_signal

    @staticmethod
    def create(
        meta: List[Dict],
        create_call: Callable[[Mapping[str, Union[None, EpiRangeLike, Iterable[EpiRangeLike]]]], CALL_TYPE],
    ) -> "CovidcastDataSources":
        """
        Build the helper from a list of source dicts.

        Each dict becomes a ``DataSource``; keys that are not ``DataSource``
        fields are ignored. ``create_call`` is handed to every source and to
        the returned helper.
        """
        source_fields = fields(DataSource)
        sources = [DataSource(_create_call=create_call, **_limit_fields(k, source_fields)) for k in meta]
        return CovidcastDataSources(sources, create_call)

0 commit comments

Comments
 (0)