Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
956b9b3
Add visitor to replace project-filter by project-id-filter in query
shellmayr Apr 10, 2024
2fbf999
Add visitor to replace groupby:project by groupby:project_id
shellmayr Apr 10, 2024
ab65551
wip: generalize
shellmayr Apr 11, 2024
d53d659
wip: implement first running version of modulator-visitor based on fl…
shellmayr Apr 12, 2024
0518514
add query post-processing steps
shellmayr Apr 16, 2024
08caa89
wip: fix bugs & restructure
shellmayr Apr 16, 2024
5034dbb
wip: everything works before cleanup
shellmayr Apr 16, 2024
164836f
wip: cleanup
shellmayr Apr 16, 2024
a8e14a1
wip: remove ModulationMetadata - not needed
shellmayr Apr 16, 2024
7b830af
wip: move things around
shellmayr Apr 16, 2024
bc5a402
wip: inject projects directly into the visitor that needs it instead …
shellmayr Apr 16, 2024
2f9f9c8
wip: move groupby modulation into function for ModulatorVisitor
shellmayr Apr 16, 2024
a8d757f
add tests & refactor
shellmayr Apr 16, 2024
6a1e9e1
wip: fix mypy errors
shellmayr Apr 16, 2024
0a8142d
add more tests & refactor
shellmayr Apr 17, 2024
a414b9a
remove comment
shellmayr Apr 17, 2024
008d7c6
refactor into optimized object structure to reduce shared state
shellmayr Apr 17, 2024
13e6e5c
refactor naming
shellmayr Apr 18, 2024
941dafd
remove find_modulator
shellmayr Apr 18, 2024
2a20d60
rename QueryDemodulationStep
shellmayr Apr 18, 2024
c70c225
wip: work review comments into PR
shellmayr Apr 18, 2024
4bf4cc7
wip: add test for groupby:project and filter:project_id
shellmayr Apr 18, 2024
7f3e47f
last adaptations to pacify mypy
shellmayr Apr 18, 2024
0146d86
Merge branch 'master' into shellmayr/feat/project-to-project-id-draft-2
shellmayr Apr 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions src/sentry/sentry_metrics/querying/data/api.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,31 @@
from collections.abc import Sequence
from datetime import datetime
from typing import cast

from snuba_sdk import MetricsQuery, MetricsScope, Rollup

from sentry import features
from sentry.models.environment import Environment
from sentry.models.organization import Organization
from sentry.models.project import Project
from sentry.sentry_metrics.querying.data.execution import QueryExecutor, QueryResult
from sentry.sentry_metrics.querying.data.execution import QueryExecutor
from sentry.sentry_metrics.querying.data.mapping.mapper import MapperConfig, Project2ProjectIDMapper
from sentry.sentry_metrics.querying.data.parsing import QueryParser
from sentry.sentry_metrics.querying.data.postprocessing.base import run_post_processing_steps
from sentry.sentry_metrics.querying.data.postprocessing.remapping import QueryRemappingStep
from sentry.sentry_metrics.querying.data.preparation.base import (
IntermediateQuery,
PreparationStep,
run_preparation_steps,
)
from sentry.sentry_metrics.querying.data.preparation.mapping import QueryMappingStep
from sentry.sentry_metrics.querying.data.preparation.units_normalization import (
UnitsNormalizationStep,
)
from sentry.sentry_metrics.querying.data.query import MQLQueriesResult, MQLQuery
from sentry.sentry_metrics.querying.types import QueryType

DEFAULT_MAPPINGS: MapperConfig = MapperConfig().add(Project2ProjectIDMapper)


def run_queries(
mql_queries: Sequence[MQLQuery],
Expand Down Expand Up @@ -62,12 +68,15 @@ def run_queries(
)
)

preparation_steps = []
preparation_steps: list[PreparationStep] = []

if features.has(
"organizations:ddm-metrics-api-unit-normalization", organization=organization, actor=None
):
preparation_steps.append(UnitsNormalizationStep())

preparation_steps.append(QueryMappingStep(projects, DEFAULT_MAPPINGS))

# We run a series of preparation steps which operate on the entire list of queries.
intermediate_queries = run_preparation_steps(intermediate_queries, *preparation_steps)

Expand All @@ -77,6 +86,7 @@ def run_queries(
executor.schedule(intermediate_query=intermediate_query, query_type=query_type)

results = executor.execute()
results = run_post_processing_steps(results, QueryRemappingStep(projects))

# We wrap the result in a class that exposes some utils methods to operate on results.
return MQLQueriesResult(cast(list[QueryResult], results))
return MQLQueriesResult(results)
29 changes: 24 additions & 5 deletions src/sentry/sentry_metrics/querying/data/execution.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from collections.abc import Mapping, Sequence
from dataclasses import dataclass, replace
from dataclasses import dataclass, field, replace
from datetime import datetime
from enum import Enum
from typing import Any, Union, cast
Expand All @@ -11,6 +11,7 @@
from sentry.models.organization import Organization
from sentry.models.project import Project
from sentry.sentry_metrics.querying.constants import SNUBA_QUERY_LIMIT
from sentry.sentry_metrics.querying.data.mapping.mapper import Mapper
from sentry.sentry_metrics.querying.data.preparation.base import IntermediateQuery
from sentry.sentry_metrics.querying.data.utils import adjust_time_bounds_with_interval
from sentry.sentry_metrics.querying.errors import (
Expand Down Expand Up @@ -145,6 +146,7 @@ class ScheduledQuery:
unit_family: UnitFamily | None = None
unit: MeasurementUnit | None = None
scaling_factor: float | None = None
mappers: list[Mapper] = field(default_factory=list)

def initialize(
self,
Expand Down Expand Up @@ -318,7 +320,7 @@ def _align_date_range(cls, metrics_query: MetricsQuery) -> tuple[MetricsQuery, i
return metrics_query, None


@dataclass(frozen=True)
@dataclass
class QueryResult:
"""
Represents the result of a ScheduledQuery containing its associated series and totals results.
Expand Down Expand Up @@ -445,12 +447,24 @@ def modified_end(self) -> datetime:

@property
def series(self) -> Sequence[Mapping[str, Any]]:
    # The "series" key may be absent (e.g. only a totals query was run);
    # return an empty sequence instead of raising.
    if "series" not in self.result:
        return []
    return self.result["series"]["data"]

@series.setter
def series(self, value: Sequence[Mapping[str, Any]]) -> None:
    # NOTE(review): unlike the getter, this assumes self.result already has a
    # "series" entry and raises KeyError otherwise — confirm the setter is only
    # used after a series query actually ran (e.g. by QueryRemappingStep).
    self.result["series"]["data"] = value

@property
def totals(self) -> Sequence[Mapping[str, Any]]:
    # The "totals" key may be absent (e.g. only a series query was run);
    # return an empty sequence instead of raising.
    if "totals" not in self.result:
        return []
    return self.result["totals"]["data"]

@totals.setter
def totals(self, value: Sequence[Mapping[str, Any]]) -> None:
    # NOTE(review): unlike the getter, this assumes self.result already has a
    # "totals" entry and raises KeyError otherwise — confirm the setter is only
    # used after a totals query actually ran (e.g. by QueryRemappingStep).
    self.result["totals"]["data"] = value

@property
def meta(self) -> Sequence[Mapping[str, str]]:
# By default, we extract the metadata from the totals query, if that is not there we extract from the series
Expand All @@ -464,7 +478,11 @@ def group_bys(self) -> list[str]:
# that we can correctly render groups in case they are not returned from the db because of missing data.
#
# Sorting of the groups is done to maintain consistency across function calls.
return sorted(UsedGroupBysVisitor().visit(self._any_query().metrics_query.query))
scheduled_query = self._any_query()
mappers = [mapper for mapper in scheduled_query.mappers if mapper.applied_on_groupby]
return sorted(
UsedGroupBysVisitor(mappers=mappers).visit(scheduled_query.metrics_query.query)
)

@property
def interval(self) -> int | None:
Expand Down Expand Up @@ -774,7 +792,7 @@ def _execution_loop(self):
while continue_execution:
continue_execution = self._bulk_execute()

def execute(self) -> Sequence[QueryResult]:
def execute(self) -> list[QueryResult]:
"""
Executes the scheduled queries in the execution loop.

Expand All @@ -798,7 +816,7 @@ def execute(self) -> Sequence[QueryResult]:
"Not all queries were executed in the execution loop"
)

return cast(Sequence[QueryResult], self._query_results)
return cast(list[QueryResult], self._query_results)

def schedule(self, intermediate_query: IntermediateQuery, query_type: QueryType):
"""
Expand All @@ -813,6 +831,7 @@ def schedule(self, intermediate_query: IntermediateQuery, query_type: QueryType)
unit_family=intermediate_query.unit_family,
unit=intermediate_query.unit,
scaling_factor=intermediate_query.scaling_factor,
mappers=intermediate_query.mappers,
)

# In case the user chooses to run also a series query, we will duplicate the query and chain it after totals.
Expand Down
Empty file.
94 changes: 94 additions & 0 deletions src/sentry/sentry_metrics/querying/data/mapping/mapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import abc
from collections.abc import Sequence
from typing import Any, TypeVar

from sentry.models.project import Project


class Mapper(abc.ABC):
    """Abstract base for bidirectional translation of a single query key.

    A concrete mapper translates values of ``from_key`` into the ``to_key``
    domain (``forward``) and back again (``backward``), e.g. project slug
    to project id.
    """

    from_key: str = ""
    to_key: str = ""
    # Set when this mapper was applied to a group-by clause, so that
    # post-processing knows which result columns must be unmapped.
    applied_on_groupby: bool = False

    def __init__(self):
        # This exists to satisfy mypy, which complains otherwise
        self.map: dict[Any, Any] = {}

    def __hash__(self):
        # A mapper's identity is the key pair it translates between.
        return hash((self.from_key, self.to_key))

    @abc.abstractmethod
    def forward(self, projects: Sequence[Project], value: Any) -> Any:
        """Translates a ``from_key`` value into the ``to_key`` domain."""
        return value

    @abc.abstractmethod
    def backward(self, projects: Sequence[Project], value: Any) -> Any:
        """Translates a ``to_key`` value back into the ``from_key`` domain."""
        return value


TMapper = TypeVar("TMapper", bound=Mapper)


class MapperConfig:
    """Registry of Mapper subclasses that may be applied to a query."""

    def __init__(self):
        self.mappers: set[type[Mapper]] = set()

    def add(self, mapper: type[Mapper]) -> "MapperConfig":
        """Registers a mapper class and returns self, allowing chained calls."""
        self.mappers.add(mapper)
        return self

    def get(self, from_key: str | None = None, to_key: str | None = None) -> type[Mapper] | None:
        """Returns a registered mapper matching either key, or None if none matches."""
        for candidate in self.mappers:
            # A candidate qualifies if it translates from `from_key` or to `to_key`.
            if candidate.from_key == from_key or candidate.to_key == to_key:
                return candidate

        return None


def get_or_create_mapper(
    mapper_config: MapperConfig,
    mappers: list[Mapper],
    from_key: str | None = None,
    to_key: str | None = None,
) -> Mapper | None:
    """Returns the mapper instance applicable to the given key, creating it if needed.

    Looks up the applicable mapper type in ``mapper_config``. If an instance of
    that type already exists in ``mappers`` it is reused; otherwise a new one is
    instantiated and appended to ``mappers``. Returns None when no mapper is
    configured for the key.
    """
    mapper_class = mapper_config.get(from_key=from_key, to_key=to_key)
    if mapper_class is None:
        # No mapper is configured for this key.
        return None

    # Reuse an already-instantiated mapper of the same type, if any.
    for existing in mappers:
        if type(existing) is mapper_class:
            return existing

    # First use of this mapper type: instantiate and record it for later reuse.
    created = mapper_class()
    mappers.append(created)
    return created


class Project2ProjectIDMapper(Mapper):
    """Maps a project slug ("project") to its numeric id ("project_id") and back.

    Lookups are resolved against the supplied projects and cached in ``self.map``;
    unknown values resolve to None.
    """

    from_key: str = "project"
    to_key: str = "project_id"

    def forward(self, projects: Sequence[Project], value: str) -> int:
        """Translates a project slug to its id (None if no project matches)."""
        if value not in self.map:
            self.map[value] = None
            for project in projects:
                if project.slug == value:
                    self.map[value] = project.id
        return self.map[value]

    def backward(self, projects: Sequence[Project], value: int) -> str:
        """Translates a project id back to its slug (None if no project matches)."""
        if value not in self.map:
            # Cache None up-front so an unknown id returns None instead of
            # raising KeyError, mirroring forward().
            self.map[value] = None
            for project in projects:
                if project.id == value:
                    self.map[value] = project.slug

        return self.map[value]
Empty file.
37 changes: 37 additions & 0 deletions src/sentry/sentry_metrics/querying/data/postprocessing/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from abc import ABC, abstractmethod

from sentry.sentry_metrics.querying.data.execution import QueryResult


class PostProcessingStep(ABC):
    """
    Base class for transformations applied to QueryResult objects after execution.

    Implementations receive the full list of results and may transform them or
    derive intermediate data needed before the results are returned to callers.
    """

    @abstractmethod
    def run(self, query_results: list[QueryResult]) -> list[QueryResult]:
        """
        Applies this step to the supplied query results.

        Returns:
            The list of query results after this step has been applied.
        """
        raise NotImplementedError


def run_post_processing_steps(query_results: list[QueryResult], *steps) -> list[QueryResult]:
    """
    Applies the supplied post-processing steps to the query results, in the
    order the steps are given. Arguments that are not PostProcessingStep
    instances are skipped.

    Returns:
        The query results after all applicable steps have run.
    """
    processed = query_results
    for candidate in steps:
        if isinstance(candidate, PostProcessingStep):
            processed = candidate.run(query_results=processed)

    return processed
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
from collections.abc import Mapping, Sequence
from copy import deepcopy
from typing import Any, cast

from sentry.models.project import Project
from sentry.sentry_metrics.querying.data.execution import QueryResult
from sentry.sentry_metrics.querying.data.mapping.mapper import Mapper
from sentry.sentry_metrics.querying.data.postprocessing.base import PostProcessingStep


class QueryRemappingStep(PostProcessingStep):
    """Post-processing step that reverses key mappings applied during query preparation.

    For every result, columns produced by a group-by mapper (e.g. "project_id")
    are translated back to their original key and value (e.g. "project" slug).
    """

    def __init__(self, projects: Sequence[Project]):
        # Needed by the mappers' backward() translation (e.g. id -> slug lookup).
        self.projects = projects

    def run(self, query_results: list[QueryResult]) -> list[QueryResult]:
        """Unmaps totals and series data in place, using the mappers recorded on each query."""
        for query_result in query_results:
            if (
                query_result.totals is not None
                and query_result.totals_query is not None
                and len(query_result.totals) > 0
            ):
                query_result.totals = self._unmap_data(
                    query_result.totals, query_result.totals_query.mappers
                )
            if (
                query_result.series is not None
                and query_result.series_query is not None
                and len(query_result.series) > 0
            ):
                query_result.series = self._unmap_data(
                    query_result.series, query_result.series_query.mappers
                )

        return query_results

    def _unmap_data(
        self, data: Sequence[Mapping[str, Any]], mappers: list[Mapper]
    ) -> Sequence[Mapping[str, Any]]:
        # Deep copy so the original result rows are never mutated while iterating.
        unmapped_data: list[dict[str, Any]] = cast(list[dict[str, Any]], deepcopy(data))
        for element in unmapped_data:
            updated_element = dict()
            keys_to_delete = []
            for result_key in element.keys():
                for mapper in mappers:
                    # Only unmap keys that a mapper produced for a group-by clause.
                    if mapper.to_key == result_key and mapper.applied_on_groupby:
                        original_value = mapper.backward(self.projects, element[result_key])
                        updated_element[mapper.from_key] = original_value
                        keys_to_delete.append(result_key)

            # Deletions and insertions are deferred until after iteration so the
            # dict is not modified while being iterated.
            for key in keys_to_delete:
                del element[key]
            element.update(updated_element)

        return cast(Sequence[Mapping[str, Any]], unmapped_data)
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass
from dataclasses import dataclass, field

from snuba_sdk import MetricsQuery

from sentry.sentry_metrics.querying.data.mapping.mapper import Mapper
from sentry.sentry_metrics.querying.types import QueryOrder
from sentry.sentry_metrics.querying.units import MeasurementUnit, UnitFamily

Expand All @@ -27,6 +28,7 @@ class IntermediateQuery:
unit_family: UnitFamily | None = None
unit: MeasurementUnit | None = None
scaling_factor: float | None = None
mappers: list[Mapper] = field(default_factory=list)


class PreparationStep(ABC):
Expand Down
35 changes: 35 additions & 0 deletions src/sentry/sentry_metrics/querying/data/preparation/mapping.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from collections.abc import Sequence
from dataclasses import replace

from sentry.models.project import Project
from sentry.sentry_metrics.querying.data.mapping.mapper import MapperConfig
from sentry.sentry_metrics.querying.data.preparation.base import IntermediateQuery, PreparationStep
from sentry.sentry_metrics.querying.visitors.query_expression import MapperVisitor


class QueryMappingStep(PreparationStep):
    """Preparation step that rewrites query expressions using the configured mappers."""

    def __init__(self, projects: Sequence[Project], mapper_config: MapperConfig):
        self.projects = projects
        self.mapper_config = mapper_config

    def _get_mapped_intermediate_query(
        self, intermediate_query: IntermediateQuery
    ) -> IntermediateQuery:
        """Returns a copy of the query with mappings applied and the used mappers recorded."""
        visitor = MapperVisitor(self.projects, self.mapper_config)
        mapped_query = visitor.visit(intermediate_query.metrics_query.query)

        return replace(
            intermediate_query,
            metrics_query=intermediate_query.metrics_query.set_query(mapped_query),
            mappers=visitor.mappers,
        )

    def run(self, intermediate_queries: list[IntermediateQuery]) -> list[IntermediateQuery]:
        """Applies the mapping to every intermediate query, preserving order."""
        return [
            self._get_mapped_intermediate_query(intermediate_query)
            for intermediate_query in intermediate_queries
        ]
Loading