From b56722e3db682433fa49c2afe58a535f56ee531d Mon Sep 17 00:00:00 2001 From: Perry Melange Date: Thu, 11 Sep 2025 12:56:01 +0200 Subject: [PATCH] ShellDriver: add optional arg dest_authorized_keys Add an addition optional arg "dest_authorized_keys" with the default value of ```~/.ssh/authorized_keys```, An example where this might be used is when an embedded system uses dropbear as it's ssh server daemon. In such a case, the authorized_keys file is ```/etc/dropbear/authorized_keys``` Additionally, the ```put_key_file``` takes an optional ```dest_authorized_keys``` function parameter which defaults to the above arg. Fixes: ##1626 Signed-off-by: Perry Melange --- doc/configuration.rst | 2 ++ labgrid/driver/shelldriver.py | 42 ++++++++++++++++++++--------------- 2 files changed, 26 insertions(+), 18 deletions(-) diff --git a/doc/configuration.rst b/doc/configuration.rst index d93e58c90..4f1d651e6 100644 --- a/doc/configuration.rst +++ b/doc/configuration.rst @@ -1796,6 +1796,8 @@ Arguments: Can be an empty string. - keyfile (str): optional, keyfile to upload after login, making the `SSHDriver`_ usable + - dest_authorized_keys (str): optional, default="~/.ssh/authorized_keys", + filename of the authorized_keys file - login_timeout (int, default=60): timeout for login prompt detection in seconds - await_login_timeout (int, default=2): time in seconds of silence that needs diff --git a/labgrid/driver/shelldriver.py b/labgrid/driver/shelldriver.py index 55799ac43..2091edee2 100644 --- a/labgrid/driver/shelldriver.py +++ b/labgrid/driver/shelldriver.py @@ -1,6 +1,7 @@ # pylint: disable=unused-argument """The ShellDriver provides the CommandProtocol, ConsoleProtocol and InfoProtocol on top of a SerialPort.""" +import os import io import re import shlex @@ -34,6 +35,7 @@ class ShellDriver(CommandMixin, Driver, CommandProtocol, FileTransferProtocol): username (str): username to login with password (str): password to login with keyfile (str): keyfile to bind mount over users authorized keys + dest_authorized_keys (str): optional, default="~/.ssh/authorized_keys", filename of the authorized_keys file login_timeout (int): optional, timeout for login prompt detection console_ready (regex): optional, pattern used by the kernel to inform the user that a console can be activated by pressing enter. @@ -49,6 +51,7 @@ class ShellDriver(CommandMixin, Driver, CommandProtocol, FileTransferProtocol): username = attr.ib(validator=attr.validators.instance_of(str)) password = attr.ib(default=None, validator=attr.validators.optional(attr.validators.instance_of(str))) keyfile = attr.ib(default="", validator=attr.validators.instance_of(str)) + dest_authorized_keys = attr.ib(default="~/.ssh/authorized_keys", validator=attr.validators.instance_of(str)) login_timeout = attr.ib(default=60, validator=attr.validators.instance_of(int)) console_ready = attr.ib(default="", validator=attr.validators.instance_of(str)) await_login_timeout = attr.ib(default=2, validator=attr.validators.instance_of(int)) @@ -72,7 +75,7 @@ def on_activate(self): if self.target.env: keyfile_path = self.target.env.config.resolve_path(self.keyfile) - self._put_ssh_key(keyfile_path) + self._put_ssh_key(keyfile_path, self.dest_authorized_keys) def on_deactivate(self): self._status = 0 @@ -217,8 +220,8 @@ def _write_key(self, keyline, dest): self._run_check(f'echo -n "{part}" >> {dest}') self._run_check(f'echo "" >> {dest}') - @step(args=['keyfile_path']) - def _put_ssh_key(self, keyfile_path): + @step(args=['keyfile_path', 'dest_authorized_keys']) + def _put_ssh_key(self, keyfile_path, dest_authorized_keys): """Upload an SSH Key to a target""" regex = re.compile( r"""ssh-(rsa|ed25519) @@ -236,7 +239,8 @@ def _put_ssh_key(self, keyfile_path): f"Could not parse SSH-Key from file: {keyfile}" ) self.logger.debug("Read Key: %s", new_key) - auth_keys, _, read_keys = self._run("cat ~/.ssh/authorized_keys") + dest_authorized_keys_dir = os.path.dirname(dest_authorized_keys) + auth_keys, _, read_keys = self._run(f"""cat {self.dest_authorized_keys}""") self.logger.debug("Exitcode trying to read keys: %s, keys: %s", read_keys, auth_keys) result = [] _, _, test_write = self._run("touch ~/.test") @@ -258,35 +262,37 @@ def _put_ssh_key(self, keyfile_path): if test_write == 0 and read_keys == 0: self.logger.debug("Key not on target and writeable, concatenating...") - self._write_key(keyline, "~/.ssh/authorized_keys") + self._write_key(keyline, dest_authorized_keys) self._run_check("rm ~/.test") return if test_write == 0: - self.logger.debug("Key not on target, testing for .ssh directory") - _, _, ssh_dir = self._run("[ -d ~/.ssh/ ]") + self.logger.debug("Key not on target, testing for % directory", dest_authorized_keys_dir) + _, _, ssh_dir = self._run(f"""[ -d {dest_authorized_keys_dir} ]""") if ssh_dir != 0: - self.logger.debug("~/.ssh did not exist, creating") - self._run("mkdir ~/.ssh/") - self._run_check("chmod 700 ~/.ssh/") - self.logger.debug("Creating ~/.ssh/authorized_keys") - self._run_check("touch ~/.ssh/authorized_keys") - self._write_key(keyline, "~/.ssh/authorized_keys") + self.logger.debug(" % did not exits, creating", dest_authorized_keys_dir) + self._run(f"""mkdir -p {dest_authorized_keys_dir}""") + self._run_check(f"""chmod 700 {dest_authorized_keys_dir}""") + self.logger.debug("Creating %s", dest_authorized_keys) + self._write_key(keyline, dest_authorized_keys) self._run_check("rm ~/.test") return self.logger.debug("Key not on target and not writeable, using bind mount...") self._run_check('mkdir -p -m 700 /tmp/labgrid-ssh/') - self._run("cp -a ~/.ssh/* /tmp/labgrid-ssh/") + self._run(f"""cp -a {dest_authorized_keys_dir}/* /tmp/labgrid-ssh/""") self._write_key(keyline, "/tmp/labgrid-ssh/authorized_keys") self._run_check('chmod 600 /tmp/labgrid-ssh/authorized_keys') - out, err, exitcode = self._run('mount --bind /tmp/labgrid-ssh/ ~/.ssh/') + out, err, exitcode = self._run(f"""mount --bind /tmp/labgrid-ssh/ {dest_authorized_keys_dir}""") if exitcode != 0: - self.logger.warning("Could not bind mount ~/.ssh directory: %s %s", out, err) + self.logger.warning("Could not bind mount %s directory: %s %s", + dest_authorized_keys_dir, out, err) @Driver.check_active - def put_ssh_key(self, keyfile_path): - self._put_ssh_key(keyfile_path) + def put_ssh_key(self, keyfile_path, dest_authorized_keys = None): + if dest_authorized_keys is None: + dest_authorized_keys = self.dest_authorized_keys + self._put_ssh_key(keyfile_path, dest_authorized_keys) def _xmodem_getc(self, size, timeout=10): """ called by the xmodem.XMODEM instance to read protocol data from the console """