 # -*- coding: utf-8 -*-
 """Functions for pulling NSSP ER data."""
+import copy
 import logging
+import random
+import time
+from datetime import datetime, timedelta
 from pathlib import Path
 from typing import Optional
+from urllib.error import HTTPError

 import pandas as pd
 from delphi_utils import create_backup_csv
 from sodapy import Socrata

 from .constants import MAIN_DATASET_ID, PRELIM_DATASET_ID, PRELIM_SIGNALS_MAP, PRELIM_TYPE_DICT, SIGNALS_MAP, TYPE_DICT


-def pull_data(socrata_token: str, dataset_id: str):
+def check_last_updated(socrata_token, dataset_id, logger):
20+ """
21+ Check last updated timestamp to determine if data should be pulled or not.
22+
23+ Note -- if the call to the API fails, the behavior is to treat the data as stale,
24+ as possibly having duplicate is preferable to missing data
25+
26+ Parameters
27+ ----------
28+ socrata_token
29+ dataset_id
30+ logger
31+
32+ Returns bool
33+ -------
34+
35+ """
+    recently_updated_source = True
+    try:
+        client = Socrata("data.cdc.gov", socrata_token)
+        response = client.get_metadata(dataset_id)
+
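+        # rowsUpdatedAt is a Unix epoch in seconds; the source counts as fresh
+        # if it changed within the past day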
+        updated_timestamp = datetime.utcfromtimestamp(int(response["rowsUpdatedAt"]))
+        now = datetime.utcnow()
+        recently_updated_source = (now - updated_timestamp) < timedelta(days=1)
+
+        prelim_prefix = "Preliminary " if dataset_id == PRELIM_DATASET_ID else ""
+        if recently_updated_source:
+            logger.info(
+                f"{prelim_prefix}NHSN data was recently updated; Pulling data", updated_timestamp=updated_timestamp
+            )
+        else:
+            logger.info(f"{prelim_prefix}NHSN data is stale; Skipping", updated_timestamp=updated_timestamp)
+    # pylint: disable=W0703
+    except Exception as e:
+        logger.info("error while processing socrata metadata; treating data as recently updated", error=str(e))
+    return recently_updated_source
+
+
+def pull_data(socrata_token: str, dataset_id: str, backup_dir: str, logger):
     """Pull data from Socrata API."""
     client = Socrata("data.cdc.gov", socrata_token)
+    logger.info(
+        f"Pulling {'main' if dataset_id == MAIN_DATASET_ID else 'preliminary'} data from Socrata API",
+        dataset_id=dataset_id,
+    )
     results = []
     offset = 0
     limit = 50000  # maximum limit allowed by SODA 2.0
-    while True:
+    # retry logic for 503 errors
+    try:
         page = client.get(dataset_id, limit=limit, offset=offset)
-        if not page:
-            break  # exit the loop if no more results
+    except HTTPError as err:
+        if err.code == 503:
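+            # back off 2-3 seconds with random jitter before a single retry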
+            time.sleep(2 + random.randint(0, 1000) / 1000.0)
+            page = client.get(dataset_id, limit=limit, offset=offset)
+        else:
+            logger.info("Error pulling data from Socrata API", error=str(err))
+            raise err
+
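+    # page through the dataset until Socrata returns an empty page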
+    while len(page) > 0:
         results.extend(page)
         offset += limit
+        page = client.get(dataset_id, limit=limit, offset=offset)

-    df = pd.DataFrame.from_records(results)
+    if results:
+        df = pd.DataFrame.from_records(results)
+        sensor = "prelim" if dataset_id == PRELIM_DATASET_ID else None
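+        # back up the raw pull before any transformation (sensor tags preliminary backups)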
+        create_backup_csv(df, backup_dir, False, sensor=sensor, logger=logger)
+    else:
+        df = pd.DataFrame()
     return df


@@ -62,6 +124,7 @@ def pull_nhsn_data(
     backup_dir: str,
     custom_run: bool,
     issue_date: Optional[str],
+    preliminary: bool = False,
     logger: Optional[logging.Logger] = None,
 ):
     """Pull the latest NHSN hospital admission data, and conform it into a dataset.
@@ -79,6 +142,10 @@ def pull_nhsn_data(
         Directory to which to save raw backup data
     custom_run: bool
         Flag indicating if the current run is a patch. If so, don't save any data to disk
+    preliminary: bool
+        Flag indicating whether to grab main or preliminary data
+    issue_date: Optional[str]
+        Date indicating which backup file to pull when patching
     logger: Optional[logging.Logger]
         logger object

@@ -87,83 +154,39 @@ def pull_nhsn_data(
87154 pd.DataFrame
88155 Dataframe as described above.
89156 """
157+ dataset_id = PRELIM_DATASET_ID if preliminary else MAIN_DATASET_ID
90158 # Pull data from Socrata API
91159 df = (
92- pull_data (socrata_token , dataset_id = MAIN_DATASET_ID )
160+ pull_data (socrata_token , dataset_id , backup_dir , logger )
93161 if not custom_run
94- else pull_data_from_file (backup_dir , issue_date , logger , prelim_flag = False )
162+ else pull_data_from_file (backup_dir , issue_date , logger , prelim_flag = preliminary )
95163 )
96164
-    keep_columns = list(TYPE_DICT.keys())
-
-    if not df.empty:
-        create_backup_csv(df, backup_dir, custom_run, logger=logger)
-
-        df = df.rename(columns={"weekendingdate": "timestamp", "jurisdiction": "geo_id"})
-
-        for signal, col_name in SIGNALS_MAP.items():
-            df[signal] = df[col_name]
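+    # patch runs read from saved backups, so the live API's freshness check is skipped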
+    recently_updated = True if custom_run else check_last_updated(socrata_token, dataset_id, logger)

-        df = df[keep_columns]
-        df["geo_id"] = df["geo_id"].str.lower()
-        df.loc[df["geo_id"] == "usa", "geo_id"] = "us"
-        df = df.astype(TYPE_DICT)
-    else:
-        df = pd.DataFrame(columns=keep_columns)
+    type_dict = PRELIM_TYPE_DICT if preliminary else TYPE_DICT
+    keep_columns = list(type_dict.keys())
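+    # work on a copy so columns missing from older backups can be dropped without mutating the shared dict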
+    filtered_type_dict = copy.deepcopy(type_dict)

-    return df
-
-
-def pull_preliminary_nhsn_data(
-    socrata_token: str,
-    backup_dir: str,
-    custom_run: bool,
-    issue_date: Optional[str],
-    logger: Optional[logging.Logger] = None,
-):
-    """Pull the latest preliminary NHSN hospital admission data, and conform it into a dataset.
-
-    The output dataset has:
-
-    - Each row corresponds to a single observation
-    - Each row additionally has columns for the signals in SIGNALS
-
-    Parameters
-    ----------
-    socrata_token: str
-        My App Token for pulling the NHSN data
-    backup_dir: str
-        Directory to which to save raw backup data
-    custom_run: bool
-        Flag indicating if the current run is a patch. If so, don't save any data to disk
-    logger: Optional[logging.Logger]
-        logger object
-
-    Returns
-    -------
-    pd.DataFrame
-        Dataframe as described above.
-    """
-    df = (
-        pull_data(socrata_token, dataset_id=PRELIM_DATASET_ID)
-        if not custom_run
-        else pull_data_from_file(backup_dir, issue_date, logger, prelim_flag=True)
-    )
-
-    keep_columns = list(PRELIM_TYPE_DICT.keys())
-
-    if not df.empty:
-        create_backup_csv(df, backup_dir, custom_run, sensor="prelim", logger=logger)
+    signal_map = PRELIM_SIGNALS_MAP if preliminary else SIGNALS_MAP

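+    # only transform when there is fresh data; otherwise fall through to the empty frame below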
+    if not df.empty and recently_updated:
         df = df.rename(columns={"weekendingdate": "timestamp", "jurisdiction": "geo_id"})

-        for signal, col_name in PRELIM_SIGNALS_MAP.items():
-            df[signal] = df[col_name]
+        for signal, col_name in signal_map.items():
+            # older backups don't have certain columns
+            try:
+                df[signal] = df[col_name]
+            except KeyError:
+                logger.info("column not available in data", col_name=col_name)
+                keep_columns.remove(signal)
+                del filtered_type_dict[signal]

         df = df[keep_columns]
-        df = df.astype(PRELIM_TYPE_DICT)
         df["geo_id"] = df["geo_id"].str.lower()
         df.loc[df["geo_id"] == "usa", "geo_id"] = "us"
+
+        df = df.astype(filtered_type_dict)
     else:
         df = pd.DataFrame(columns=keep_columns)
