21 changes: 20 additions & 1 deletion .circleci/workflows.yml
@@ -352,6 +352,15 @@ jobs:
- *restore_venv_cache
- *build
- *attach_generated_sql
- &restore_schema_cache
  run:
    name: Restore schema cache from generate-sql job
    command: |
      # Restore stable table schema cache from workspace
      if [ -d /tmp/workspace/schema-cache/bigquery_etl_schemas ]; then
        cp -r /tmp/workspace/schema-cache/bigquery_etl_schemas /tmp/
        echo "Restored schema cache from generate-sql job"
      fi
- *authenticate
- &add_private_bigquery_etl_ssh_keys
  add_ssh_keys:
@@ -619,6 +628,7 @@ jobs:
    --ignore derived_view_schemas \
    --output-dir /tmp/workspace/generated-sql/sql/ \
    --target-project moz-fx-data-shared-prod
  PATH="venv/bin:$PATH" script/bqetl format /tmp/workspace/generated-sql/sql/
Comment from the collaborator (PR author): We don't need to run the formatting if we use the existing generated-sql branch, since anything on there is already properly formatted. The formatting does take 2-3 minutes to run.

Comment from a contributor: IIRC a lot (most?) of that is due to qualifying table references. That could be another thing to optimize, but it seems hard to do.
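For context on that remark: "qualifying table references" means rewriting two-part names such as telemetry.main into fully qualified project.dataset.table names during formatting. A minimal sketch of the idea, assuming backtick-quoted identifiers and a fixed default project (this is illustrative only, not the actual bqetl formatter):

```python
import re

# Assumed default target project for illustration.
DEFAULT_PROJECT = "moz-fx-data-shared-prod"


def qualify_table_references(sql: str) -> str:
    """Rewrite backtick-quoted `dataset.table` names as `project.dataset.table`.

    Toy qualifier: already-qualified three-part names are left alone because
    the pattern requires the opening backtick to sit directly before the
    dataset name.
    """
    pattern = re.compile(r"`([A-Za-z0-9_]+)\.([A-Za-z0-9_]+)`")
    return pattern.sub(rf"`{DEFAULT_PROJECT}.\1.\2`", sql)


print(qualify_table_references("SELECT * FROM `telemetry.main`"))
# SELECT * FROM `moz-fx-data-shared-prod.telemetry.main`
```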

else
  echo "Changes made don't affect generated SQL. Use content from generated-sql"

@@ -627,18 +637,26 @@
  cp -a sql/. /tmp/workspace/generated-sql/sql
fi

PATH="venv/bin:$PATH" script/bqetl format /tmp/workspace/generated-sql/sql/
PATH="venv/bin:$PATH" script/bqetl dependency record \
--skip-existing \
"/tmp/workspace/generated-sql/sql/"
PATH="venv/bin:$PATH" script/bqetl metadata update \
--sql-dir /tmp/workspace/generated-sql/sql/ \
/tmp/workspace/generated-sql/sql/
PATH="venv/bin:$PATH" script/bqetl monitoring update /tmp/workspace/generated-sql/sql/
- run:
    name: Copy schema cache for reuse in other jobs
    command: |
      # Copy stable table schema cache to workspace for reuse in generate-dags
      mkdir -p /tmp/workspace/schema-cache
      if [ -d /tmp/bigquery_etl_schemas ]; then
        cp -r /tmp/bigquery_etl_schemas /tmp/workspace/schema-cache/
      fi
- persist_to_workspace:
    root: /tmp/workspace
    paths:
      - generated-sql
      - schema-cache
- unless:
    condition: *validate-sql-or-routines
    steps:
@@ -655,6 +673,7 @@ jobs:
- *restore_venv_cache
- *build
- *attach_generated_sql
- *restore_schema_cache
- *copy_generated_sql
- add_ssh_keys:
    fingerprints:
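Taken together, the workflows.yml changes implement a cross-job disk-cache handoff: generate-sql warms the stable-table schema cache under /tmp/bigquery_etl_schemas, copies it into the persisted workspace, and downstream jobs copy it back before running. This pays off because the schema loader can short-circuit when its on-disk cache already exists. A minimal sketch of that load-or-download pattern, assuming a JSON cache file and a download callback (the real implementation lives in bigquery_etl.schema.stable_table_schema and may differ):

```python
import json
from pathlib import Path

# Assumed cache location; matches the directory the CI steps copy around.
CACHE_DIR = Path("/tmp/bigquery_etl_schemas")


def load_schemas(download):
    """Return stable table schemas, downloading only on a cache miss."""
    cache_file = CACHE_DIR / "schemas.json"  # hypothetical file name
    if cache_file.exists():
        # Cache hit: the generate-sql job (or a restored workspace) already
        # populated this directory, so the expensive download is skipped.
        return json.loads(cache_file.read_text())
    schemas = download()  # expensive network fetch, runs once per cold cache
    CACHE_DIR.mkdir(parents=True, exist_ok=True)
    cache_file.write_text(json.dumps(schemas))
    return schemas
```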
45 changes: 45 additions & 0 deletions bigquery_etl/query_scheduling/dag_collection.py
@@ -10,8 +10,31 @@
import yaml
from black import FileMode, format_file_contents

from bigquery_etl.dependency import extract_table_references_without_views
from bigquery_etl.query_scheduling.dag import Dag, InvalidDag, PublicDataJsonDag
from bigquery_etl.query_scheduling.utils import negate_timedelta_string
from bigquery_etl.schema.stable_table_schema import get_stable_table_schemas


def _precompute_task_references(task):
    """Pre-compute expensive table references parsing and return result."""
    if (
        task.is_python_script
        or task.is_bigeye_check
        or task.referenced_tables is not None
    ):
        return (task.task_key, task.referenced_tables or [])

    query_files = [Path(task.query_file)]
    if task.multipart:
        query_files = list(Path(task.query_file_path).glob("*.sql"))

    table_names = {
        tuple(table.split("."))
        for query_file in query_files
        for table in extract_table_references_without_views(query_file)
    }
    return (task.task_key, sorted(table_names))


class DagCollection:
@@ -192,10 +215,32 @@ def to_airflow_dags(self, output_dir, dag_to_generate=None):
        except Exception:
            pass

        # Pre-load stable table schemas before spawning workers to avoid
        # loading them multiple times; this downloads and caches the schemas
        # once in the main process.
        try:
            get_stable_table_schemas()
        except Exception:
            # If schema loading fails, continue anyway (some tasks may not need them)
            pass

        # Pre-compute referenced tables for all tasks in parallel.
        # This is the expensive, I/O-heavy part (SQL parsing via
        # extract_table_references_without_views).
        all_tasks = [task for dag in self.dags for task in dag.tasks]

        with get_context("spawn").Pool(8) as p:
            task_references = p.map(_precompute_task_references, all_tasks)

        # Update tasks with precomputed references
        task_map = {task.task_key: task for task in all_tasks}
        for task_key, referenced_tables in task_references:
            task_map[task_key].referenced_tables = referenced_tables

        # Resolve dependencies sequentially
        for dag in self.dags:
            dag.with_upstream_dependencies(self)
            dag.with_downstream_dependencies(self)

        # Finally, parallelize DAG-to-Airflow conversion
        to_airflow_dag = partial(self.dag_to_airflow, output_dir)
        with get_context("spawn").Pool(8) as p:
            p.map(to_airflow_dag, self.dags)
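The shape of this optimization generalizes: run the pure, I/O-heavy work in a worker pool, return results keyed by a stable identifier, and merge them back onto the objects in the parent process. The merge step matters because spawned workers receive pickled copies of the tasks, so any mutation done inside the pool would be lost. A self-contained sketch of the pattern (Item and expensive_parse are illustrative stand-ins, not bigquery-etl APIs):

```python
from dataclasses import dataclass, field
from multiprocessing import get_context


@dataclass
class Item:
    key: str
    text: str
    refs: list = field(default_factory=list)


def expensive_parse(item):
    # Pure function: returns (key, result) instead of mutating the item,
    # because a spawned worker only ever sees a pickled copy of it.
    return (item.key, sorted(set(item.text.split())))


if __name__ == "__main__":
    items = [Item("a", "x y x"), Item("b", "z z q")]
    with get_context("spawn").Pool(2) as pool:
        results = pool.map(expensive_parse, items)
    # Merge results back onto the originals in the parent process.
    by_key = {item.key: item for item in items}
    for key, refs in results:
        by_key[key].refs = refs
    print([(i.key, i.refs) for i in items])
    # [('a', ['x', 'y']), ('b', ['q', 'z'])]
```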