Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
a42f9bf
fix: amending path for notebook
LisaHopcroft Aug 25, 2021
11e1d82
feat: example.py to run examples with logging
LisaHopcroft Aug 25, 2021
c8550ac
feat: reporting process ID
LisaHopcroft Aug 25, 2021
df3e61f
fix: dealing with unexpected column names
LisaHopcroft Aug 25, 2021
2cd1442
fix: checking date format of input file
LisaHopcroft Aug 25, 2021
245d12d
feat: incoming date format can be specified
LisaHopcroft Aug 25, 2021
266c81f
bump to last commit
LisaHopcroft Aug 25, 2021
90882b7
tidy: editing comments
LisaHopcroft Aug 25, 2021
6e47899
bump to last commit
LisaHopcroft Aug 25, 2021
ee679f8
chore: improved info/error messages
LisaHopcroft Aug 25, 2021
669221c
fix: accommodating integer 'code' values
LisaHopcroft Aug 25, 2021
87ccce1
fix: capturing reformatted date
LisaHopcroft Aug 25, 2021
56fbd7d
fix: removing additional data
LisaHopcroft Aug 25, 2021
3137e96
chore: improved info/error messages
LisaHopcroft Aug 25, 2021
4ed669e
feat: adding another example
LisaHopcroft Aug 25, 2021
099abde
bump to previous commit
LisaHopcroft Aug 25, 2021
436ddb0
feat: param for directory + file existance check
LisaHopcroft Aug 26, 2021
b218452
feat: reporting of R script command (when verbose)
LisaHopcroft Aug 26, 2021
bbdf57b
bug: adding directory creation for CSV files
LisaHopcroft Aug 26, 2021
916c91e
feat: handling invalid draw_figures values
LisaHopcroft Aug 26, 2021
aff1496
feat: message when figure dir is created
LisaHopcroft Aug 26, 2021
b7b3c6c
docs: updated examples to demo new features
LisaHopcroft Aug 26, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 131 additions & 15 deletions change_detection/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import pandas as pd
import numpy as np
from ebmdatalab import bq
import sys

'''
required R packages:
Expand All @@ -30,6 +31,7 @@ def run_r_script(path):
command = 'Rscript'
path2script = os.path.join(os.getcwd(), path)
cmd = [command, path2script]

return subprocess.call(cmd)


Expand All @@ -45,6 +47,13 @@ def __init__(self,
sample=False,
measure=False,
custom_measure=False,
code_variable = 'code',
numerator_variable = 'numerator',
denominator_variable = 'denominator',
date_variable = 'month',
date_format = "%Y-%m-%d",
base_dir = os.getcwd(),
data_subdir = 'data',
direction='both',
use_cache=True,
csv_name='bq_cache.csv',
Expand All @@ -59,19 +68,38 @@ def __init__(self,
self.sample = sample
self.measure = measure
self.custom_measure = custom_measure
self.code_variable = code_variable
self.numerator_variable = numerator_variable
self.denominator_variable = denominator_variable
self.date_variable = date_variable
self.date_format = date_format
self.expected_columns = {"code": self.code_variable,
"month": self.date_variable,
"numerator": self.numerator_variable,
"denominator": self.denominator_variable}
self.base_dir = base_dir
self.data_subdir = data_subdir
self.direction = direction
self.use_cache = use_cache
self.csv_name = csv_name
self.overwrite = overwrite
self.draw_figures = draw_figures
self.draw_figures = self._draw_figures_check(draw_figures)
self.bq_folder = bq_folder

def _draw_figures_check(self, v):
if not (v in ['yes','no'] ):
raise Exception("The 'draw_figures' parameter should be 'yes' or 'no'")
return(v)

def get_working_dir(self, folder):
folder_name = folder.replace('%', '')
return os.path.join(os.getcwd(), 'data', folder_name)
return os.path.join(self.base_dir, self.data_subdir, folder_name)

def create_dir(self, dir_path):
os.makedirs(dir_path, exist_ok=True)
if ( self.verbose == True ):
print( f"[INFO] Creating 'figures' directory in {dir_path}" )
sys.stdout.flush()
os.makedirs(os.path.join(dir_path, 'figures'), exist_ok=True)

def get_measure_list(self):
Expand All @@ -94,11 +122,12 @@ def get_measure_list(self):
csv_path=csv_path,
use_cache=self.use_cache
)

return measure_list['table_id']

def get_custom_measure_list(self):
return [entry.name.split('.',1)[0] for entry
in os.scandir('data/measure_sql/{name}'.format(name=self.name))
in os.scandir('{subdir}/measure_sql/{name}'.format(name=self.name,subdir=self.data_subdir))
if entry.name.endswith('.sql')]

def get_measure_query(self, measure_name):
Expand Down Expand Up @@ -147,13 +176,34 @@ def get_data(self):
use_cache=self.use_cache)
else:
get_data_dir = self.get_working_dir(self.name)

self.create_dir(get_data_dir)
query = self.get_custom_query()
csv_path = os.path.join(get_data_dir, self.csv_name)
bq.cached_read(query,
csv_path=csv_path,
use_cache=self.use_cache)

def amend_column_names(self,df):
for expected_name, actual_name in self.expected_columns.items():
if (expected_name != actual_name):
if (self.verbose):
print(f"[INFO] Replacing column '{actual_name}' with expected column '{expected_name}'")
df[expected_name] = df[actual_name]
return df

def check_column_names(self,df):
check_message = []

columns_missing = np.setdiff1d(
list(self.expected_columns.values()),
df.columns).tolist()

if ( len( columns_missing ) > 0 ):
check_message.append(f"[ERROR] Expected columns missing: {', '.join( columns_missing )}")

return check_message

def shape_dataframe(self):
'''
Returns data in a dataframe in the format needed for `r_detect()`
Expand All @@ -168,10 +218,40 @@ def shape_dataframe(self):
time.sleep(0.5)
#time.sleep(3)
input_df = pd.read_csv(csv_path)

#############################################################
### Checking input formatting ###
#############################################################

### If the user nas specified other column
### names via the [code/date/numerator/denominator}_variable
### arguments, then these are replaced with the expected
### column names in amend_column_names.
input_df = self.amend_column_names(input_df)

### If expected column names are still missing, an
### exception will be thrown.
column_check_message = self.check_column_names(input_df)
if ( len( column_check_message ) > 0 ):
raise NameError( '\n'.join(column_check_message) )

### Check the format of the date - reformat to requested
### date format.
try:
date_tmp = pd.to_datetime(input_df['month'],
format=self.date_format,
errors='raise')
input_df['month'] = pd.to_datetime( date_tmp, format="%Y-%m-%d" )
except ValueError as e:
raise ValueError( f"[ERROR] Field '{self.date_variable}' is not of the required format '{self.date_format}'" )

### Retain only those columns that we're expecting.
input_df = input_df[list(self.expected_columns.keys())]

input_df = input_df.sort_values(['code', 'month'])
input_df['ratio'] = input_df['numerator']/(input_df['denominator'])
## R script requires this header format:
input_df['code'] = 'ratio_quantity.' + input_df['code']
input_df['code'] = 'ratio_quantity.' + input_df['code'].apply(str)
input_df = input_df.set_index(['month', 'code'])

## drop small numbers
Expand Down Expand Up @@ -214,7 +294,7 @@ def shape_dataframe(self):
def run_r_script(self, i, script_name, input_name, output_name, *args):
'''
- have reduced outputs (a bit faster that way)
- for debugging purposes use `verbose` argument"
- for debugging purposes use `verbose` argument"
'''
## Define R command
command = 'Rscript'
Expand All @@ -227,6 +307,10 @@ def run_r_script(self, i, script_name, input_name, output_name, *args):
for arg in args:
arguments.append(arg)

if ( self.verbose ):
print( f"[INFO/R command] [{' '.join(cmd)} {' '.join(arguments)}]")
sys.stdout.flush()

## run the command
if i == 0:
if self.verbose:
Expand Down Expand Up @@ -257,7 +341,7 @@ def r_detect(self):

df = pd.DataFrame(item)
df.to_csv(os.path.join(self.working_dir, input_name))

process = self.run_r_script(i,
script_name,
input_name,
Expand Down Expand Up @@ -312,17 +396,41 @@ def run_if_needed(self, out_path):
self.concatenate_split_dfs()

def detect_change(self):
if self.measure:
for measure_name in self.measure_list:
folder_name = os.path.join(self.name, measure_name)
self.working_dir = self.get_working_dir(folder_name)

try:
if self.csv_name != 'bq_cache.csv':
self.working_dir = self.get_working_dir(self.name)
csv_path = os.path.join(self.working_dir, self.csv_name)
if not os.path.isfile(csv_path):
raise FileNotFoundError(f"[ERROR] File {csv_path} does not exist")

if self.measure:
for measure_name in self.measure_list:
folder_name = os.path.join(self.name, measure_name)
self.working_dir = self.get_working_dir(folder_name)
out_path = os.path.join(self.working_dir, 'r_output.csv')
self.run_if_needed(out_path)
else:
self.working_dir = self.get_working_dir(self.name)
out_path = os.path.join(self.working_dir, 'r_output.csv')
self.run_if_needed(out_path)
else:
self.working_dir = self.get_working_dir(self.name)
out_path = os.path.join(self.working_dir, 'r_output.csv')
self.run_if_needed(out_path)


except NameError as e:
print( e )
print(f" Columns of {self.csv_name} are not as expected")
print(f" Specify the column names using the [code/date/numerator/denominator]_variable parameter(s)")
sys.stdout.flush()

except ValueError as e:
print( e )
print(" Specify the date format using the date_format parameter (by default this is '%Y-%m-%d')")
sys.stdout.flush()

except FileNotFoundError as e:
print( e )
print(" Check the name and location of the input .csv file")
sys.stdout.flush()

def clear(self):
os.system( 'cls' )

Expand All @@ -335,8 +443,16 @@ def run(self):
if self.csv_name == 'bq_cache.csv':
p1 = Process(target = self.get_data)
p1.start()
else:
get_data_dir = self.get_working_dir(self.name)
self.create_dir(get_data_dir)
p2 = Process(target = self.detect_change)
p2.start()

if self.verbose == True:
print("\n[INFO] Process initiated: ", end='')
print( p2 )
sys.stdout.flush()

def concatenate_outputs(self, folder_suffix=''):
assert self.measure, "Not to be used on single outputs"
Expand Down
Loading