Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 59 additions & 3 deletions ipykernel/iostream.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from binascii import b2a_hex
from collections import deque
from io import StringIO, TextIOBase
from threading import local
from typing import Any, Callable, Deque, Optional
from weakref import WeakSet

Expand Down Expand Up @@ -403,6 +404,7 @@ def __init__(
self.echo = None
self._isatty = bool(isatty)
self._should_watch = False
self._local = local()

if (
watchfd
Expand Down Expand Up @@ -525,11 +527,19 @@ def _flush(self):
# There should be a better way to do this.
self.session.pid = os.getpid()
content = {"name": self.name, "text": data}
msg = self.session.msg("stream", content, parent=self.parent_header)

# Each transform either returns a new
# message or None. If None is returned,
# the message has been 'used' and we return.
for hook in self._hooks:
msg = hook(msg)
if msg is None:
return

self.session.send(
self.pub_thread,
"stream",
content=content,
parent=self.parent_header,
msg,
ident=self.topic,
)

Expand Down Expand Up @@ -601,3 +611,49 @@ def _rotate_buffer(self):
old_buffer = self._buffer
self._buffer = StringIO()
return old_buffer

@property
def _hooks(self):
if not hasattr(self._local, "hooks"):
# create new list for a new thread
self._local.hooks = []
return self._local.hooks

def register_hook(self, hook):
"""
Registers a hook with the thread-local storage.

Parameters
----------
hook : Any callable object

Returns
-------
Either a publishable message, or `None`.
The hook callable must return a message from
the __call__ method if they still require the
`session.send` method to be called after transformation.
Returning `None` will halt that execution path, and
session.send will not be called.
"""
self._hooks.append(hook)

def unregister_hook(self, hook):
"""
Un-registers a hook with the thread-local storage.

Parameters
----------
hook : Any callable object which has previously been
registered as a hook.

Returns
-------
bool - `True` if the hook was removed, `False` if it wasn't
found.
"""
try:
self._hooks.remove(hook)
return True
except ValueError:
return False