Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions src/sentry/spans/consumers/process/strategy.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,21 @@
from typing import Any, TypeVar
from typing import Any, Generic, TypeVar

from arroyo.processing.strategies.abstract import ProcessingStrategy
from arroyo.processing.strategies.commit import CommitOffsets
from arroyo.types import Commit, FilteredPayload, Message
from arroyo.types import Commit, Message

TPayload = TypeVar("TPayload")


class CommitSpanOffsets(CommitOffsets):
class CommitSpanOffsets(CommitOffsets, Generic[TPayload]):
"""
Inherits from CommitOffsets so we can add a next step. We'd like to commit offsets for
processed spans before carrying on the work to build segments and produce them since
the processing messages and producing segments are two distinct operations. Span messages
should be committed once they are processed and put into redis.
"""

def __init__(
self, commit: Commit, next_step: ProcessingStrategy[FilteredPayload | TPayload]
) -> None:
def __init__(self, commit: Commit, next_step: ProcessingStrategy[TPayload]) -> None:
super().__init__(commit=commit)
self.__next_step = next_step

Expand Down