From d56b2fb6f4c9f3d9022c2195d872cbbc87bf837f Mon Sep 17 00:00:00 2001 From: anthony sottile Date: Tue, 7 May 2024 14:20:49 -0400 Subject: [PATCH] ref: remove unused partition parameter from buffer --- src/sentry/buffer/base.py | 6 ++---- src/sentry/buffer/redis.py | 6 ++---- src/sentry/tasks/process_buffer.py | 25 ++++++++--------------- tests/sentry/tasks/test_process_buffer.py | 2 +- 4 files changed, 14 insertions(+), 25 deletions(-) diff --git a/src/sentry/buffer/base.py b/src/sentry/buffer/base.py index 2627298a89a4bd..e80e18f977981a 100644 --- a/src/sentry/buffer/base.py +++ b/src/sentry/buffer/base.py @@ -69,12 +69,10 @@ def incr( } ) - # TODO: `partition` is unused, remove after a deploy - - def process_pending(self, partition: int | None = None) -> None: + def process_pending(self) -> None: return - def process_batch(self, partition: int | None = None) -> None: + def process_batch(self) -> None: return def process( diff --git a/src/sentry/buffer/redis.py b/src/sentry/buffer/redis.py index 18405b2f54df3e..812c52d49fa3ef 100644 --- a/src/sentry/buffer/redis.py +++ b/src/sentry/buffer/redis.py @@ -303,7 +303,7 @@ def get_hash( return decoded_hash - def process_batch(self, partition: int | None = None) -> None: + def process_batch(self) -> None: client = get_cluster_routing_client(self.cluster, self.is_redis_cluster) lock_key = self._lock_key(client, self.pending_key, ex=10) if not lock_key: @@ -371,9 +371,7 @@ def incr( tags={"module": model.__module__, "model": model.__name__}, ) - # TODO: `partition` is unused, remove after a deploy - - def process_pending(self, partition: int | None = None) -> None: + def process_pending(self) -> None: client = get_cluster_routing_client(self.cluster, self.is_redis_cluster) lock_key = self._lock_key(client, self.pending_key, ex=60) if not lock_key: diff --git a/src/sentry/tasks/process_buffer.py b/src/sentry/tasks/process_buffer.py index 7a1def04b698f9..e4deedee5b5947 100644 --- a/src/sentry/tasks/process_buffer.py +++ b/src/sentry/tasks/process_buffer.py @@ -6,58 +6,51 @@ from sentry.tasks.base import instrumented_task from sentry.utils.locking import UnableToAcquireLock +from sentry.utils.locking.lock import Lock logger = logging.getLogger(__name__) -# TODO: `partition` is unused, remove after a deploy - - -def get_process_lock(lock_name: str, partition: str | None = None): +def get_process_lock(lock_name: str) -> Lock: from sentry.locks import locks - if partition is None: - lock_key = f"buffer:{lock_name}" - else: - lock_key = f"buffer:{lock_name}:{partition}" - - return locks.get(lock_key, duration=60, name=lock_name) + return locks.get(f"buffer:{lock_name}", duration=60, name=lock_name) @instrumented_task( name="sentry.tasks.process_buffer.process_pending", queue="buffers.process_pending" ) -def process_pending(partition=None): +def process_pending() -> None: """ Process pending buffers. """ from sentry import buffer - lock = get_process_lock("process_pending", partition) + lock = get_process_lock("process_pending") try: with lock.acquire(): buffer.process_pending() except UnableToAcquireLock as error: - logger.warning("process_pending.fail", extra={"error": error, "partition": partition}) + logger.warning("process_pending.fail", extra={"error": error}) @instrumented_task( name="sentry.tasks.process_buffer.process_pending_batch", queue="buffers.process_pending_batch" ) -def process_pending_batch(partition=None): +def process_pending_batch() -> None: """ Process pending buffers in a batch. """ from sentry import buffer - lock = get_process_lock("process_pending_batch", partition) + lock = get_process_lock("process_pending_batch") try: with lock.acquire(): buffer.process_batch() except UnableToAcquireLock as error: - logger.warning("process_pending_batch.fail", extra={"error": error, "partition": partition}) + logger.warning("process_pending_batch.fail", extra={"error": error}) @instrumented_task(name="sentry.tasks.process_buffer.process_incr", queue="counters-0") diff --git a/tests/sentry/tasks/test_process_buffer.py b/tests/sentry/tasks/test_process_buffer.py index 6688ba1d33d5a4..ff4035f4065897 100644 --- a/tests/sentry/tasks/test_process_buffer.py +++ b/tests/sentry/tasks/test_process_buffer.py @@ -42,7 +42,7 @@ def test_process_pending_batch(self, mock_process_pending_batch): @mock.patch("sentry.buffer.backend.process_batch") def test_process_pending_batch_locked_out(self, mock_process_pending_batch): with self.assertLogs("sentry.tasks.process_buffer", level="WARNING") as logger: - lock = get_process_lock("process_pending_batch", None) + lock = get_process_lock("process_pending_batch") with lock.acquire(): process_pending_batch() self.assertEqual(len(logger.output), 1)