diff --git a/.ci/travis-build-wheels.sh b/.ci/travis-build-wheels.sh index c1d9b77e..30a77b18 100755 --- a/.ci/travis-build-wheels.sh +++ b/.ci/travis-build-wheels.sh @@ -9,7 +9,7 @@ if [[ "${TRAVIS_BRANCH}" != "releases" || "${BUILD}" != *wheels* ]]; then fi -if [ "${TRAVIS_OS_NAME}" == "osx" ]; then +if [ "${PYENV}" == "true" ]; then PYENV_ROOT="$HOME/.pyenv" PATH="$PYENV_ROOT/bin:$PATH" eval "$(pyenv init -)" diff --git a/.ci/travis-install.sh b/.ci/travis-install.sh index 191b775c..0848a60e 100755 --- a/.ci/travis-install.sh +++ b/.ci/travis-install.sh @@ -2,9 +2,17 @@ set -e -x -if [ "${TRAVIS_OS_NAME}" == "osx" ]; then - brew update >/dev/null - brew upgrade pyenv +if [ "${PYENV}" == "true" ]; then + + if [ "${TRAVIS_OS_NAME}" == "osx" ]; then + brew update >/dev/null + brew upgrade pyenv + else + git clone --depth 1 https://github.com/yyuu/pyenv.git ~/.pyenv + PYENV_ROOT="$HOME/.pyenv" + PATH="$PYENV_ROOT/bin:$PATH" + fi + eval "$(pyenv init -)" if ! (pyenv versions | grep "${PYTHON_VERSION}$"); then @@ -13,11 +21,26 @@ if [ "${TRAVIS_OS_NAME}" == "osx" ]; then pyenv global ${PYTHON_VERSION} pyenv rehash + python --version +fi + +if [ "${TRAVIS_OS_NAME}" == "osx" ]; then + brew update + brew install gnu-sed --with-default-names brew outdated libtool || brew upgrade libtool brew outdated autoconf || brew upgrade autoconf --with-default-names brew outdated automake || brew upgrade automake --with-default-names + + # Pined to 9.0.X till following issues are addressed + # https://github.com/pypa/pip/issues/5240 + # https://github.com/pyenv/pyenv/issues/1141 + pip install --upgrade pip~=9.0.0 +else + pip install --upgrade pip fi -pip install --upgrade setuptools pip wheel + +pip install --upgrade wheel +pip install --upgrade setuptools pip install -r .ci/requirements.txt diff --git a/.ci/travis-tests.sh b/.ci/travis-tests.sh index 0fc4aa23..61aeb3eb 100755 --- a/.ci/travis-tests.sh +++ b/.ci/travis-tests.sh @@ -7,11 +7,14 @@ if [[ "${BUILD}" != *tests* ]]; then exit 0 fi -if [ "${TRAVIS_OS_NAME}" == "osx" ]; then +if [ "${PYENV}" == "true" ]; then PYENV_ROOT="$HOME/.pyenv" PATH="$PYENV_ROOT/bin:$PATH" eval "$(pyenv init -)" + pyenv global ${PYTHON_VERSION} fi +python --version + make distclean && make && make test make distclean && make debug && make test diff --git a/.travis.yml b/.travis.yml index a5acfa8b..3c5ffa79 100644 --- a/.travis.yml +++ b/.travis.yml @@ -28,42 +28,12 @@ matrix: fast_finish: true include: - - os: osx - osx_image: xcode7.3 - # Travis macOS env does not support Python yet, - # so we have to set things up manually in install.sh. - env: BUILD=tests,wheels PYTHON_VERSION=3.5.5 - branches: {only: [releases]} - - - os: osx - osx_image: xcode7.3 - # Travis macOS env does not support Python yet, - # so we have to set things up manually in install.sh. - env: BUILD=tests,wheels PYTHON_VERSION=3.6.5 - branches: {only: [releases]} - - os: linux - dist: trusty + dist: xenial sudo: false language: python - python: "3.5" - env: BUILD=tests - - - os: linux - dist: trusty - sudo: false - language: python - python: "3.6" - env: BUILD=tests - - - os: linux - dist: trusty - branches: {only: [releases]} - sudo: required - language: python - python: "3.5" - services: [docker] - env: BUILD=tests,wheels,release + python: "3.7-dev" + env: BUILD=tests PYENV=true PYTHON_VERSION=3.7.0b3 cache: pip diff --git a/tests/test_dns.py b/tests/test_dns.py index 47820284..958a03e8 100644 --- a/tests/test_dns.py +++ b/tests/test_dns.py @@ -2,6 +2,7 @@ import unittest from uvloop import _testbase as tb +from uvloop import tracing, TracingCollector class BaseTestDNS: @@ -174,6 +175,34 @@ async def run(): finally: self.loop.close() + def test_getaddrinfo_tracing(self): + + class DNSCollector(TracingCollector): + dns_request_begin_called = False + dns_request_end_called = False + + def dns_request_begin(self, *args): + self.dns_request_begin_called = True + + def dns_request_end(self, *args): + self.dns_request_end_called = True + + collector = DNSCollector() + with tracing(collector): + self.loop.run_until_complete( + self.loop.getaddrinfo('example.com', 80) + ) + assert collector.dns_request_begin_called + assert collector.dns_request_end_called + + collector.dns_request_begin_called = False + collector.dns_request_end_called = False + self.loop.run_until_complete( + self.loop.getaddrinfo('example.com', 80) + ) + assert not collector.dns_request_begin_called + assert not collector.dns_request_end_called + class Test_AIO_DNS(BaseTestDNS, tb.AIOTestCase): pass diff --git a/tests/test_pipes.py b/tests/test_pipes.py index 8bec52b3..6304bc81 100644 --- a/tests/test_pipes.py +++ b/tests/test_pipes.py @@ -1,8 +1,8 @@ import asyncio import io import os +import socket -from asyncio import test_utils from uvloop import _testbase as tb @@ -81,11 +81,11 @@ def connect(): self.loop.run_until_complete(connect()) os.write(wpipe, b'1') - test_utils.run_until(self.loop, lambda: proto.nbytes >= 1) + tb.run_until(self.loop, lambda: proto.nbytes >= 1) self.assertEqual(1, proto.nbytes) os.write(wpipe, b'2345') - test_utils.run_until(self.loop, lambda: proto.nbytes >= 5) + tb.run_until(self.loop, lambda: proto.nbytes >= 5) self.assertEqual(['INITIAL', 'CONNECTED'], proto.state) self.assertEqual(5, proto.nbytes) @@ -114,11 +114,11 @@ def connect(): self.loop.run_until_complete(connect()) os.write(slave, b'1') - test_utils.run_until(self.loop, lambda: proto.nbytes) + tb.run_until(self.loop, lambda: proto.nbytes) self.assertEqual(1, proto.nbytes) os.write(slave, b'2345') - test_utils.run_until(self.loop, lambda: proto.nbytes >= 5) + tb.run_until(self.loop, lambda: proto.nbytes >= 5) self.assertEqual(['INITIAL', 'CONNECTED'], proto.state) self.assertEqual(5, proto.nbytes) @@ -157,11 +157,11 @@ def reader(data): data += chunk return len(data) - test_utils.run_until(self.loop, lambda: reader(data) >= 1) + tb.run_until(self.loop, lambda: reader(data) >= 1) self.assertEqual(b'1', data) transport.write(b'2345') - test_utils.run_until(self.loop, lambda: reader(data) >= 5) + tb.run_until(self.loop, lambda: reader(data) >= 5) self.assertEqual(b'12345', data) self.assertEqual('CONNECTED', proto.state) @@ -176,7 +176,7 @@ def reader(data): self.assertEqual('CLOSED', proto.state) def test_write_pipe_disconnect_on_close(self): - rsock, wsock = test_utils.socketpair() + rsock, wsock = socket.socketpair() rsock.setblocking(False) pipeobj = io.open(wsock.detach(), 'wb', 1024) @@ -222,12 +222,12 @@ def reader(data): data += chunk return len(data) - test_utils.run_until(self.loop, lambda: reader(data) >= 1, + tb.run_until(self.loop, lambda: reader(data) >= 1, timeout=10) self.assertEqual(b'1', data) transport.write(b'2345') - test_utils.run_until(self.loop, lambda: reader(data) >= 5, + tb.run_until(self.loop, lambda: reader(data) >= 5, timeout=10) self.assertEqual(b'12345', data) self.assertEqual('CONNECTED', proto.state) diff --git a/tests/test_process.py b/tests/test_process.py index 51e1970a..fced8562 100644 --- a/tests/test_process.py +++ b/tests/test_process.py @@ -9,7 +9,6 @@ import time import unittest -from asyncio import test_utils from uvloop import _testbase as tb @@ -504,7 +503,7 @@ def cancel_make_transport(): # ignore the log: # "Exception during subprocess creation, kill the subprocess" - with test_utils.disable_logger(): + with tb.disable_logger(): self.loop.run_until_complete(cancel_make_transport()) def test_cancel_post_init(self): @@ -522,9 +521,9 @@ def cancel_make_transport(): # ignore the log: # "Exception during subprocess creation, kill the subprocess" - with test_utils.disable_logger(): + with tb.disable_logger(): self.loop.run_until_complete(cancel_make_transport()) - test_utils.run_briefly(self.loop) + tb.run_briefly(self.loop) class Test_UV_Process(_TestProcess, tb.UVTestCase): diff --git a/tests/test_tasks.py b/tests/test_tasks.py index b237ab67..f157452a 100644 --- a/tests/test_tasks.py +++ b/tests/test_tasks.py @@ -2,8 +2,8 @@ import asyncio -from asyncio import test_utils from uvloop import _testbase as tb +from uvloop import tracing, TracingCollector class Dummy: @@ -44,7 +44,7 @@ def notmuch(): '_TestTasks.test_task_repr..notmuch') self.assertEqual(notmuch.__module__, __name__) - filename, lineno = test_utils.get_function_source(notmuch) + filename, lineno = tb.get_function_source(notmuch) src = "%s:%s" % (filename, lineno) # test coroutine object @@ -109,7 +109,7 @@ def task(): return 12 t = self.create_task(task()) - test_utils.run_briefly(self.loop) # start coro + tb.run_briefly(self.loop) # start coro t.cancel() self.assertRaises( asyncio.CancelledError, self.loop.run_until_complete, t) @@ -126,7 +126,7 @@ def task(): return 12 t = self.create_task(task()) - test_utils.run_briefly(self.loop) # start task + tb.run_briefly(self.loop) # start task f.cancel() with self.assertRaises(asyncio.CancelledError): self.loop.run_until_complete(t) @@ -143,7 +143,7 @@ def task(): t = self.create_task(task()) self.assertEqual(asyncio.Task.all_tasks(loop=self.loop), {t}) - test_utils.run_briefly(self.loop) + tb.run_briefly(self.loop) f.cancel() t.cancel() @@ -168,10 +168,10 @@ def task(): return 42 t = self.create_task(task()) - test_utils.run_briefly(self.loop) + tb.run_briefly(self.loop) self.assertIs(t._fut_waiter, fut1) # White-box test. fut1.set_result(None) - test_utils.run_briefly(self.loop) + tb.run_briefly(self.loop) self.assertIs(t._fut_waiter, fut2) # White-box test. t.cancel() self.assertTrue(fut2.cancelled()) @@ -195,14 +195,14 @@ def task(): return res t = self.create_task(task()) - test_utils.run_briefly(self.loop) + tb.run_briefly(self.loop) self.assertIs(t._fut_waiter, fut1) # White-box test. fut1.set_result(None) - test_utils.run_briefly(self.loop) + tb.run_briefly(self.loop) self.assertIs(t._fut_waiter, fut2) # White-box test. t.cancel() self.assertTrue(fut2.cancelled()) - test_utils.run_briefly(self.loop) + tb.run_briefly(self.loop) self.assertIs(t._fut_waiter, fut3) # White-box test. fut3.set_result(42) res = self.loop.run_until_complete(t) @@ -232,7 +232,8 @@ def notmutch(): raise BaseException() task = self.create_task(notmutch()) - self.assertRaises(BaseException, task._step) + with self.assertRaises(BaseException): + tb.run_briefly(self.loop) self.assertTrue(task.done()) self.assertIsInstance(task.exception(), BaseException) @@ -245,7 +246,7 @@ def __init__(self, *args, **kwds): self.cb_added = False super().__init__(*args, **kwds) - def add_done_callback(self, fn): + def add_done_callback(self, fn, context=None): self.cb_added = True super().add_done_callback(fn) @@ -258,12 +259,12 @@ def wait_for_future(): result = yield from fut t = self.create_task(wait_for_future()) - test_utils.run_briefly(self.loop) + tb.run_briefly(self.loop) self.assertTrue(fut.cb_added) res = object() fut.set_result(res) - test_utils.run_briefly(self.loop) + tb.run_briefly(self.loop) self.assertIs(res, result) self.assertTrue(t.done()) self.assertIsNone(t.result()) @@ -356,7 +357,7 @@ def outer(): proof += 10 f = asyncio.ensure_future(outer(), loop=self.loop) - test_utils.run_briefly(self.loop) + tb.run_briefly(self.loop) f.cancel() self.loop.run_until_complete(f) self.assertEqual(proof, 101) @@ -381,12 +382,12 @@ def outer(): proof += 100 f = asyncio.ensure_future(outer(), loop=self.loop) - test_utils.run_briefly(self.loop) + tb.run_briefly(self.loop) f.cancel() self.assertRaises( asyncio.CancelledError, self.loop.run_until_complete, f) waiter.set_result(None) - test_utils.run_briefly(self.loop) + tb.run_briefly(self.loop) self.assertEqual(proof, 1) @@ -402,6 +403,29 @@ def create_future(self): def create_task(self, coro): return self.loop.create_task(coro) + def test_create_task_tracing(self): + + @asyncio.coroutine + def coro(): + pass + + class CreateTaskCollector(TracingCollector): + task_created_called = False + + def task_created(self, *args): + self.task_created_called = True + + collector = CreateTaskCollector() + with tracing(collector): + self.create_task(coro()) + assert collector.task_created_called + + collector.task_created_called = False + self.create_task(coro()) + assert not collector.task_created_called + + tb.run_briefly(self.loop) + class Test_UV_UV_Tasks_AIO_Future(_TestTasks, tb.UVTestCase): def create_future(self): diff --git a/tests/test_tracing.py b/tests/test_tracing.py new file mode 100644 index 00000000..19e1ee3f --- /dev/null +++ b/tests/test_tracing.py @@ -0,0 +1,18 @@ +import unittest + +from uvloop import tracing, TracingCollector + + +class TestTracingCollector(unittest.TestCase): + def test_tracing_methods(self): + collector = TracingCollector() + assert hasattr(collector, 'dns_request_begin') + assert hasattr(collector, 'dns_request_end') + assert hasattr(collector, 'task_created') + + +class TestTracing(unittest.TestCase): + def test_invalid_collector(self): + with self.assertRaises(ValueError): + with tracing(None): + pass diff --git a/tests/test_udp.py b/tests/test_udp.py index b3f79e8b..1e5be9b5 100644 --- a/tests/test_udp.py +++ b/tests/test_udp.py @@ -3,7 +3,6 @@ import unittest import sys -from asyncio import test_utils from uvloop import _testbase as tb @@ -77,9 +76,9 @@ def datagram_received(self, data, addr): self.assertIs(client.transport, transport) transport.sendto(b'xxx') - test_utils.run_until(self.loop, lambda: server.nbytes) + tb.run_until(self.loop, lambda: server.nbytes) self.assertEqual(3, server.nbytes) - test_utils.run_until(self.loop, lambda: client.nbytes) + tb.run_until(self.loop, lambda: client.nbytes) # received self.assertEqual(8, client.nbytes) diff --git a/tests/test_unix.py b/tests/test_unix.py index 87466c10..e9900d3e 100644 --- a/tests/test_unix.py +++ b/tests/test_unix.py @@ -5,6 +5,7 @@ import tempfile import time import unittest +import sys from uvloop import _testbase as tb @@ -362,6 +363,7 @@ async def client(): self.assertIn(excs[0].__class__, (BrokenPipeError, ConnectionResetError)) + @unittest.skipUnless(sys.version_info < (3, 7), 'Python version must be < 3.7') def test_transport_unclosed_warning(self): async def test(sock): return await self.loop.create_unix_connection( diff --git a/uvloop/__init__.py b/uvloop/__init__.py index 8f09dd07..38ea126a 100644 --- a/uvloop/__init__.py +++ b/uvloop/__init__.py @@ -5,9 +5,10 @@ from . import includes as __includes # NOQA from . import _patch # NOQA from .loop import Loop as __BaseLoop # NOQA +from .loop import tracing, TracingCollector -__all__ = ('new_event_loop', 'EventLoopPolicy') +__all__ = ('new_event_loop', 'EventLoopPolicy', 'tracing', 'TracingCollector') class Loop(__BaseLoop, asyncio.AbstractEventLoop): diff --git a/uvloop/_testbase.py b/uvloop/_testbase.py index 22ebda08..680bc8dd 100644 --- a/uvloop/_testbase.py +++ b/uvloop/_testbase.py @@ -5,6 +5,7 @@ import asyncio.events import collections import contextlib +import functools import gc import logging import os @@ -12,14 +13,17 @@ import re import select import socket +import sys import ssl import tempfile import threading import time +import inspect import unittest import uvloop + class MockPattern(str): def __eq__(self, other): return bool(re.search(str(self), other, re.S)) @@ -483,3 +487,78 @@ def _handle_client(self, sock): @property def addr(self): return self._sock.getsockname() + + +def run_briefly(loop): + async def once(): + pass + gen = once() + t = loop.create_task(gen) + # Don't log a warning if the task is not done after run_until_complete(). + # It occurs if the loop is stopped or if a task raises a BaseException. + t._log_destroy_pending = False + try: + loop.run_until_complete(t) + finally: + gen.close() + + +def run_until(loop, pred, timeout=30): + deadline = time.time() + timeout + while not pred(): + if timeout is not None: + timeout = deadline - time.time() + if timeout <= 0: + raise asyncio.futures.TimeoutError() + loop.run_until_complete(asyncio.tasks.sleep(0.001, loop=loop)) + + +def run_once(loop): + """Legacy API to run once through the event loop. + + This is the recommended pattern for test code. It will poll the + selector once and run all callbacks scheduled in response to I/O + events. + """ + loop.call_soon(loop.stop) + loop.run_forever() + + +@contextlib.contextmanager +def disable_logger(): + """Context manager to disable asyncio logger. + + For example, it can be used to ignore warnings in debug mode. + """ + old_level = asyncio.log.logger.level + try: + asyncio.log.logger.setLevel(logging.CRITICAL+1) + yield + finally: + asyncio.log.logger.setLevel(old_level) + + +def mock_nonblocking_socket(proto=socket.IPPROTO_TCP, type=socket.SOCK_STREAM, + family=socket.AF_INET): + """Create a mock of a non-blocking socket.""" + sock = mock.MagicMock(socket.socket) + sock.proto = proto + sock.type = type + sock.family = family + sock.gettimeout.return_value = 0.0 + return sock + + +def get_function_source(func): + if sys.version_info >= (3, 4): + func = inspect.unwrap(func) + elif hasattr(func, '__wrapped__'): + func = func.__wrapped__ + if inspect.isfunction(func): + code = func.__code__ + return (code.co_filename, code.co_firstlineno) + if isinstance(func, functools.partial): + return _get_function_source(func.func) + if compat.PY34 and isinstance(func, functools.partialmethod): + return _get_function_source(func.func) + return None diff --git a/uvloop/includes/stdlib.pxi b/uvloop/includes/stdlib.pxi index c09cec57..94c2dd7c 100644 --- a/uvloop/includes/stdlib.pxi +++ b/uvloop/includes/stdlib.pxi @@ -38,10 +38,10 @@ cdef aio_iscoroutinefunction = asyncio.iscoroutinefunction cdef aio_BaseProtocol = asyncio.BaseProtocol cdef aio_Protocol = asyncio.Protocol cdef aio_SSLProtocol = asyncio.sslproto.SSLProtocol -cdef aio_debug_wrapper = asyncio.coroutines.debug_wrapper cdef aio_isfuture = getattr(asyncio, 'isfuture', None) cdef aio_get_running_loop = getattr(asyncio, '_get_running_loop', None) cdef aio_set_running_loop = getattr(asyncio, '_set_running_loop', None) +cdef aio_debug_wrapper = getattr(asyncio.coroutines, 'debug_wrapper', None) cdef col_deque = collections.deque cdef col_Iterable = collections.Iterable diff --git a/uvloop/loop.pxd b/uvloop/loop.pxd index c715e028..c13fd6f6 100644 --- a/uvloop/loop.pxd +++ b/uvloop/loop.pxd @@ -29,11 +29,14 @@ ctypedef object (*method2_t)(object, object, object) ctypedef object (*method3_t)(object, object, object, object) + cdef class Loop: cdef: uv.uv_loop_t *uvloop - bint _coroutine_wrapper_set + + bint _coroutine_debug_set + int _coroutine_origin_tracking_saved_depth public slow_callback_duration @@ -194,7 +197,7 @@ cdef class Loop: cdef _read_from_self(self) cdef _process_self_data(self, data) - cdef _set_coroutine_wrapper(self, bint enabled) + cdef _set_coroutine_debug(self, bint enabled) cdef _print_debug_info(self) @@ -219,3 +222,5 @@ include "request.pxd" include "handles/udp.pxd" include "server.pxd" + +include "tracing.pxd" diff --git a/uvloop/loop.pyx b/uvloop/loop.pyx index 1e505675..4d53470c 100644 --- a/uvloop/loop.pyx +++ b/uvloop/loop.pyx @@ -38,6 +38,7 @@ include "includes/stdlib.pxi" include "errors.pyx" + cdef _is_sock_stream(sock_type): if SOCK_NONBLOCK == -1: return sock_type == uv.SOCK_STREAM @@ -152,7 +153,7 @@ cdef class Loop: self._signal_handlers = {} self._listening_signals = False - self._coroutine_wrapper_set = False + self._coroutine_debug_set = False if hasattr(sys, 'get_asyncgen_hooks'): # Python >= 3.6 @@ -778,12 +779,17 @@ cdef class Loop: except Exception as ex: if not fut.cancelled(): fut.set_exception(ex) + __trace_dns_request_end(None) else: if not fut.cancelled(): fut.set_result(data) + __trace_dns_request_end(data) else: if not fut.cancelled(): fut.set_exception(result) + __trace_dns_request_end(result) + + __trace_dns_request_begin(host, port, family, type, proto, flags) AddrInfoRequest(self, host, port, family, type, proto, flags, callback) return fut @@ -970,32 +976,45 @@ cdef class Loop: if err < 0: raise convert_error(-errno.errno) - cdef _set_coroutine_wrapper(self, bint enabled): + cdef _set_coroutine_debug(self, bint enabled): enabled = bool(enabled) - if self._coroutine_wrapper_set == enabled: + if self._coroutine_debug_set == enabled: return - wrapper = aio_debug_wrapper - current_wrapper = sys_get_coroutine_wrapper() - - if enabled: - if current_wrapper not in (None, wrapper): - warnings.warn( - "loop.set_debug(True): cannot set debug coroutine " - "wrapper; another wrapper is already set %r" % - current_wrapper, RuntimeWarning) + if sys_version_info >= (3, 7, 0): + if enabled: + self._coroutine_origin_tracking_saved_depth = ( + sys.get_coroutine_origin_tracking_depth()) + sys.set_coroutine_origin_tracking_depth( + DEBUG_STACK_DEPTH) else: - sys_set_coroutine_wrapper(wrapper) - self._coroutine_wrapper_set = True + sys.set_coroutine_origin_tracking_depth( + self._coroutine_origin_tracking_saved_depth) + + self._coroutine_debug_set = enabled else: - if current_wrapper not in (None, wrapper): - warnings.warn( - "loop.set_debug(False): cannot unset debug coroutine " - "wrapper; another wrapper was set %r" % - current_wrapper, RuntimeWarning) + wrapper = aio_debug_wrapper + current_wrapper = sys_get_coroutine_wrapper() + + if enabled: + if current_wrapper not in (None, wrapper): + warnings.warn( + "loop.set_debug(True): cannot set debug coroutine " + "wrapper; another wrapper is already set %r" % + current_wrapper, RuntimeWarning) + else: + sys_set_coroutine_wrapper(wrapper) + self._coroutine_debug_set = True else: - sys_set_coroutine_wrapper(None) - self._coroutine_wrapper_set = False + if current_wrapper not in (None, wrapper): + warnings.warn( + "loop.set_debug(False): cannot unset debug coroutine " + "wrapper; another wrapper was set %r" % + current_wrapper, RuntimeWarning) + else: + sys_set_coroutine_wrapper(None) + self._coroutine_debug_set = False + cdef _create_server(self, system.sockaddr *addr, object protocol_factory, @@ -1151,7 +1170,7 @@ cdef class Loop: self.is_closed(), self.get_debug()) - def call_soon(self, callback, *args): + def call_soon(self, callback, *args, context=None): """Arrange for a callback to be called as soon as possible. This operates as a FIFO queue: callbacks are called in the @@ -1168,7 +1187,7 @@ cdef class Loop: else: return self._call_soon(callback, None) - def call_soon_threadsafe(self, callback, *args): + def call_soon_threadsafe(self, callback, *args, context=None): """Like call_soon(), but thread-safe.""" if not args: args = None @@ -1176,7 +1195,7 @@ cdef class Loop: self.handler_async.send() return handle - def call_later(self, delay, callback, *args): + def call_later(self, delay, callback, *args, context=None): """Arrange for a callback to be called at a given time. Return a Handle: an opaque object with a cancel() method that @@ -1213,7 +1232,7 @@ cdef class Loop: else: return self._call_later(when, callback, args) - def call_at(self, when, callback, *args): + def call_at(self, when, callback, *args, context=None): """Like call_later(), but uses an absolute time. Absolute time corresponds to the event loop's time() method. @@ -1251,7 +1270,7 @@ cdef class Loop: # loop.stop() was called right before loop.run_forever(). # This is how asyncio loop behaves. mode = uv.UV_RUN_NOWAIT - self._set_coroutine_wrapper(self._debug) + self._set_coroutine_debug(self._debug) if self._asyncgens is not None: old_agen_hooks = sys.get_asyncgen_hooks() sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook, @@ -1259,7 +1278,7 @@ cdef class Loop: try: self._run(mode) finally: - self._set_coroutine_wrapper(False) + self._set_coroutine_debug(False) if self._asyncgens is not None: sys.set_asyncgen_hooks(*old_agen_hooks) @@ -1280,7 +1299,7 @@ cdef class Loop: def set_debug(self, enabled): self._debug = bool(enabled) if self.is_running(): - self._set_coroutine_wrapper(self._debug) + self.call_soon_threadsafe(self._set_coroutine_debug, self, self._debug) def is_running(self): """Return whether the event loop is currently running.""" @@ -1304,6 +1323,7 @@ cdef class Loop: task = aio_Task(coro, loop=self) else: task = self._task_factory(self, coro) + __trace_task_created(task) return task def set_task_factory(self, factory): @@ -2773,6 +2793,8 @@ include "handles/udp.pyx" include "server.pyx" +include "tracing.pyx" + # Used in UVProcess cdef vint __atfork_installed = 0 diff --git a/uvloop/tracing.pxd b/uvloop/tracing.pxd new file mode 100644 index 00000000..64dcab4c --- /dev/null +++ b/uvloop/tracing.pxd @@ -0,0 +1,10 @@ +cdef class TracingCollector: + cpdef dns_request_begin(self, object arg1, object arg2, object arg3, + object arg4, object arg5, object arg6) + cpdef dns_request_end(self, object arg1) + cpdef task_created(self, object arg1) + +cdef __trace_dns_request_end(object arg1) +cdef __trace_dns_request_begin(object arg1, object arg2, object arg3, + object arg4, object arg5, object arg6) +cdef __trace_task_created(object arg1) diff --git a/uvloop/tracing.pyx b/uvloop/tracing.pyx new file mode 100644 index 00000000..80dd9f5d --- /dev/null +++ b/uvloop/tracing.pyx @@ -0,0 +1,43 @@ +import contextvars +from contextlib import contextmanager + + +__tracing_ctx = contextvars.ContextVar('__tracing_ctx') + + +cdef class TracingCollector: + cpdef dns_request_begin(self, arg1, arg2, arg3, arg4, arg5, arg6): + pass + + cpdef dns_request_end(self, arg1): + pass + + cpdef task_created(self, arg1): + pass + + +@contextmanager +def tracing(collector): + if not isinstance(collector, TracingCollector): + raise ValueError("collector must be a TracingCollector class instance") + + __tracing_ctx.set(collector) + yield + __tracing_ctx.set(None) + + +cdef inline __trace_dns_request_end(arg1): + cdef TracingCollector collector = __tracing_ctx.get(None) + if collector: + collector.dns_request_end(arg1) + + +cdef inline __trace_dns_request_begin(arg1, arg2, arg3, arg4, arg5, arg6): + cdef TracingCollector collector = __tracing_ctx.get(None) + if collector: + collector.dns_request_begin(arg1, arg2, arg3, arg4, arg5, arg6) + +cdef inline __trace_task_created(arg1): + cdef TracingCollector collector = __tracing_ctx.get(None) + if collector: + collector.task_created(arg1)