Labels: bug, performance
Description
Describe the bug
While doing some testing, I noticed that DataFusion does not seem to be using all cores in my notebook runtime.
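In the reproduction below the session is created with default settings. As a sanity check, parallelism can also be pinned explicitly; this is a minimal sketch assuming the SessionConfig API in recent datafusion-python releases (target_partitions should already default to the number of cores):

import os
from datafusion import SessionConfig, SessionContext

# Explicitly request one target partition per core; this should already be the
# default, but it rules out a configuration problem on the runtime.
config = SessionConfig().with_target_partitions(os.cpu_count())
ctx = SessionContext(config)
# Show the value the session actually uses.
ctx.sql("SHOW datafusion.execution.target_partitions").show()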
To Reproduce
Here is a simplified version of the code:
import re
import shutil
from urllib.request import urlopen
import os
import requests
import pyarrow.dataset as ds
from pyarrow import csv
from concurrent.futures import ThreadPoolExecutor
from shutil import unpack_archive
from deltalake.writer import write_deltalake
import glob
from psutil import cpu_count, virtual_memory
from datafusion import SessionContext
# nbr of files to process 60 * (nbr_copies +1)
nbr_copies = 1
total_files = 60 * (nbr_copies + 1)
Source = "/lakehouse/default/Files/0_Source/ARCHIVE/Daily_Reports/"
Destination = "/lakehouse/default/Files/1_Transform/0/ARCHIVE/Daily_Reportsd/"
# Display system information
core = cpu_count()
vCPU = str(core) + " vCPU"
mem = round(virtual_memory().total / (1024 * 1024 * 1024), 0)
print(vCPU + ' Memory:' + str(mem))

def download(url, Path, total_files):
    """Downloads up to total_files new .zip files listed at url into Path."""
    if not os.path.exists(Path):
        os.makedirs(Path, exist_ok=True)
    result = urlopen(url).read().decode('utf-8')
    pattern = re.compile(r'[\w.]*\.zip')
    filelist1 = pattern.findall(result)
    filelist_unique = dict.fromkeys(filelist1)
    filelist = sorted(filelist_unique, reverse=True)
    current = [os.path.basename(x) for x in glob.glob(Path + '*.zip')]
    files_to_upload = list(set(filelist) - set(current))
    files_to_upload = list(dict.fromkeys(files_to_upload))[:total_files]
    print(str(len(files_to_upload)) + ' new file(s) to download')
    if len(files_to_upload) != 0:
        for x in files_to_upload:
            with requests.get(url + x, stream=True) as resp:
                if resp.ok:
                    with open(f"{Path}{x}", "wb") as f:
                        for chunk in resp.iter_content(chunk_size=4096):
                            f.write(chunk)
    return "done"

def uncompress(source_zip_path):
    """Unzips a single file to the global Destination directory."""
    unpack_archive(str(source_zip_path), str(Destination), 'zip')

def unzip(Source, Destination, Nbr_Files_to_Download):
    """Unzips new files from Source to Destination using multiple threads."""
    if not os.path.exists(Destination):
        os.makedirs(Destination, exist_ok=True)
    source_zips = glob.glob(Source + '*.zip')
    existing_unzipped = {os.path.basename(f).replace('.CSV', '.zip') for f in glob.glob(Destination + '*.CSV')}
    files_to_unzip = [zip_path for zip_path in source_zips if os.path.basename(zip_path) not in existing_unzipped]
    print(f'{len(files_to_unzip)} New File(s) to uncompress')
    if files_to_unzip:
        with ThreadPoolExecutor(max_workers=core) as executor:
            executor.map(uncompress, files_to_unzip)
        return "done"
    else:
        return "nothing to see here"

def datafusion_clean_csv(files_to_upload_full_Path):
    """Reads the raw CSVs with PyArrow, filters them with DataFusion SQL, and writes a Delta table."""
    ctx = SessionContext()
    column_names = [
        'I', 'UNIT', 'XX', 'VERSION', 'SETTLEMENTDATE', 'RUNNO', 'DUID', 'INTERVENTION',
        'DISPATCHMODE', 'AGCSTATUS', 'INITIALMW', 'TOTALCLEARED', 'RAMPDOWNRATE', 'RAMPUPRATE',
        'LOWER5MIN', 'LOWER60SEC', 'LOWER6SEC', 'RAISE5MIN', 'RAISE60SEC', 'RAISE6SEC',
        'MARGINAL5MINVALUE', 'MARGINAL60SECVALUE', 'MARGINAL6SECVALUE', 'MARGINALVALUE',
        'VIOLATION5MINDEGREE', 'VIOLATION60SECDEGREE', 'VIOLATION6SECDEGREE', 'VIOLATIONDEGREE',
        'LOWERREG', 'RAISEREG', 'AVAILABILITY', 'RAISE6SECFLAGS', 'RAISE60SECFLAGS',
        'RAISE5MINFLAGS', 'RAISEREGFLAGS', 'LOWER6SECFLAGS', 'LOWER60SECFLAGS', 'LOWER5MINFLAGS',
        'LOWERREGFLAGS', 'RAISEREGAVAILABILITY', 'RAISEREGENABLEMENTMAX', 'RAISEREGENABLEMENTMIN',
        'LOWERREGAVAILABILITY', 'LOWERREGENABLEMENTMAX', 'LOWERREGENABLEMENTMIN',
        'RAISE6SECACTUALAVAILABILITY', 'RAISE60SECACTUALAVAILABILITY', 'RAISE5MINACTUALAVAILABILITY',
        'RAISEREGACTUALAVAILABILITY', 'LOWER6SECACTUALAVAILABILITY', 'LOWER60SECACTUALAVAILABILITY',
        'LOWER5MINACTUALAVAILABILITY', 'LOWERREGACTUALAVAILABILITY'
    ]
    read_options = csv.ReadOptions(column_names=column_names, skip_rows=1)
    parse_options = csv.ParseOptions(invalid_row_handler=lambda i: "skip")
    convert_options = csv.ConvertOptions(strings_can_be_null=True)
    csv_format = ds.CsvFileFormat(parse_options=parse_options, convert_options=convert_options, read_options=read_options)
    raw = ds.dataset(files_to_upload_full_Path, format=csv_format)
    ctx.register_dataset("arrow_dataset", raw)
    # Keep only the dispatch unit rows and derive a typed timestamp plus a partition year.
    df = ctx.sql("""
        select * EXCLUDE("I","XX","SETTLEMENTDATE"),
            to_timestamp_seconds("SETTLEMENTDATE",'%Y/%m/%d %H:%M:%S') as SETTLEMENTDATE,
            cast(EXTRACT(YEAR FROM to_timestamp_seconds("SETTLEMENTDATE",'%Y/%m/%d %H:%M:%S')) as integer) as year
        from arrow_dataset where "I" = 'D' and "UNIT" = 'DUNIT' and "VERSION" = '3'
    """)
    write_deltalake(f"/lakehouse/default/Tables/T{total_files}/datafusion", df, mode="overwrite", partition_by=['year'])
    return "done"

def duplicate_files(Destination, nbr_copies):
    """Duplicates files in the specified directory."""
    if not os.path.exists(Destination):
        os.makedirs(Destination, exist_ok=True)
    files = [f for f in os.listdir(Destination) if os.path.isfile(os.path.join(Destination, f))]
    if len(files) > nbr_copies * 60:
        print("all good, data exists already")
        return
    for file in files:
        file_path = os.path.join(Destination, file)
        name, ext = os.path.splitext(file)
        for i in range(1, nbr_copies + 1):  # Create nbr_copies copies of each file
            new_file = os.path.join(Destination, f"{name}_copy{i}{ext}")
            shutil.copy(file_path, new_file)
    print(f"Successfully duplicated {len(files)} files {nbr_copies} times each in '{Destination}'.")
### Main execution starts here ###
download("https://nemweb.com.au/Reports/Current/Daily_Reports/", Source, 60)
unzip(Source, Destination, 60)
duplicate_files(Destination, nbr_copies)
list_files = [os.path.basename(x) for x in glob.glob(Destination + '*.CSV')]
files_to_upload_full_Path = [Destination + i for i in list_files][:total_files]
datafusion_clean_csv(files_to_upload_full_Path)
print("Data processing completed successfully.")
Expected behavior
No response
Additional context
No response