
datafusion seems to be single threaded regardless of the number of cores #16833

@djouallah

Description

Describe the bug

I was doing some testing and noticed that DataFusion doesn't seem to be using all the cores in my notebook runtime.
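
For what it's worth, my understanding is that DataFusion's parallelism is controlled by target_partitions on the session config, which should default to the number of cores. Below is a minimal sketch of how I would pin it explicitly and inspect the plan (assuming the SessionConfig API from datafusion-python; "my_table" is just a placeholder):

import os
from datafusion import SessionContext, SessionConfig

# assumption: target_partitions drives the fan-out and already defaults
# to the number of logical cores
config = SessionConfig().with_target_partitions(os.cpu_count())
ctx = SessionContext(config)

# EXPLAIN shows whether the physical plan contains RepartitionExec nodes,
# i.e. whether the query is expected to run across multiple partitions
ctx.sql("EXPLAIN SELECT count(*) FROM my_table").show()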

To Reproduce

Here is a simplified version of the code:

import re
import shutil
from   urllib.request import urlopen
import os
import requests
import pyarrow.dataset as ds
from   pyarrow import csv
from   concurrent.futures import ThreadPoolExecutor
from   shutil import unpack_archive
from   deltalake.writer import write_deltalake
import glob
from   psutil import cpu_count, virtual_memory
from   datafusion import SessionContext as session

# nbr of files to process 60 * (nbr_copies +1)
nbr_copies = 1
total_files = 60 * (nbr_copies + 1)
Source = "/lakehouse/default/Files/0_Source/ARCHIVE/Daily_Reports/"
Destination = "/lakehouse/default/Files/1_Transform/0/ARCHIVE/Daily_Reportsd/"

# Display system information
core = cpu_count()
vCPU = str(core) + " vCPU"
mem = round(virtual_memory().total / (1024 * 1024 * 1024), 0)
print(vCPU + ' Memory:' + str(mem))

def download(url, Path, total_files):
    if not os.path.exists(Path):
        os.makedirs(Path, exist_ok=True)
    result = urlopen(url).read().decode('utf-8')
    pattern = re.compile(r'[\w.]*\.zip')
    filelist1 = pattern.findall(result)
    filelist_unique = dict.fromkeys(filelist1)
    filelist = sorted(filelist_unique, reverse=True)
    current = [os.path.basename(x) for x in glob.glob(Path + '*.zip')]
    files_to_upload = list(set(filelist) - set(current))
    files_to_upload = list(dict.fromkeys(files_to_upload))[:total_files]
    print(str(len(files_to_upload)) + ' New File Loaded')
    if len(files_to_upload) != 0:
        for x in files_to_upload:
            with requests.get(url + x, stream=True) as resp:
                if resp.ok:
                    with open(f"{Path}{x}", "wb") as f:
                        for chunk in resp.iter_content(chunk_size=4096):
                            f.write(chunk)
    return "done"

def uncompress(source_zip_path):
    """Unzips a single file to the global Destination directory."""
    unpack_archive(str(source_zip_path), str(Destination), 'zip')

def unzip(Source, Destination, Nbr_Files_to_Download):
    """Unzips new files from Source to Destination using multiple threads."""
    if not os.path.exists(Destination):
        os.makedirs(Destination, exist_ok=True)
    
    source_zips = glob.glob(Source + '*.zip')
    existing_unzipped = {os.path.basename(f).replace('.CSV', '.zip') for f in glob.glob(Destination + '*.CSV')}
    files_to_unzip = [zip_path for zip_path in source_zips if os.path.basename(zip_path) not in existing_unzipped]
    
    print(f'{len(files_to_unzip)} New File(s) to uncompress')
    
    if files_to_unzip:
        with ThreadPoolExecutor(max_workers=core) as executor:
            executor.map(uncompress, files_to_unzip)
        return "done"
    else:
        return "nothing to see here"


def datafusion_clean_csv(files_to_upload_full_Path):
    ctx = session()
    column_names = [
        'I', 'UNIT', 'XX', 'VERSION', 'SETTLEMENTDATE', 'RUNNO', 'DUID', 'INTERVENTION',
        'DISPATCHMODE', 'AGCSTATUS', 'INITIALMW', 'TOTALCLEARED', 'RAMPDOWNRATE', 'RAMPUPRATE',
        'LOWER5MIN', 'LOWER60SEC', 'LOWER6SEC', 'RAISE5MIN', 'RAISE60SEC', 'RAISE6SEC',
        'MARGINAL5MINVALUE', 'MARGINAL60SECVALUE', 'MARGINAL6SECVALUE', 'MARGINALVALUE',
        'VIOLATION5MINDEGREE', 'VIOLATION60SECDEGREE', 'VIOLATION6SECDEGREE', 'VIOLATIONDEGREE',
        'LOWERREG', 'RAISEREG', 'AVAILABILITY', 'RAISE6SECFLAGS', 'RAISE60SECFLAGS',
        'RAISE5MINFLAGS', 'RAISEREGFLAGS', 'LOWER6SECFLAGS', 'LOWER60SECFLAGS', 'LOWER5MINFLAGS',
        'LOWERREGFLAGS', 'RAISEREGAVAILABILITY', 'RAISEREGENABLEMENTMAX', 'RAISEREGENABLEMENTMIN',
        'LOWERREGAVAILABILITY', 'LOWERREGENABLEMENTMAX', 'LOWERREGENABLEMENTMIN',
        'RAISE6SECACTUALAVAILABILITY', 'RAISE60SECACTUALAVAILABILITY', 'RAISE5MINACTUALAVAILABILITY',
        'RAISEREGACTUALAVAILABILITY', 'LOWER6SECACTUALAVAILABILITY', 'LOWER60SECACTUALAVAILABILITY',
        'LOWER5MINACTUALAVAILABILITY', 'LOWERREGACTUALAVAILABILITY'
    ]

    ReadOptions = csv.ReadOptions(column_names=column_names, skip_rows=1)
    ParseOptions = csv.ParseOptions(invalid_row_handler=lambda i: "skip")
    ConvertOptions = csv.ConvertOptions(strings_can_be_null=True)
    format = ds.CsvFileFormat(parse_options=ParseOptions, convert_options=ConvertOptions, read_options=ReadOptions)
    raw = ds.dataset(files_to_upload_full_Path, format=format)
    ctx.register_dataset("arrow_dataset", raw)
    ######################################
    df = ctx.sql("""
                    select * EXCLUDE("I","XX","SETTLEMENTDATE"),
                    to_timestamp_seconds ("SETTLEMENTDATE",'%Y/%m/%d %H:%M:%S') as SETTLEMENTDATE,
                    cast(EXTRACT(YEAR FROM to_timestamp_seconds ("SETTLEMENTDATE",'%Y/%m/%d %H:%M:%S')) as integer) as year
                    from arrow_dataset where "I" ='D' and "UNIT"='DUNIT' and "VERSION" ='3'
                """)
    write_deltalake(f"/lakehouse/default/Tables/T{total_files}/datafusion", df, mode="overwrite", partition_by=['year'])
    return "done"
def duplicate_files(Destination, nbr_copies):
    """Duplicates files in the specified directory."""
    if not os.path.exists(Destination):
        os.makedirs(Destination, exist_ok=True)
    
    files = [f for f in os.listdir(Destination) if os.path.isfile(os.path.join(Destination, f))]
    if len(files) > nbr_copies * 60:
        print("all good, data exists already ")
        return
    
    for file in files:
        file_path = os.path.join(Destination, file)
        name, ext = os.path.splitext(file)

        for i in range(1, nbr_copies + 1):  # Create nbr_copies copies
            new_file = os.path.join(Destination, f"{name}_copy{i}{ext}")
            shutil.copy(file_path, new_file)
    print(f"Successfully duplicated {len(files)} files {nbr_copies} times each in '{Destination}'.")

### Main execution starts here ###
download("https://nemweb.com.au/Reports/Current/Daily_Reports/", Source, 60)
unzip(Source, Destination, 60)
duplicate_files(Destination, nbr_copies)
list_files = [os.path.basename(x) for x in glob.glob(Destination + '*.CSV')]
files_to_upload_full_Path = [Destination + i for i in list_files][:total_files]
datafusion_clean_csv(files_to_upload_full_Path)
print("Data processing completed successfully.")

Expected behavior

No response

Additional context

No response

    Labels

    bug (Something isn't working), performance (Make DataFusion faster)
