Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 58 additions & 7 deletions Lib/_pyrepl/_module_completer.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

if TYPE_CHECKING:
from typing import Any, Iterable, Iterator, Mapping
from .types import CompletionAction


HARDCODED_SUBMODULES = {
Expand Down Expand Up @@ -52,11 +53,17 @@ class ModuleCompleter:
def __init__(self, namespace: Mapping[str, Any] | None = None) -> None:
self.namespace = namespace or {}
self._global_cache: list[pkgutil.ModuleInfo] = []
self._failed_imports: set[str] = set()
self._curr_sys_path: list[str] = sys.path[:]
self._stdlib_path = os.path.dirname(importlib.__path__[0])

def get_completions(self, line: str) -> list[str] | None:
"""Return the next possible import completions for 'line'."""
def get_completions(self, line: str) -> tuple[list[str], CompletionAction | None] | None:
"""Return the next possible import completions for 'line'.

For attributes completion, if the module to complete from is not
imported, also return an action (prompt + callback to run if the
user press TAB again) to import the module.
"""
result = ImportParser(line).parse()
if not result:
return None
Expand All @@ -65,24 +72,26 @@ def get_completions(self, line: str) -> list[str] | None:
except Exception:
# Some unexpected error occurred, make it look like
# no completions are available
return []
return [], None

def complete(self, from_name: str | None, name: str | None) -> list[str]:
def complete(self, from_name: str | None, name: str | None) -> tuple[list[str], CompletionAction | None]:
if from_name is None:
# import x.y.z<tab>
assert name is not None
path, prefix = self.get_path_and_prefix(name)
modules = self.find_modules(path, prefix)
return [self.format_completion(path, module) for module in modules]
return [self.format_completion(path, module) for module in modules], None

if name is None:
# from x.y.z<tab>
path, prefix = self.get_path_and_prefix(from_name)
modules = self.find_modules(path, prefix)
return [self.format_completion(path, module) for module in modules]
return [self.format_completion(path, module) for module in modules], None

# from x.y import z<tab>
return self.find_modules(from_name, name)
submodules = self.find_modules(from_name, name)
attributes, action = self.find_attributes(from_name, name)
return sorted({*submodules, *attributes}), action

def find_modules(self, path: str, prefix: str) -> list[str]:
"""Find all modules under 'path' that start with 'prefix'."""
Expand Down Expand Up @@ -129,6 +138,33 @@ def _is_stdlib_module(self, module_info: pkgutil.ModuleInfo) -> bool:
return (isinstance(module_info.module_finder, FileFinder)
and module_info.module_finder.path == self._stdlib_path)

def find_attributes(self, path: str, prefix: str) -> tuple[list[str], CompletionAction | None]:
"""Find all attributes of module 'path' that start with 'prefix'."""
attributes, action = self._find_attributes(path, prefix)
# Filter out invalid attribute names
# (for example those containing dashes that cannot be imported with 'import')
return [attr for attr in attributes if attr.isidentifier()], action

def _find_attributes(self, path: str, prefix: str) -> tuple[list[str], CompletionAction | None]:
if path.startswith('.'):
# Convert relative path to absolute path
package = self.namespace.get('__package__', '')
path = self.resolve_relative_name(path, package) # type: ignore[assignment]
if path is None:
return [], None

imported_module = sys.modules.get(path)
if not imported_module:
if path in self._failed_imports: # Do not propose to import again
return [], None
return [], self._get_import_completion_action(path)
try:
module_attributes = dir(imported_module)
except Exception:
module_attributes = []
return [attr_name for attr_name in module_attributes
if self.is_suggestion_match(attr_name, prefix)], None

def is_suggestion_match(self, module_name: str, prefix: str) -> bool:
if prefix:
return module_name.startswith(prefix)
Expand Down Expand Up @@ -200,6 +236,21 @@ def global_cache(self) -> list[pkgutil.ModuleInfo]:
self._global_cache = list(pkgutil.iter_modules())
return self._global_cache

def _get_import_completion_action(self, path: str) -> CompletionAction:
prompt = ("[ module not imported, press again to import it "
"and propose attributes ]")

def _do_import() -> str | None:
try:
importlib.import_module(path)
return None
except Exception as exc:
sys.modules.pop(path, None) # Clean half-imported module
self._failed_imports.add(path)
return f"[ error during import: {exc} ]"

return (prompt, _do_import)


class ImportParser:
"""
Expand Down
34 changes: 28 additions & 6 deletions Lib/_pyrepl/completing_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@

# types
Command = commands.Command
if False:
from .types import KeySpec, CommandName
TYPE_CHECKING = False
if TYPE_CHECKING:
from .types import KeySpec, CommandName, CompletionAction


def prefix(wordlist: list[str], j: int = 0) -> str:
Expand Down Expand Up @@ -168,15 +169,24 @@ def do(self) -> None:
r: CompletingReader
r = self.reader # type: ignore[assignment]
last_is_completer = r.last_command_is(self.__class__)
if r.cmpltn_action:
if last_is_completer: # double-tab: execute action
msg = r.cmpltn_action[1]()
if msg:
r.msg = msg
else: # other input since last tab: cancel action
r.cmpltn_action = None

immutable_completions = r.assume_immutable_completions
completions_unchangable = last_is_completer and immutable_completions
stem = r.get_stem()
if not completions_unchangable:
r.cmpltn_menu_choices = r.get_completions(stem)
r.cmpltn_menu_choices, r.cmpltn_action = r.get_completions(stem)

completions = r.cmpltn_menu_choices
if not completions:
r.error("no matches")
if not r.cmpltn_action:
r.error("no matches")
elif len(completions) == 1:
if completions_unchangable and len(completions[0]) == len(stem):
r.msg = "[ sole completion ]"
Expand All @@ -202,6 +212,16 @@ def do(self) -> None:
r.msg = "[ not unique ]"
r.dirty = True

if r.cmpltn_action:
if r.msg and r.cmpltn_message_visible:
# There is already a message (eg. [ not unique ]) that
# would conflict for next tab: cancel action
r.cmpltn_action = None
else:
r.msg = r.cmpltn_action[0]
r.cmpltn_message_visible = True
r.dirty = True


class self_insert(commands.self_insert):
def do(self) -> None:
Expand Down Expand Up @@ -240,6 +260,7 @@ class CompletingReader(Reader):
cmpltn_message_visible: bool = field(init=False)
cmpltn_menu_end: int = field(init=False)
cmpltn_menu_choices: list[str] = field(init=False)
cmpltn_action: CompletionAction | None = field(init=False)

def __post_init__(self) -> None:
super().__post_init__()
Expand Down Expand Up @@ -281,6 +302,7 @@ def cmpltn_reset(self) -> None:
self.cmpltn_message_visible = False
self.cmpltn_menu_end = 0
self.cmpltn_menu_choices = []
self.cmpltn_action = None

def get_stem(self) -> str:
st = self.syntax_table
Expand All @@ -291,8 +313,8 @@ def get_stem(self) -> str:
p -= 1
return ''.join(b[p+1:self.pos])

def get_completions(self, stem: str) -> list[str]:
return []
def get_completions(self, stem: str) -> tuple[list[str], CompletionAction | None]:
return [], None

def get_line(self) -> str:
"""Return the current line until the cursor position."""
Expand Down
8 changes: 6 additions & 2 deletions Lib/_pyrepl/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,9 +381,13 @@ def calc_screen(self) -> list[str]:
self.screeninfo = screeninfo
self.cxy = self.pos2xy()
if self.msg:
width = self.console.width
for mline in self.msg.split("\n"):
screen.append(mline)
screeninfo.append((0, []))
# If self.msg is larger that console width, make it fit
# TODO: try to split between words?
for r in range((len(mline) - 1) // width + 1):
screen.append(mline[r * width : (r + 1) * width:])
screeninfo.append((0, []))

self.last_refresh_cache.update_cache(self, screen, screeninfo)
return screen
Expand Down
10 changes: 5 additions & 5 deletions Lib/_pyrepl/readline.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
# types
Command = commands.Command
from collections.abc import Callable, Collection
from .types import Callback, Completer, KeySpec, CommandName
from .types import Callback, Completer, KeySpec, CommandName, CompletionAction

TYPE_CHECKING = False

Expand Down Expand Up @@ -134,7 +134,7 @@ def get_stem(self) -> str:
p -= 1
return "".join(b[p + 1 : self.pos])

def get_completions(self, stem: str) -> list[str]:
def get_completions(self, stem: str) -> tuple[list[str], CompletionAction | None]:
module_completions = self.get_module_completions()
if module_completions is not None:
return module_completions
Expand All @@ -144,7 +144,7 @@ def get_completions(self, stem: str) -> list[str]:
while p > 0 and b[p - 1] != "\n":
p -= 1
num_spaces = 4 - ((self.pos - p) % 4)
return [" " * num_spaces]
return [" " * num_spaces], None
result = []
function = self.config.readline_completer
if function is not None:
Expand All @@ -165,9 +165,9 @@ def get_completions(self, stem: str) -> list[str]:
# emulate the behavior of the standard readline that sorts
# the completions before displaying them.
result.sort()
return result
return result, None

def get_module_completions(self) -> list[str] | None:
def get_module_completions(self) -> tuple[list[str], CompletionAction | None] | None:
line = self.get_line()
return self.config.module_completer.get_completions(line)

Expand Down
1 change: 1 addition & 0 deletions Lib/_pyrepl/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@
type Completer = Callable[[str, int], str | None]
type CharBuffer = list[str]
type CharWidths = list[int]
type CompletionAction = tuple[str, Callable[[], str | None]]
Loading
Loading