Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ install_deps:
run_test:
@echo "Running \033[0;32mtest suite\033[0m "; \
AWS_DEFAULT_REGION='us-east-1' nosetests --stop --with-coverage --cover-package=$(PACKAGE) \
--cover-branches --cover-erase --verbosity=2; \
--cover-branches --cover-erase --verbosity=2; pycodestyle; \

test: prepare
@make run_test
Expand Down
1 change: 1 addition & 0 deletions development.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ nose==1.3.0
pre-commit==0.7.6
sure==1.2.2
functools32;python_version=='2.7'
pycodestyle==2.4.0
16 changes: 12 additions & 4 deletions example/api/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

blueprint = Blueprint('api', __name__)


@blueprint.route('/example', methods=['POST'])
def example():
queues = current_app.config.get('QUEUES')
Expand All @@ -18,23 +19,30 @@ def example():
message_data = json.loads(data)
except (TypeError, AttributeError, ValueError) as e:
status = 500
return construct_response('Your payload does not appear to be valid json!', data, status)
return construct_response(
'Your payload does not appear to be valid json!', data, status)

try:
tasks.process.delay(message=message_data)
response_message = f'Successfully submitted message to queue {queue_name}'
response_message = (
f'Successfully submitted message to queue {queue_name}'
)
status = 200
except Exception as e:
response_message = f'Something went wrong submitting message to queue {queue_name}! {e}'
response_message = (
f'Something went wrong submitting message '
'to queue {queue_name}! {e}'
)
status = 500

return construct_response(response_message, message_data, status)


@blueprint.route('/health', methods=['GET'])
def health():
return jsonify('OK'), 200


@blueprint.route('/', methods=['GET'])
def hello():
return 'Hello!'
6 changes: 5 additions & 1 deletion example/api/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@
import logging
from flask import jsonify


def construct_response(message, payload, status):
body = {}

if status == 500:
body['message'] = 'Something went wrong constructing response. Is your payload valid JSON?'
body['message'] = (
'Something went wrong constructing response. '
'Is your payload valid JSON?'
)
body['request_payload'] = str(payload)
else:
body['message'] = message
Expand Down
6 changes: 5 additions & 1 deletion example/api/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@
from flask import Flask, current_app
from pyqs import task


# This task listens to 'queue-example' SQS queues for messages with
# {"task": "process"}
@task(queue='queue-example')
def process(message):
logging.info(f'PyQS task process() is processing message {message}. Clap your hands!')
logging.info(
f'PyQS task process() is processing message {message}. '
'Clap your hands!'
)
2 changes: 2 additions & 0 deletions example/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@
config_name = os.environ.get('ENV', 'development')
current_config = config[config_name]


def create_app():
app = Flask(__name__)
app.config.from_object(current_config)

return app


app = create_app()
app.register_blueprint(example_blueprint)

Expand Down
2 changes: 1 addition & 1 deletion example/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@

config = {
'development': DevelopmentConfig
#'production': ProductionConfig
# 'production': ProductionConfig
}
1 change: 1 addition & 0 deletions example/config/common.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import logging


class Config(object):

AWS_ACCESS_KEY_ID = os.environ.get('AWS_ACCESS_KEY_ID')
Expand Down
1 change: 1 addition & 0 deletions example/config/development.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from .common import Config


class DevelopmentConfig(Config):
DEBUG = True

Expand Down
12 changes: 8 additions & 4 deletions pyqs/decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ def get_or_create_queue(queue_name):
try:
return sqs.get_queue_by_name(QueueName=queue_name)
except ClientError as exc:
if exc.response['Error']['Code'] == 'AWS.SimpleQueueService.NonExistentQueue':
non_existent_code = 'AWS.SimpleQueueService.NonExistentQueue'
if exc.response['Error']['Code'] == non_existent_code:
return sqs.create_queue(QueueName=queue_name)
else:
raise


def task_delayer(func_to_delay, queue_name, delay_seconds=None, override=False):
def task_delayer(func_to_delay, queue_name, delay_seconds=None,
override=False):
function_path = function_to_import_path(func_to_delay, override=override)

if not queue_name:
Expand Down Expand Up @@ -55,7 +57,8 @@ def wrapper(*args, **kwargs):


class task(object):
def __init__(self, queue=None, delay_seconds=None, custom_function_path=None):
def __init__(self, queue=None, delay_seconds=None,
custom_function_path=None):
self.queue_name = queue
self.delay_seconds = delay_seconds
self.function_path = custom_function_path
Expand All @@ -67,5 +70,6 @@ def __call__(self, *args, **kwargs):
if self.function_path:
override = True
function = self.function_path
func_to_wrap.delay = task_delayer(function, self.queue_name, self.delay_seconds, override=override)
func_to_wrap.delay = task_delayer(
function, self.queue_name, self.delay_seconds, override=override)
return func_to_wrap
25 changes: 20 additions & 5 deletions pyqs/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ def main():
dest="logging_level",
type=str,
default="WARN",
help='Set logging level. This must be one of the python default logging levels',
help=(
'Set logging level. '
'This must be one of the python default logging levels'
),
action="store",
)

Expand Down Expand Up @@ -95,7 +98,10 @@ def main():
dest="prefetch_multiplier",
type=int,
default=2,
help='Multiplier on the size of the internal queue for prefetching SQS messages.',
help=(
'Multiplier on the size of the internal queue '
'for prefetching SQS messages.'
),
action="store",
)

Expand All @@ -114,9 +120,18 @@ def main():
)


def _main(queue_prefixes, concurrency=5, logging_level="WARN", region='us-east-1', access_key_id=None, secret_access_key=None, interval=1, batchsize=10, prefetch_multiplier=2):
logging.basicConfig(format="[%(levelname)s]: %(message)s", level=getattr(logging, logging_level))
def _main(queue_prefixes, concurrency=5, logging_level="WARN",
region='us-east-1', access_key_id=None, secret_access_key=None,
interval=1, batchsize=10, prefetch_multiplier=2):
logging.basicConfig(
format="[%(levelname)s]: %(message)s",
level=getattr(logging, logging_level),
)
logger.info("Starting PyQS version {}".format(__version__))
manager = ManagerWorker(queue_prefixes, concurrency, interval, batchsize, prefetch_multiplier=prefetch_multiplier, region=region, access_key_id=access_key_id, secret_access_key=secret_access_key)
manager = ManagerWorker(
queue_prefixes, concurrency, interval, batchsize,
prefetch_multiplier=prefetch_multiplier, region=region,
access_key_id=access_key_id, secret_access_key=secret_access_key,
)
manager.start()
manager.sleep()
Loading