diff --git a/Makefile b/Makefile index 05a9398..1ac7503 100644 --- a/Makefile +++ b/Makefile @@ -29,7 +29,7 @@ install_deps: run_test: @echo "Running \033[0;32mtest suite\033[0m "; \ AWS_DEFAULT_REGION='us-east-1' nosetests --stop --with-coverage --cover-package=$(PACKAGE) \ - --cover-branches --cover-erase --verbosity=2; \ + --cover-branches --cover-erase --verbosity=2; pycodestyle; \ test: prepare @make run_test diff --git a/development.txt b/development.txt index d0167fb..6d4f866 100644 --- a/development.txt +++ b/development.txt @@ -5,3 +5,4 @@ nose==1.3.0 pre-commit==0.7.6 sure==1.2.2 functools32;python_version=='2.7' +pycodestyle==2.4.0 diff --git a/example/api/example.py b/example/api/example.py index 499642e..07b0170 100644 --- a/example/api/example.py +++ b/example/api/example.py @@ -6,6 +6,7 @@ blueprint = Blueprint('api', __name__) + @blueprint.route('/example', methods=['POST']) def example(): queues = current_app.config.get('QUEUES') @@ -18,16 +19,22 @@ def example(): message_data = json.loads(data) except (TypeError, AttributeError, ValueError) as e: status = 500 - return construct_response('Your payload does not appear to be valid json!', data, status) + return construct_response( + 'Your payload does not appear to be valid json!', data, status) try: tasks.process.delay(message=message_data) - response_message = f'Successfully submitted message to queue {queue_name}' + response_message = ( + f'Successfully submitted message to queue {queue_name}' + ) status = 200 except Exception as e: - response_message = f'Something went wrong submitting message to queue {queue_name}! {e}' + response_message = ( + f'Something went wrong submitting message ' + f'to queue {queue_name}! {e}' + ) status = 500 - + return construct_response(response_message, message_data, status) @@ -35,6 +42,7 @@ def example(): def health(): return jsonify('OK'), 200 + @blueprint.route('/', methods=['GET']) def hello(): return 'Hello!' 
diff --git a/example/api/helpers.py b/example/api/helpers.py index 55d7559..fbeb9c5 100644 --- a/example/api/helpers.py +++ b/example/api/helpers.py @@ -2,11 +2,15 @@ import logging from flask import jsonify + def construct_response(message, payload, status): body = {} if status == 500: - body['message'] = 'Something went wrong constructing response. Is your payload valid JSON?' + body['message'] = ( + 'Something went wrong constructing response. ' + 'Is your payload valid JSON?' + ) body['request_payload'] = str(payload) else: body['message'] = message diff --git a/example/api/tasks.py b/example/api/tasks.py index 667e960..84be09f 100644 --- a/example/api/tasks.py +++ b/example/api/tasks.py @@ -2,8 +2,12 @@ from flask import Flask, current_app from pyqs import task + # This task listens to 'queue-example' SQS queues for messages with # {"task": "process"} @task(queue='queue-example') def process(message): - logging.info(f'PyQS task process() is processing message {message}. Clap your hands!') + logging.info( + f'PyQS task process() is processing message {message}. ' + 'Clap your hands!' 
+ ) diff --git a/example/app.py b/example/app.py index 26bd3ad..8b49f52 100644 --- a/example/app.py +++ b/example/app.py @@ -6,12 +6,14 @@ config_name = os.environ.get('ENV', 'development') current_config = config[config_name] + def create_app(): app = Flask(__name__) app.config.from_object(current_config) return app + app = create_app() app.register_blueprint(example_blueprint) diff --git a/example/config/__init__.py b/example/config/__init__.py index 9a2d550..60b1e42 100644 --- a/example/config/__init__.py +++ b/example/config/__init__.py @@ -2,5 +2,5 @@ config = { 'development': DevelopmentConfig - #'production': ProductionConfig + # 'production': ProductionConfig } diff --git a/example/config/common.py b/example/config/common.py index aa9c3a0..c6db61c 100644 --- a/example/config/common.py +++ b/example/config/common.py @@ -1,6 +1,7 @@ import os import logging + class Config(object): AWS_ACCESS_KEY_ID = os.environ.get('AWS_ACCESS_KEY_ID') diff --git a/example/config/development.py b/example/config/development.py index 5895b3b..98b6246 100644 --- a/example/config/development.py +++ b/example/config/development.py @@ -1,5 +1,6 @@ from .common import Config + class DevelopmentConfig(Config): DEBUG = True diff --git a/pyqs/decorator.py b/pyqs/decorator.py index 93f3d81..9153b8c 100644 --- a/pyqs/decorator.py +++ b/pyqs/decorator.py @@ -18,13 +18,15 @@ def get_or_create_queue(queue_name): try: return sqs.get_queue_by_name(QueueName=queue_name) except ClientError as exc: - if exc.response['Error']['Code'] == 'AWS.SimpleQueueService.NonExistentQueue': + non_existent_code = 'AWS.SimpleQueueService.NonExistentQueue' + if exc.response['Error']['Code'] == non_existent_code: return sqs.create_queue(QueueName=queue_name) else: raise -def task_delayer(func_to_delay, queue_name, delay_seconds=None, override=False): +def task_delayer(func_to_delay, queue_name, delay_seconds=None, + override=False): function_path = function_to_import_path(func_to_delay, override=override) if not 
queue_name: @@ -55,7 +57,8 @@ def wrapper(*args, **kwargs): class task(object): - def __init__(self, queue=None, delay_seconds=None, custom_function_path=None): + def __init__(self, queue=None, delay_seconds=None, + custom_function_path=None): self.queue_name = queue self.delay_seconds = delay_seconds self.function_path = custom_function_path @@ -67,5 +70,6 @@ def __call__(self, *args, **kwargs): if self.function_path: override = True function = self.function_path - func_to_wrap.delay = task_delayer(function, self.queue_name, self.delay_seconds, override=override) + func_to_wrap.delay = task_delayer( + function, self.queue_name, self.delay_seconds, override=override) return func_to_wrap diff --git a/pyqs/main.py b/pyqs/main.py index e8f533f..8c2dd39 100644 --- a/pyqs/main.py +++ b/pyqs/main.py @@ -41,7 +41,10 @@ def main(): dest="logging_level", type=str, default="WARN", - help='Set logging level. This must be one of the python default logging levels', + help=( + 'Set logging level. ' + 'This must be one of the python default logging levels' + ), action="store", ) @@ -95,7 +98,10 @@ def main(): dest="prefetch_multiplier", type=int, default=2, - help='Multiplier on the size of the internal queue for prefetching SQS messages.', + help=( + 'Multiplier on the size of the internal queue ' + 'for prefetching SQS messages.' 
+ ), action="store", ) @@ -114,9 +120,18 @@ def main(): ) -def _main(queue_prefixes, concurrency=5, logging_level="WARN", region='us-east-1', access_key_id=None, secret_access_key=None, interval=1, batchsize=10, prefetch_multiplier=2): - logging.basicConfig(format="[%(levelname)s]: %(message)s", level=getattr(logging, logging_level)) +def _main(queue_prefixes, concurrency=5, logging_level="WARN", + region='us-east-1', access_key_id=None, secret_access_key=None, + interval=1, batchsize=10, prefetch_multiplier=2): + logging.basicConfig( + format="[%(levelname)s]: %(message)s", + level=getattr(logging, logging_level), + ) logger.info("Starting PyQS version {}".format(__version__)) - manager = ManagerWorker(queue_prefixes, concurrency, interval, batchsize, prefetch_multiplier=prefetch_multiplier, region=region, access_key_id=access_key_id, secret_access_key=secret_access_key) + manager = ManagerWorker( + queue_prefixes, concurrency, interval, batchsize, + prefetch_multiplier=prefetch_multiplier, region=region, + access_key_id=access_key_id, secret_access_key=secret_access_key, + ) manager.start() manager.sleep() diff --git a/pyqs/worker.py b/pyqs/worker.py index d33e539..8d90d34 100644 --- a/pyqs/worker.py +++ b/pyqs/worker.py @@ -26,7 +26,11 @@ def get_conn(region='us-east-1', access_key_id=None, secret_access_key=None): - return boto3.client("sqs", aws_access_key_id=access_key_id, aws_secret_access_key=secret_access_key, region_name=region) + return boto3.client( + "sqs", + aws_access_key_id=access_key_id, + aws_secret_access_key=secret_access_key, region_name=region, + ) class BaseWorker(Process): @@ -35,19 +39,24 @@ def __init__(self, *args, **kwargs): self.should_exit = Event() def shutdown(self): - logger.info("Received shutdown signal, shutting down PID {}!".format(os.getpid())) + logger.info( + "Received shutdown signal, shutting down PID {}!".format( + os.getpid())) self.should_exit.set() def parent_is_alive(self): if os.getppid() == 1: - logger.info("Parent 
process has gone away, exiting process {}!".format(os.getpid())) + logger.info( + "Parent process has gone away, exiting process {}!".format( + os.getpid())) return False return True class ReadWorker(BaseWorker): - def __init__(self, queue_url, internal_queue, batchsize, connection_args=None, *args, **kwargs): + def __init__(self, queue_url, internal_queue, batchsize, + connection_args=None, *args, **kwargs): super(ReadWorker, self).__init__(*args, **kwargs) if connection_args is None: connection_args = {} @@ -55,7 +64,8 @@ def __init__(self, queue_url, internal_queue, batchsize, connection_args=None, * self.conn = get_conn(**self.connection_args) self.queue_url = queue_url - sqs_queue = self.conn.get_queue_attributes(QueueUrl=queue_url, AttributeNames=['All'])['Attributes'] + sqs_queue = self.conn.get_queue_attributes( + QueueUrl=queue_url, AttributeNames=['All'])['Attributes'] self.visibility_timeout = int(sqs_queue['VisibilityTimeout']) self.internal_queue = internal_queue @@ -65,7 +75,9 @@ def run(self): # Set the child process to not receive any keyboard interrupts signal.signal(signal.SIGINT, signal.SIG_IGN) - logger.info("Running ReadWorker: {}, pid: {}".format(self.queue_url, os.getpid())) + logger.info( + "Running ReadWorker: {}, pid: {}".format( + self.queue_url, os.getpid())) while not self.should_exit.is_set() and self.parent_is_alive(): self.read_message() self.internal_queue.close() @@ -78,15 +90,21 @@ def read_message(self): WaitTimeSeconds=LONG_POLLING_INTERVAL, ).get('Messages', []) - logger.debug("Successfully got {} messages from SQS queue {}".format(len(messages), self.queue_url)) # noqa + logger.debug( + "Successfully got {} messages from SQS queue {}".format( + len(messages), self.queue_url)) # noqa start = time.time() for message in messages: end = time.time() if int(end - start) >= self.visibility_timeout: - # Don't add any more messages since they have re-appeared in the sqs queue - # Instead just reset and get fresh messages from the sqs 
queue - msg = "Clearing Local messages since we exceeded their visibility_timeout" + # Don't add any more messages since they have + # re-appeared in the sqs queue Instead just reset and get + # fresh messages from the sqs queue + msg = ( + "Clearing Local messages since we exceeded " + "their visibility_timeout" + ) logger.warning(msg) break @@ -98,18 +116,26 @@ def read_message(self): "start_time": start, "timeout": self.visibility_timeout, } - self.internal_queue.put(packed_message, True, self.visibility_timeout) + self.internal_queue.put( + packed_message, True, self.visibility_timeout) except Full: - msg = "Timed out trying to add the following message to the internal queue after {} seconds: {}".format(self.visibility_timeout, message_body) # noqa + msg = ( + "Timed out trying to add the following message " + "to the internal queue after {} seconds: {}" + ).format(self.visibility_timeout, message_body) # noqa logger.warning(msg) continue else: - logger.debug("Message successfully added to internal queue from SQS queue {} with body: {}".format(self.queue_url, message_body)) # noqa + logger.debug( + "Message successfully added to internal queue " + "from SQS queue {} with body: {}".format( + self.queue_url, message_body)) # noqa class ProcessWorker(BaseWorker): - def __init__(self, internal_queue, interval, connection_args=None, *args, **kwargs): + def __init__(self, internal_queue, interval, connection_args=None, *args, + **kwargs): super(ProcessWorker, self).__init__(*args, **kwargs) if connection_args is None: self.conn = get_conn() @@ -161,7 +187,8 @@ def process_message(self): current_time = time.time() if int(current_time - fetch_time) >= timeout: logger.warning( - "Discarding task {} with args: {} and kwargs: {} due to exceeding visibility timeout".format( # noqa + "Discarding task {} with args: {} and kwargs: {} due to " + "exceeding visibility timeout".format( # noqa full_task_path, repr(args), repr(kwargs), @@ -174,7 +201,8 @@ def 
process_message(self): except Exception: end_time = time.clock() logger.exception( - "Task {} raised error in {:.4f} seconds: with args: {} and kwargs: {}: {}".format( + "Task {} raised error in {:.4f} seconds: with args: {} " + "and kwargs: {}: {}".format( full_task_path, end_time - start_time, args, @@ -190,7 +218,8 @@ def process_message(self): ReceiptHandle=message['ReceiptHandle'] ) logger.info( - "Processed task {} in {:.4f} seconds with args: {} and kwargs: {}".format( + "Processed task {} in {:.4f} seconds with args: {} " + "and kwargs: {}".format( full_task_path, end_time - start_time, repr(args), @@ -202,7 +231,9 @@ def process_message(self): class ManagerWorker(object): - def __init__(self, queue_prefixes, worker_concurrency, interval, batchsize, prefetch_multiplier=2, region='us-east-1', access_key_id=None, secret_access_key=None): + def __init__(self, queue_prefixes, worker_concurrency, interval, batchsize, + prefetch_multiplier=2, region='us-east-1', access_key_id=None, + secret_access_key=None): self.connection_args = { "region": region, "access_key_id": access_key_id, @@ -216,7 +247,8 @@ def __init__(self, queue_prefixes, worker_concurrency, interval, batchsize, pref self.interval = interval self.prefetch_multiplier = prefetch_multiplier self.load_queue_prefixes(queue_prefixes) - self.queue_urls = self.get_queue_urls_from_queue_prefixes(self.queue_prefixes) + self.queue_urls = self.get_queue_urls_from_queue_prefixes( + self.queue_prefixes) self.setup_internal_queue(worker_concurrency) self.reader_children = [] self.worker_children = [] @@ -226,16 +258,27 @@ def __init__(self, queue_prefixes, worker_concurrency, interval, batchsize, pref self._register_signals() def _register_signals(self): - for SIG in [signal.SIGINT, signal.SIGTERM, signal.SIGQUIT, signal.SIGHUP]: + for SIG in [signal.SIGINT, signal.SIGTERM, signal.SIGQUIT, + signal.SIGHUP]: self.register_shutdown_signal(SIG) def _initialize_reader_children(self): for queue_url in self.queue_urls: 
- self.reader_children.append(ReadWorker(queue_url, self.internal_queue, self.batchsize, connection_args=self.connection_args)) + self.reader_children.append( + ReadWorker( + queue_url, self.internal_queue, self.batchsize, + connection_args=self.connection_args, + ) + ) def _initialize_worker_children(self, number): for index in range(number): - self.worker_children.append(ProcessWorker(self.internal_queue, self.interval, connection_args=self.connection_args)) + self.worker_children.append( + ProcessWorker( + self.internal_queue, self.interval, + connection_args=self.connection_args, + ) + ) def load_queue_prefixes(self, queue_prefixes): self.queue_prefixes = queue_prefixes @@ -257,7 +300,8 @@ def get_queue_urls_from_queue_prefixes(self, queue_prefixes): return matching_urls def setup_internal_queue(self, worker_concurrency): - self.internal_queue = Queue(worker_concurrency * self.prefetch_multiplier * self.batchsize) + self.internal_queue = Queue( + worker_concurrency * self.prefetch_multiplier * self.batchsize) def start(self): for child in self.reader_children: @@ -291,7 +335,7 @@ def register_shutdown_signal(self, SIG): signal.signal(SIG, self._graceful_shutdown) def _graceful_shutdown(self, signum, frame): - logger.info('Received shutdown signal', signum) + logger.info('Received shutdown signal %s', signum) self._running = False def _exit(self): @@ -312,18 +356,28 @@ def replace_workers(self): def _replace_reader_children(self): for index, reader in enumerate(self.reader_children): if not reader.is_alive(): - logger.info("Reader Process {} is no longer responding, spawning a new reader.".format(reader.pid)) + logger.info( + "Reader Process {} is no longer responding, " + "spawning a new reader.".format(reader.pid)) queue_url = reader.queue_url self.reader_children.pop(index) - worker = ReadWorker(queue_url, self.internal_queue, self.batchsize, connection_args=self.connection_args) + worker = ReadWorker( + queue_url, self.internal_queue, self.batchsize, + 
connection_args=self.connection_args, + ) worker.start() self.reader_children.append(worker) def _replace_worker_children(self): for index, worker in enumerate(self.worker_children): if not worker.is_alive(): - logger.info("Worker Process {} is no longer responding, spawning a new worker.".format(worker.pid)) + logger.info( + "Worker Process {} is no longer responding, " + "spawning a new worker.".format(worker.pid)) self.worker_children.pop(index) - worker = ProcessWorker(self.internal_queue, self.interval, connection_args=self.connection_args) + worker = ProcessWorker( + self.internal_queue, self.interval, + connection_args=self.connection_args, + ) worker.start() self.worker_children.append(worker) diff --git a/tests/tasks.py b/tests/tasks.py index 58c82c7..a92ddd7 100644 --- a/tests/tasks.py +++ b/tests/tasks.py @@ -13,7 +13,8 @@ def index_incrementer(message, extra=None): if isinstance(message, basestring): task_results.append(message) else: - raise ValueError("Need to be given basestring, was given {}".format(message)) + raise ValueError( + "Need to be given basestring, was given {}".format(message)) @task() diff --git a/tests/test_manager_worker.py b/tests/test_manager_worker.py index beefcb2..da1133a 100644 --- a/tests/test_manager_worker.py +++ b/tests/test_manager_worker.py @@ -10,7 +10,9 @@ from pyqs.main import main, _main from pyqs.worker import ManagerWorker -from tests.utils import MockLoggingHandler, ThreadWithReturnValue2, ThreadWithReturnValue3 +from tests.utils import ( + MockLoggingHandler, ThreadWithReturnValue2, ThreadWithReturnValue3, +) @mock_sqs @@ -22,7 +24,10 @@ def test_manager_worker_create_proper_children_workers(): conn = boto3.client('sqs', region_name='us-east-1') conn.create_queue(QueueName="email") - manager = ManagerWorker(queue_prefixes=['email'], worker_concurrency=3, interval=2, batchsize=10) + manager = ManagerWorker( + queue_prefixes=['email'], worker_concurrency=3, interval=2, + batchsize=10, + ) 
len(manager.reader_children).should.equal(1) len(manager.worker_children).should.equal(3) @@ -38,15 +43,20 @@ def test_manager_worker_with_queue_prefix(): conn.create_queue(QueueName="email.foobar") conn.create_queue(QueueName="email.baz") - manager = ManagerWorker(queue_prefixes=['email.*'], worker_concurrency=1, interval=1, batchsize=10) + manager = ManagerWorker( + queue_prefixes=['email.*'], worker_concurrency=1, interval=1, + batchsize=10, + ) len(manager.reader_children).should.equal(2) children = manager.reader_children # Pull all the read children and sort by name to make testing easier sorted_children = sorted(children, key=lambda child: child.queue_url) - sorted_children[0].queue_url.should.equal("https://queue.amazonaws.com/123456789012/email.baz") - sorted_children[1].queue_url.should.equal("https://queue.amazonaws.com/123456789012/email.foobar") + sorted_children[0].queue_url.should.equal( + "https://queue.amazonaws.com/123456789012/email.baz") + sorted_children[1].queue_url.should.equal( + "https://queue.amazonaws.com/123456789012/email.foobar") @mock_sqs @@ -58,7 +68,10 @@ def test_manager_start_and_stop(): conn = boto3.client('sqs', region_name='us-east-1') conn.create_queue(QueueName="email") - manager = ManagerWorker(queue_prefixes=['email'], worker_concurrency=2, interval=1, batchsize=10) + manager = ManagerWorker( + queue_prefixes=['email'], worker_concurrency=2, interval=1, + batchsize=10, + ) len(manager.worker_children).should.equal(2) @@ -85,7 +98,10 @@ def test_main_method(ManagerWorker): """ _main(["email1", "email2"], concurrency=2) - ManagerWorker.assert_called_once_with(['email1', 'email2'], 2, 1, 10, prefetch_multiplier=2, region='us-east-1', secret_access_key=None, access_key_id=None) + ManagerWorker.assert_called_once_with( + ['email1', 'email2'], 2, 1, 10, prefetch_multiplier=2, + region='us-east-1', secret_access_key=None, access_key_id=None, + ) ManagerWorker.return_value.start.assert_called_once_with() @@ -97,26 +113,18 @@ def 
test_real_main_method(ArgumentParser, _main): """ Test parsing of arguments from main method """ - ArgumentParser.return_value.parse_args.return_value = Mock(concurrency=3, - queues=["email1"], - interval=1, - batchsize=10, - logging_level="WARN", - region='us-east-1', - prefetch_multiplier=2, - access_key_id=None, - secret_access_key=None) + ArgumentParser.return_value.parse_args.return_value = Mock( + concurrency=3, queues=["email1"], interval=1, batchsize=10, + logging_level="WARN", region='us-east-1', prefetch_multiplier=2, + access_key_id=None, secret_access_key=None, + ) main() - _main.assert_called_once_with(queue_prefixes=['email1'], - concurrency=3, - interval=1, - batchsize=10, - logging_level="WARN", - region='us-east-1', - prefetch_multiplier=2, - access_key_id=None, - secret_access_key=None) + _main.assert_called_once_with( + queue_prefixes=['email1'], concurrency=3, interval=1, batchsize=10, + logging_level="WARN", region='us-east-1', prefetch_multiplier=2, + access_key_id=None, secret_access_key=None, + ) @mock_sqs @@ -157,7 +165,10 @@ def test_master_replaces_reader_processes(): conn.create_queue(QueueName="tester") # Setup Manager - manager = ManagerWorker(queue_prefixes=["tester"], worker_concurrency=1, interval=1, batchsize=10) + manager = ManagerWorker( + queue_prefixes=["tester"], worker_concurrency=1, interval=1, + batchsize=10, + ) manager.start() # Get Reader PID @@ -203,9 +214,11 @@ def test_master_counts_processes(): # Check messages msg1 = "Reader Processes: 1" - logger.handlers[0].messages['debug'][-2].lower().should.contain(msg1.lower()) + logger.handlers[0].messages['debug'][-2].lower().should.contain( + msg1.lower()) msg2 = "Worker Processes: 2" - logger.handlers[0].messages['debug'][-1].lower().should.contain(msg2.lower()) + logger.handlers[0].messages['debug'][-1].lower().should.contain( + msg2.lower()) @mock_sqs @@ -219,7 +232,10 @@ def test_master_replaces_worker_processes(): conn.create_queue(QueueName="tester") # Setup Manager - 
manager = ManagerWorker(queue_prefixes=["tester"], worker_concurrency=1, interval=1, batchsize=10) + manager = ManagerWorker( + queue_prefixes=["tester"], worker_concurrency=1, interval=1, + batchsize=10, + ) manager.start() # Get Worker PID @@ -257,7 +273,10 @@ def process_counts(): os.kill(os.getpid(), signal.SIGTERM) # Setup Manager - manager = ManagerWorker(queue_prefixes=["tester"], worker_concurrency=1, interval=1, batchsize=10) + manager = ManagerWorker( + queue_prefixes=["tester"], worker_concurrency=1, interval=1, + batchsize=10, + ) manager.process_counts = process_counts manager._graceful_shutdown = MagicMock() @@ -317,7 +336,10 @@ def sleep_and_kill(pid): return False # Setup Manager - manager = ManagerWorker(queue_prefixes=["tester"], worker_concurrency=0, interval=0.0, batchsize=1) + manager = ManagerWorker( + queue_prefixes=["tester"], worker_concurrency=0, interval=0.0, + batchsize=1, + ) manager.start() # Give our processes a moment to start @@ -326,11 +348,15 @@ def sleep_and_kill(pid): # Setup Threading watcher try: # Try Python 2 Style - thread = ThreadWithReturnValue2(target=sleep_and_kill, args=(manager.reader_children[0].pid,)) + thread = ThreadWithReturnValue2( + target=sleep_and_kill, args=(manager.reader_children[0].pid,)) thread.daemon = True except TypeError: # Use Python 3 Style - thread = ThreadWithReturnValue3(target=sleep_and_kill, args=(manager.reader_children[0].pid,), daemon=True) + thread = ThreadWithReturnValue3( + target=sleep_and_kill, args=(manager.reader_children[0].pid,), + daemon=True, + ) thread.start() @@ -390,7 +416,10 @@ def sleep_and_kill(pid): return False # Setup Manager - manager = ManagerWorker(queue_prefixes=["tester"], worker_concurrency=1, interval=0.0, batchsize=1) + manager = ManagerWorker( + queue_prefixes=["tester"], worker_concurrency=1, interval=0.0, + batchsize=1, + ) manager.start() # Give our processes a moment to start @@ -399,11 +428,15 @@ def sleep_and_kill(pid): # Setup Threading watcher try: # Try 
Python 2 Style - thread = ThreadWithReturnValue2(target=sleep_and_kill, args=(manager.reader_children[0].pid,)) + thread = ThreadWithReturnValue2( + target=sleep_and_kill, args=(manager.reader_children[0].pid,)) thread.daemon = True except TypeError: # Use Python 3 Style - thread = ThreadWithReturnValue3(target=sleep_and_kill, args=(manager.reader_children[0].pid,), daemon=True) + thread = ThreadWithReturnValue3( + target=sleep_and_kill, args=(manager.reader_children[0].pid,), + daemon=True, + ) thread.start() diff --git a/tests/test_tasks.py b/tests/test_tasks.py index a4f3bc0..de54063 100644 --- a/tests/test_tasks.py +++ b/tests/test_tasks.py @@ -3,7 +3,9 @@ import boto3 from moto import mock_sqs, mock_sqs_deprecated -from .tasks import index_incrementer, send_email, delayed_task, custom_path_task +from .tasks import ( + index_incrementer, send_email, delayed_task, custom_path_task, +) @mock_sqs() @@ -21,8 +23,12 @@ def test_basic_delay(): len(all_queues).should.equal(1) queue_url = all_queues[0] - queue_url.should.equal("https://queue.amazonaws.com/123456789012/tests.tasks.index_incrementer") - queue = conn.get_queue_attributes(QueueUrl=queue_url, AttributeNames=['All'])['Attributes'] + queue_url.should.equal( + "https://queue.amazonaws.com/123456789012/" + "tests.tasks.index_incrementer" + ) + queue = conn.get_queue_attributes( + QueueUrl=queue_url, AttributeNames=['All'])['Attributes'] queue['ApproximateNumberOfMessages'].should.equal('1') message = conn.receive_message(QueueUrl=queue_url)['Messages'][0] @@ -49,7 +55,8 @@ def test_specified_queue(): queue_url = queue_urls[0] queue_url.should.equal("https://queue.amazonaws.com/123456789012/email") - queue = conn.get_queue_attributes(QueueUrl=queue_url, AttributeNames=['All'])['Attributes'] + queue = conn.get_queue_attributes( + QueueUrl=queue_url, AttributeNames=['All'])['Attributes'] queue['ApproximateNumberOfMessages'].should.equal('1') @@ -68,7 +75,8 @@ def test_message_delay(): queue_url = queue_urls[0] 
queue_url.should.equal("https://queue.amazonaws.com/123456789012/delayed") - queue = conn.get_queue_attributes(QueueUrl=queue_url, AttributeNames=['All'])['Attributes'] + queue = conn.get_queue_attributes( + QueueUrl=queue_url, AttributeNames=['All'])['Attributes'] queue['ApproximateNumberOfMessages'].should.equal('0') @@ -87,7 +95,8 @@ def test_message_add_delay(): queue_url = queue_urls[0] queue_url.should.equal("https://queue.amazonaws.com/123456789012/email") - queue = conn.get_queue_attributes(QueueUrl=queue_url, AttributeNames=['All'])['Attributes'] + queue = conn.get_queue_attributes( + QueueUrl=queue_url, AttributeNames=['All'])['Attributes'] queue['ApproximateNumberOfMessages'].should.equal('0') @@ -106,7 +115,8 @@ def test_message_no_delay(): queue_url = queue_urls[0] queue_url.should.equal("https://queue.amazonaws.com/123456789012/delayed") - queue = conn.get_queue_attributes(QueueUrl=queue_url, AttributeNames=['All'])['Attributes'] + queue = conn.get_queue_attributes( + QueueUrl=queue_url, AttributeNames=['All'])['Attributes'] queue['ApproximateNumberOfMessages'].should.equal('1') @@ -124,7 +134,8 @@ def test_custom_function_path(): len(queue_urls).should.equal(1) queue_url = queue_urls[0] queue_url.should.equal("https://queue.amazonaws.com/123456789012/foobar") - queue = conn.get_queue_attributes(QueueUrl=queue_url, AttributeNames=['All'])['Attributes'] + queue = conn.get_queue_attributes( + QueueUrl=queue_url, AttributeNames=['All'])['Attributes'] queue['ApproximateNumberOfMessages'].should.equal('1') message = conn.receive_message(QueueUrl=queue_url)['Messages'][0] diff --git a/tests/test_worker.py b/tests/test_worker.py index 3b786b2..04313d6 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -12,7 +12,10 @@ import boto3 from moto import mock_sqs from mock import patch, Mock -from pyqs.worker import ManagerWorker, ReadWorker, ProcessWorker, BaseWorker, MESSAGE_DOWNLOAD_BATCH_SIZE +from pyqs.worker import ( + ManagerWorker, ReadWorker, 
ProcessWorker, BaseWorker, + MESSAGE_DOWNLOAD_BATCH_SIZE, +) from pyqs.utils import decode_message from tests.tasks import task_results from tests.utils import MockLoggingHandler @@ -61,7 +64,8 @@ def test_worker_fills_internal_queue_only_until_maximum_queue_size(): """ conn = boto3.client('sqs', region_name='us-east-1') # Set visibility timeout low to improve test speed - queue_url = conn.create_queue(QueueName="tester", Attributes={'VisibilityTimeout': '1'})['QueueUrl'] + queue_url = conn.create_queue( + QueueName="tester", Attributes={'VisibilityTimeout': '1'})['QueueUrl'] message = json.dumps({ 'task': 'tests.tasks.index_incrementer', @@ -97,7 +101,12 @@ def test_worker_fills_internal_queue_from_celery_task(): conn = boto3.client('sqs', region_name='us-east-1') queue_url = conn.create_queue(QueueName="tester")['QueueUrl'] - message = '{"body": "KGRwMApTJ3Rhc2snCnAxClMndGVzdHMudGFza3MuaW5kZXhfaW5jcmVtZW50ZXInCnAyCnNTJ2Fy\\nZ3MnCnAzCihscDQKc1Mna3dhcmdzJwpwNQooZHA2ClMnbWVzc2FnZScKcDcKUydUZXN0IG1lc3Nh\\nZ2UyJwpwOApzcy4=\\n", "some stuff": "asdfasf"}' + message = ( + '{"body": "KGRwMApTJ3Rhc2snCnAxClMndGVzdHMudGFza3MuaW5kZXhfa' + 'W5jcmVtZW50ZXInCnAyCnNTJ2Fy\\nZ3MnCnAzCihscDQKc1Mna3dhcmdzJw' + 'pwNQooZHA2ClMnbWVzc2FnZScKcDcKUydUZXN0IG1lc3Nh\\nZ2UyJwpwOAp' + 'zcy4=\\n", "some stuff": "asdfasf"}' + ) conn.send_message(QueueUrl=queue_url, MessageBody=message) internal_queue = Queue() @@ -140,7 +149,14 @@ def test_worker_processes_tasks_from_internal_queue(): # Add message to queue internal_queue = Queue() - internal_queue.put({"message": message, "queue": queue_url, "start_time": time.time(), "timeout": 30}) + internal_queue.put( + { + "message": message, + "queue": queue_url, + "start_time": time.time(), + "timeout": 30, + } + ) # Process message worker = ProcessWorker(internal_queue, INTERVAL) @@ -168,10 +184,20 @@ def test_worker_fills_internal_queue_and_respects_visibility_timeouts(): # Setup SQS Queue conn = boto3.client('sqs', region_name='us-east-1') - queue_url 
= conn.create_queue(QueueName="tester", Attributes={'VisibilityTimeout': '1'})['QueueUrl'] + queue_url = conn.create_queue( + QueueName="tester", Attributes={'VisibilityTimeout': '1'})['QueueUrl'] # Add MEssages - message = json.dumps({"body": "KGRwMApTJ3Rhc2snCnAxClMndGVzdHMudGFza3MuaW5kZXhfaW5jcmVtZW50ZXInCnAyCnNTJ2Fy\nZ3MnCnAzCihscDQKc1Mna3dhcmdzJwpwNQooZHA2ClMnbWVzc2FnZScKcDcKUydUZXN0IG1lc3Nh\nZ2UyJwpwOApzcy4=\n", "some stuff": "asdfasf"}) + message = json.dumps( + { + "body": ( + "KGRwMApTJ3Rhc2snCnAxClMndGVzdHMudGFza3MuaW5kZXhfaW5jcmVtZW" + "50ZXInCnAyCnNTJ2Fy\nZ3MnCnAzCihscDQKc1Mna3dhcmdzJwpwNQooZHA" + "2ClMnbWVzc2FnZScKcDcKUydUZXN0IG1lc3Nh\nZ2UyJwpwOApzcy4=\n" + ), + "some stuff": "asdfasf", + } + ) for _ in range(3): conn.send_message(QueueUrl=queue_url, MessageBody=message) @@ -181,8 +207,10 @@ def test_worker_fills_internal_queue_and_respects_visibility_timeouts(): worker.read_message() # Check log messages - logger.handlers[0].messages['warning'][0].should.contain("Timed out trying to add the following message to the internal queue") - logger.handlers[0].messages['warning'][1].should.contain("Clearing Local messages since we exceeded their visibility_timeout") + logger.handlers[0].messages['warning'][0].should.contain( + "Timed out trying to add the following message to the internal queue") + logger.handlers[0].messages['warning'][1].should.contain( + "Clearing Local messages since we exceeded their visibility_timeout") @mock_sqs @@ -213,7 +241,14 @@ def test_worker_processes_tasks_and_logs_correctly(): # Add message to internal queue internal_queue = Queue() - internal_queue.put({"queue": queue_url, "message": message, "start_time": time.time(), "timeout": 30}) + internal_queue.put( + { + "queue": queue_url, + "message": message, + "start_time": time.time(), + "timeout": 30, + } + ) # Process message worker = ProcessWorker(internal_queue, INTERVAL) @@ -221,7 +256,10 @@ def test_worker_processes_tasks_and_logs_correctly(): # Check output kwargs = 
json.loads(message['Body'])['kwargs'] - expected_result = u"Processed task tests.tasks.index_incrementer in 0.0000 seconds with args: [] and kwargs: {}".format(kwargs) + expected_result = ( + u"Processed task tests.tasks.index_incrementer in 0.0000 seconds " + "with args: [] and kwargs: {}".format(kwargs) + ) logger.handlers[0].messages['info'].should.equal([expected_result]) @@ -253,7 +291,14 @@ def test_worker_processes_tasks_and_logs_warning_correctly(): # Add message to internal queue internal_queue = Queue() - internal_queue.put({"queue": queue_url, "message": message, "start_time": time.time(), "timeout": 30}) + internal_queue.put( + { + "queue": queue_url, + "message": message, + "start_time": time.time(), + "timeout": 30, + } + ) # Process message worker = ProcessWorker(internal_queue, INTERVAL) @@ -261,10 +306,20 @@ def test_worker_processes_tasks_and_logs_warning_correctly(): # Check output kwargs = json.loads(message['Body'])['kwargs'] - msg1 = "Task tests.tasks.index_incrementer raised error in 0.0000 seconds: with args: [] and kwargs: {}: Traceback (most recent call last)".format(kwargs) # noqa - logger.handlers[0].messages['error'][0].lower().should.contain(msg1.lower()) - msg2 = 'raise ValueError("Need to be given basestring, was given {}".format(message))\nValueError: Need to be given basestring, was given 23' # noqa - logger.handlers[0].messages['error'][0].lower().should.contain(msg2.lower()) + msg1 = ( + "Task tests.tasks.index_incrementer raised error in 0.0000 seconds: " + "with args: [] and kwargs: {}: " + "Traceback (most recent call last)".format(kwargs) + ) # noqa + logger.handlers[0].messages['error'][0].lower().should.contain( + msg1.lower()) + msg2 = ( + 'raise ValueError("Need to be given basestring, was given ' + '{}".format(message))\nValueError: Need to be given basestring, ' + 'was given 23' + ) # noqa + logger.handlers[0].messages['error'][0].lower().should.contain( + msg2.lower()) @mock_sqs @@ -378,7 +433,8 @@ def 
test_read_worker_with_parent_process_dead_and_should_not_exit(os): @patch("pyqs.worker.os") def test_process_worker_with_parent_process_alive_and_should_not_exit(os): """ - Test worker processes do not exit when parent is alive and shutdown is not set + Test worker processes do not exit when parent is alive and shutdown + is not set """ # Setup PPID os.getppid.return_value = 1234 @@ -431,7 +487,7 @@ def test_process_worker_with_parent_process_alive_and_should_exit(os): @mock_sqs -def test_worker_processes_shuts_down_after_processing_its_maximum_number_of_messages(): +def test_worker_processes_shuts_down_after_processing_its_max_number_of_msgs(): """ Test worker processes shutdown after processing maximum number of messages """ @@ -453,9 +509,30 @@ def test_worker_processes_shuts_down_after_processing_its_maximum_number_of_mess # Add message to internal queue internal_queue = Queue(3) - internal_queue.put({"queue": queue_url, "message": message, "start_time": time.time(), "timeout": 30}) - internal_queue.put({"queue": queue_url, "message": message, "start_time": time.time(), "timeout": 30}) - internal_queue.put({"queue": queue_url, "message": message, "start_time": time.time(), "timeout": 30}) + internal_queue.put( + { + "queue": queue_url, + "message": message, + "start_time": time.time(), + "timeout": 30, + } + ) + internal_queue.put( + { + "queue": queue_url, + "message": message, + "start_time": time.time(), + "timeout": 30, + } + ) + internal_queue.put( + { + "queue": queue_url, + "message": message, + "start_time": time.time(), + "timeout": 30, + } + ) # When I Process messages worker = ProcessWorker(internal_queue, INTERVAL) @@ -497,7 +574,14 @@ def test_worker_processes_discard_tasks_that_exceed_their_visibility_timeout(): # Add message to internal queue with timeout of 0 that started long ago internal_queue = Queue() - internal_queue.put({"queue": queue_url, "message": message, "start_time": 0, "timeout": 0}) + internal_queue.put( + { + "queue": queue_url, 
+ "message": message, + "start_time": 0, + "timeout": 0, + } + ) # When I process the message worker = ProcessWorker(internal_queue, INTERVAL) @@ -505,14 +589,20 @@ def test_worker_processes_discard_tasks_that_exceed_their_visibility_timeout(): # Then I get an error about exceeding the visibility timeout kwargs = json.loads(message['Body'])['kwargs'] - msg1 = "Discarding task tests.tasks.index_incrementer with args: [] and kwargs: {} due to exceeding visibility timeout".format(kwargs) # noqa - logger.handlers[0].messages['warning'][0].lower().should.contain(msg1.lower()) + msg1 = ( + "Discarding task tests.tasks.index_incrementer with args: [] " + "and kwargs: {} due to exceeding " + "visibility timeout" + ).format(kwargs) # noqa + logger.handlers[0].messages['warning'][0].lower().should.contain( + msg1.lower()) @mock_sqs -def test_worker_processes_only_increases_processed_counter_if_a_message_was_processed(): +def test_worker_processes_only_incr_processed_counter_if_a_msg_was_processed(): """ - Test worker process only increases processed counter if a message was processed + Test worker process only increases processed counter if a message was + processed """ # Setup SQS Queue conn = boto3.client('sqs', region_name='us-east-1') @@ -532,12 +622,26 @@ def test_worker_processes_only_increases_processed_counter_if_a_message_was_proc # Add message to internal queue internal_queue = Queue(3) - internal_queue.put({"queue": queue_url, "message": message, "start_time": time.time(), "timeout": 30}) + internal_queue.put( + { + "queue": queue_url, + "message": message, + "start_time": time.time(), + "timeout": 30, + } + ) # And we add a message to the queue later def sleep_and_queue(internal_queue): time.sleep(1) - internal_queue.put({"queue": queue_url, "message": message, "start_time": time.time(), "timeout": 30}) + internal_queue.put( + { + "queue": queue_url, + "message": message, + "start_time": time.time(), + "timeout": 30, + } + ) thread = 
threading.Thread(target=sleep_and_queue, args=(internal_queue,)) thread.daemon = True diff --git a/tests/utils.py b/tests/utils.py index f36b029..3e9a9b4 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -27,13 +27,15 @@ def reset(self): class ThreadWithReturnValue2(Thread): - def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, Verbose=None): + def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, + Verbose=None): Thread.__init__(self, group, target, name, args, kwargs, Verbose) self._return = None def run(self): if self._Thread__target is not None: - self._return = self._Thread__target(*self._Thread__args, **self._Thread__kwargs) + self._return = self._Thread__target( + *self._Thread__args, **self._Thread__kwargs) def join(self): Thread.join(self) @@ -41,7 +43,8 @@ def join(self): class ThreadWithReturnValue3(Thread): - def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, daemon=None): + def __init__(self, group=None, target=None, name=None, args=(), + kwargs=None, daemon=None): Thread.__init__(self, group, target, name, args, kwargs, daemon=daemon) self._return = None