diff --git a/src/sentry/buffer/redis.py b/src/sentry/buffer/redis.py index 8349ba4aea9184..2dbd4f2f4a69a1 100644 --- a/src/sentry/buffer/redis.py +++ b/src/sentry/buffer/redis.py @@ -332,12 +332,12 @@ def get_redis_connection(self, key: str, transaction: bool = True) -> Pipeline: pipe = conn.pipeline(transaction=transaction) return pipe - def _execute_redis_operation( + def _execute_redis_operation_no_txn( self, key: str, operation: RedisOperation, *args: Any, **kwargs: Any ) -> Any: metrics_str = f"redis_buffer.{operation.value}" metrics.incr(metrics_str) - pipe = self.get_redis_connection(self.pending_key) + pipe = self.get_redis_connection(self.pending_key, transaction=False) getattr(pipe, operation.value)(key, *args, **kwargs) if args: pipe.expire(key, self.key_expire) @@ -356,7 +356,7 @@ def _execute_sharded_redis_operation( metrics_str = f"redis_buffer.{operation.value}" metrics.incr(metrics_str, amount=len(keys)) - pipe = self.get_redis_connection(self.pending_key) + pipe = self.get_redis_connection(self.pending_key, transaction=False) for key in keys: getattr(pipe, operation.value)(key, *args, **kwargs) if args: @@ -369,10 +369,10 @@ def push_to_sorted_set(self, key: str, value: list[int] | int) -> None: value_dict = {v: now for v in value} else: value_dict = {value: now} - self._execute_redis_operation(key, RedisOperation.SORTED_SET_ADD, value_dict) + self._execute_redis_operation_no_txn(key, RedisOperation.SORTED_SET_ADD, value_dict) def get_sorted_set(self, key: str, min: float, max: float) -> list[tuple[int, float]]: - redis_set = self._execute_redis_operation( + redis_set = self._execute_redis_operation_no_txn( key, RedisOperation.SORTED_SET_GET_RANGE, min=min, @@ -410,7 +410,9 @@ def bulk_get_sorted_set( return data_to_timestamps def delete_key(self, key: str, min: float, max: float) -> None: - self._execute_redis_operation(key, RedisOperation.SORTED_SET_DELETE_RANGE, min=min, max=max) + self._execute_redis_operation_no_txn( + key, RedisOperation.SORTED_SET_DELETE_RANGE, min=min, max=max + ) def delete_keys(self, keys: list[str], min: float, max: float) -> None: self._execute_sharded_redis_operation( @@ -427,7 +429,7 @@ def delete_hash( fields: list[str], ) -> None: key = self._make_key(model, filters) - pipe = self.get_redis_connection(self.pending_key) + pipe = self.get_redis_connection(self.pending_key, transaction=False) for field in fields: getattr(pipe, RedisOperation.HASH_DELETE.value)(key, field) pipe.expire(key, self.key_expire) @@ -441,7 +443,7 @@ def push_to_hash( value: str, ) -> None: key = self._make_key(model, filters) - self._execute_redis_operation(key, RedisOperation.HASH_ADD, field, value) + self._execute_redis_operation_no_txn(key, RedisOperation.HASH_ADD, field, value) def push_to_hash_bulk( self, @@ -450,11 +452,11 @@ def push_to_hash_bulk( data: dict[str, str], ) -> None: key = self._make_key(model, filters) - self._execute_redis_operation(key, RedisOperation.HASH_ADD_BULK, data) + self._execute_redis_operation_no_txn(key, RedisOperation.HASH_ADD_BULK, data) def get_hash(self, model: type[models.Model], field: dict[str, BufferField]) -> dict[str, str]: key = self._make_key(model, field) - redis_hash = self._execute_redis_operation(key, RedisOperation.HASH_GET_ALL) + redis_hash = self._execute_redis_operation_no_txn(key, RedisOperation.HASH_GET_ALL) decoded_hash = {} for k, v in redis_hash.items(): if isinstance(k, bytes): @@ -467,7 +469,7 @@ def get_hash(self, model: type[models.Model], field: dict[str, BufferField]) -> def get_hash_length(self, model: type[models.Model], field: dict[str, BufferField]) -> int: key = self._make_key(model, field) - return self._execute_redis_operation(key, RedisOperation.HASH_LENGTH) + return self._execute_redis_operation_no_txn(key, RedisOperation.HASH_LENGTH) def incr( self,