|
29 | 29 | import sys
|
30 | 30 | import warnings
|
31 | 31 | import weakref
|
32 |
| -from contextlib import contextmanager |
33 |
| -from heapq import heappop |
34 | 32 |
|
35 | 33 | try:
|
36 | 34 | import ssl
|
@@ -388,10 +386,7 @@ async def wait_closed(self):
|
388 | 386 |
|
389 | 387 | class BaseEventLoop(events.AbstractEventLoop):
|
390 | 388 |
|
391 |
| - _is_proactorloop = False |
392 |
| - |
393 | 389 | def __init__(self):
|
394 |
| - self._num_runs_pending = 0 |
395 | 390 | self._timer_cancelled_count = 0
|
396 | 391 | self._closed = False
|
397 | 392 | self._stopping = False
|
@@ -586,75 +581,75 @@ def _do_shutdown(self, future):
|
586 | 581 | except Exception as ex:
|
587 | 582 | self.call_soon_threadsafe(future.set_exception, ex)
|
588 | 583 |
|
589 |
| - def _check_running(self): |
590 |
| - pass |
591 |
| - |
592 |
| - @contextmanager |
593 |
| - def manage_run(self): |
594 |
| - """Set up the loop for running.""" |
| 584 | + def _check_running(self, running_ok=False): |
| 585 | + if self.is_running(): |
| 586 | + raise RuntimeError('This event loop is already running') |
| 587 | + if not running_ok and events._get_running_loop() is not None: |
| 588 | + raise RuntimeError( |
| 589 | + 'Cannot run the event loop while another loop is running') |
| 590 | + |
| 591 | + def run_forever(self, running_ok=False): |
| 592 | + """Run until stop() is called.""" |
595 | 593 | self._check_closed()
|
| 594 | + self._check_running(running_ok=running_ok) |
| 595 | + self._set_coroutine_origin_tracking(self._debug) |
596 | 596 | old_thread_id = self._thread_id
|
597 | 597 | old_running_loop = events._get_running_loop()
|
| 598 | + self._thread_id = threading.get_ident() |
| 599 | + |
| 600 | + old_agen_hooks = sys.get_asyncgen_hooks() |
| 601 | + sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook, |
| 602 | + finalizer=self._asyncgen_finalizer_hook) |
598 | 603 | try:
|
599 |
| - self._thread_id = threading.get_ident() |
600 | 604 | events._set_running_loop(self)
|
601 |
| - self._num_runs_pending += 1 |
602 |
| - if self._is_proactorloop: |
603 |
| - if self._self_reading_future is None: |
604 |
| - self.call_soon(self._loop_self_reading) |
605 |
| - yield |
| 605 | + while True: |
| 606 | + self._run_once() |
| 607 | + if self._stopping: |
| 608 | + break |
606 | 609 | finally:
|
| 610 | + self._stopping = False |
607 | 611 | self._thread_id = old_thread_id
|
608 | 612 | events._set_running_loop(old_running_loop)
|
609 |
| - self._num_runs_pending -= 1 |
610 |
| - if self._is_proactorloop: |
611 |
| - if (self._num_runs_pending == 0 |
612 |
| - and self._self_reading_future is not None): |
613 |
| - ov = self._self_reading_future._ov |
614 |
| - self._self_reading_future.cancel() |
615 |
| - if ov is not None: |
616 |
| - self._proactor._unregister(ov) |
617 |
| - self._self_reading_future = None |
618 |
| - |
619 |
| - @contextmanager |
620 |
| - def manage_asyncgens(self): |
621 |
| - if not hasattr(sys, 'get_asyncgen_hooks'): |
622 |
| - # Python version is too old. |
623 |
| - return |
624 |
| - old_agen_hooks = sys.get_asyncgen_hooks() |
625 |
| - try: |
626 |
| - self._set_coroutine_origin_tracking(self._debug) |
627 |
| - if self._asyncgens is not None: |
628 |
| - sys.set_asyncgen_hooks( |
629 |
| - firstiter=self._asyncgen_firstiter_hook, |
630 |
| - finalizer=self._asyncgen_finalizer_hook) |
631 |
| - yield |
632 |
| - finally: |
633 | 613 | self._set_coroutine_origin_tracking(False)
|
634 |
| - if self._asyncgens is not None: |
635 |
| - sys.set_asyncgen_hooks(*old_agen_hooks) |
| 614 | + sys.set_asyncgen_hooks(*old_agen_hooks) |
636 | 615 |
|
637 |
| - def run_forever(self): |
638 |
| - with self.manage_run(), self.manage_asyncgens(): |
639 |
| - while True: |
640 |
| - self._run_once() |
641 |
| - if self._stopping: |
642 |
| - break |
643 |
| - self._stopping = False |
| 616 | + def run_until_complete(self, future, running_ok=False): |
| 617 | + """Run until the Future is done. |
644 | 618 |
|
645 |
| - def run_until_complete(self, future): |
646 |
| - with self.manage_run(): |
647 |
| - f = tasks.ensure_future(future, loop=self) |
648 |
| - if f is not future: |
649 |
| - f._log_destroy_pending = False |
650 |
| - while not f.done(): |
651 |
| - self._run_once() |
652 |
| - if self._stopping: |
653 |
| - break |
654 |
| - if not f.done(): |
655 |
| - raise RuntimeError( |
656 |
| - 'Event loop stopped before Future completed.') |
657 |
| - return f.result() |
| 619 | + If the argument is a coroutine, it is wrapped in a Task. |
| 620 | +
|
| 621 | + WARNING: It would be disastrous to call run_until_complete() |
| 622 | + with the same coroutine twice -- it would wrap it in two |
| 623 | + different Tasks and that can't be good. |
| 624 | +
|
| 625 | + Return the Future's result, or raise its exception. |
| 626 | + """ |
| 627 | + self._check_closed() |
| 628 | + self._check_running(running_ok=running_ok) |
| 629 | + |
| 630 | + new_task = not futures.isfuture(future) |
| 631 | + future = tasks.ensure_future(future, loop=self) |
| 632 | + if new_task: |
| 633 | + # An exception is raised if the future didn't complete, so there |
| 634 | + # is no need to log the "destroy pending task" message |
| 635 | + future._log_destroy_pending = False |
| 636 | + |
| 637 | + future.add_done_callback(_run_until_complete_cb) |
| 638 | + try: |
| 639 | + self.run_forever(running_ok=running_ok) |
| 640 | + except: |
| 641 | + if new_task and future.done() and not future.cancelled(): |
| 642 | + # The coroutine raised a BaseException. Consume the exception |
| 643 | + # to not log a warning, the caller doesn't have access to the |
| 644 | + # local task. |
| 645 | + future.exception() |
| 646 | + raise |
| 647 | + finally: |
| 648 | + future.remove_done_callback(_run_until_complete_cb) |
| 649 | + if not future.done(): |
| 650 | + raise RuntimeError('Event loop stopped before Future completed.') |
| 651 | + |
| 652 | + return future.result() |
658 | 653 |
|
659 | 654 | def stop(self):
|
660 | 655 | """Stop running the event loop.
|
@@ -1839,35 +1834,82 @@ def _timer_handle_cancelled(self, handle):
|
1839 | 1834 | self._timer_cancelled_count += 1
|
1840 | 1835 |
|
1841 | 1836 | def _run_once(self):
|
| 1837 | + """Run one full iteration of the event loop. |
| 1838 | +
|
| 1839 | + This calls all currently ready callbacks, polls for I/O, |
| 1840 | + schedules the resulting callbacks, and finally schedules |
| 1841 | + 'call_later' callbacks. |
1842 | 1842 | """
|
1843 |
| - Simplified re-implementation of asyncio's _run_once that |
1844 |
| - runs handles as they become ready. |
1845 |
| - """ |
1846 |
| - ready = self._ready |
1847 |
| - scheduled = self._scheduled |
1848 |
| - while scheduled and scheduled[0]._cancelled: |
1849 |
| - heappop(scheduled) |
1850 |
| - |
1851 |
| - timeout = ( |
1852 |
| - 0 if ready or self._stopping |
1853 |
| - else min(max( |
1854 |
| - scheduled[0]._when - self.time(), 0), 86400) if scheduled |
1855 |
| - else None) |
| 1843 | + |
| 1844 | + sched_count = len(self._scheduled) |
| 1845 | + if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and |
| 1846 | + self._timer_cancelled_count / sched_count > |
| 1847 | + _MIN_CANCELLED_TIMER_HANDLES_FRACTION): |
| 1848 | + # Remove delayed calls that were cancelled if their number |
| 1849 | + # is too high |
| 1850 | + new_scheduled = [] |
| 1851 | + for handle in self._scheduled: |
| 1852 | + if handle._cancelled: |
| 1853 | + handle._scheduled = False |
| 1854 | + else: |
| 1855 | + new_scheduled.append(handle) |
| 1856 | + |
| 1857 | + heapq.heapify(new_scheduled) |
| 1858 | + self._scheduled = new_scheduled |
| 1859 | + self._timer_cancelled_count = 0 |
| 1860 | + else: |
| 1861 | + # Remove delayed calls that were cancelled from head of queue. |
| 1862 | + while self._scheduled and self._scheduled[0]._cancelled: |
| 1863 | + self._timer_cancelled_count -= 1 |
| 1864 | + handle = heapq.heappop(self._scheduled) |
| 1865 | + handle._scheduled = False |
| 1866 | + |
| 1867 | + timeout = None |
| 1868 | + if self._ready or self._stopping: |
| 1869 | + timeout = 0 |
| 1870 | + elif self._scheduled: |
| 1871 | + # Compute the desired timeout. |
| 1872 | + when = self._scheduled[0]._when |
| 1873 | + timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT) |
| 1874 | + |
1856 | 1875 | event_list = self._selector.select(timeout)
|
1857 | 1876 | self._process_events(event_list)
|
1858 | 1877 |
|
| 1878 | + # Handle 'later' callbacks that are ready. |
1859 | 1879 | end_time = self.time() + self._clock_resolution
|
1860 |
| - while scheduled and scheduled[0]._when < end_time: |
1861 |
| - handle = heappop(scheduled) |
1862 |
| - ready.append(handle) |
1863 |
| - |
1864 |
| - for _ in range(len(ready)): |
1865 |
| - if not ready: |
| 1880 | + while self._scheduled: |
| 1881 | + handle = self._scheduled[0] |
| 1882 | + if handle._when >= end_time: |
1866 | 1883 | break
|
1867 |
| - handle = ready.popleft() |
1868 |
| - if not handle._cancelled: |
| 1884 | + handle = heapq.heappop(self._scheduled) |
| 1885 | + handle._scheduled = False |
| 1886 | + self._ready.append(handle) |
| 1887 | + |
| 1888 | + # This is the only place where callbacks are actually *called*. |
| 1889 | + # All other places just add them to ready. |
| 1890 | + # Note: We run all currently scheduled callbacks, but not any |
| 1891 | + # callbacks scheduled by callbacks run this time around -- |
| 1892 | + # they will be run the next time (after another I/O poll). |
| 1893 | + # Use an idiom that is thread-safe without using locks. |
| 1894 | + ntodo = len(self._ready) |
| 1895 | + for i in range(ntodo): |
| 1896 | + handle = self._ready.popleft() |
| 1897 | + if handle._cancelled: |
| 1898 | + continue |
| 1899 | + if self._debug: |
| 1900 | + try: |
| 1901 | + self._current_handle = handle |
| 1902 | + t0 = self.time() |
| 1903 | + handle._run() |
| 1904 | + dt = self.time() - t0 |
| 1905 | + if dt >= self.slow_callback_duration: |
| 1906 | + logger.warning('Executing %s took %.3f seconds', |
| 1907 | + _format_handle(handle), dt) |
| 1908 | + finally: |
| 1909 | + self._current_handle = None |
| 1910 | + else: |
1869 | 1911 | handle._run()
|
1870 |
| - handle = None |
| 1912 | + handle = None # Needed to break cycles when an exception occurs. |
1871 | 1913 |
|
1872 | 1914 | def _set_coroutine_origin_tracking(self, enabled):
|
1873 | 1915 | if bool(enabled) == bool(self._coroutine_origin_tracking_enabled):
|
|
0 commit comments