# Source code for asyncssh.subprocess

# Copyright (c) 2019-2023 by Ron Frederick <ronf@timeheart.net> and others.
#
# This program and the accompanying materials are made available under
# the terms of the Eclipse Public License v2.0 which accompanies this
# distribution and is available at:
#
#     http://www.eclipse.org/legal/epl-2.0/
#
# This program may also be made available under the following secondary
# licenses when the conditions for such availability set forth in the
# Eclipse Public License v2.0 are satisfied:
#
#    GNU General Public License, Version 2.0, or any later versions of
#    that license
#
# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
#
# Contributors:
#     Ron Frederick - initial implementation, API, and documentation

"""SSH subprocess handlers"""

from typing import TYPE_CHECKING, Any, AnyStr, Callable
from typing import Dict, Generic, Iterable, Optional

from .constants import EXTENDED_DATA_STDERR
from .process import SSHClientProcess
from .session import DataType


if TYPE_CHECKING:
    # pylint: disable=cyclic-import
    from .channel import SSHChannel, SSHClientChannel


SubprocessFactory = Callable[[], 'SSHSubprocessProtocol']


class SSHSubprocessPipe(Generic[AnyStr]):
    """Common base for the pipes of an SSH subprocess

       An instance keeps a reference to the client channel of the
       remote process together with the data type it was created
       with, and provides the operations shared by every pipe.

    """

    def __init__(self, chan: 'SSHClientChannel[AnyStr]',
                 datatype: DataType = None):
        self._datatype = datatype
        self._chan: 'SSHClientChannel[AnyStr]' = chan

    def get_extra_info(self, name: str, default: Any = None) -> Any:
        """Look up extra information about the remote process

           The lookup is delegated to the channel associated with
           this subprocess. See :meth:`get_extra_info()
           <SSHClientChannel.get_extra_info>` on :class:`SSHClientChannel`
           for additional information.

           :param name:
               The name of the information to look up
           :param default:
               The value passed through to the channel as the default
           :type name: `str`

        """

        chan = self._chan

        return chan.get_extra_info(name, default)

    def close(self) -> None:
        """Close the channel to the remote process"""

        chan = self._chan
        chan.close()


class SSHSubprocessReadPipe(SSHSubprocessPipe[AnyStr]):
    """SSH subprocess pipe reader

       Read side of a pipe to a remote process. Both methods simply
       forward the request to the underlying channel.

    """

    def pause_reading(self) -> None:
        """Pause delivery of incoming data from the remote process"""

        self._chan.pause_reading()

    def resume_reading(self) -> None:
        """Resume delivery of incoming data from the remote process"""

        self._chan.resume_reading()
class SSHSubprocessWritePipe(SSHSubprocessPipe[AnyStr]):
    """SSH subprocess pipe writer

       Write side of a pipe to a remote process. Every method forwards
       to the underlying channel; :meth:`write` and :meth:`writelines`
       additionally pass along the data type this pipe was created with.

    """

    def abort(self) -> None:
        """Forcibly close the channel to the remote process"""

        self._chan.abort()

    def can_write_eof(self) -> bool:
        """Return whether the pipe supports :meth:`write_eof`"""

        return self._chan.can_write_eof()

    def get_write_buffer_size(self) -> int:
        """Return the current size of the pipe's output buffer"""

        return self._chan.get_write_buffer_size()

    def set_write_buffer_limits(self, high: Optional[int] = None,
                                low: Optional[int] = None) -> None:
        """Set the high- and low-water limits for write flow control

           :param high:
               The high-water limit, passed through to the channel
           :param low:
               The low-water limit, passed through to the channel
           :type high: `int` or `None`
           :type low: `int` or `None`

        """

        self._chan.set_write_buffer_limits(high, low)

    def write(self, data: AnyStr) -> None:
        """Write data on this pipe

           :param data:
               The data to send to the remote process
           :type data: `str` or `bytes`

        """

        self._chan.write(data, self._datatype)

    def writelines(self, list_of_data: Iterable[AnyStr]) -> None:
        """Write a list of data bytes on this pipe

           :param list_of_data:
               An iterable of data items to send to the remote process

        """

        self._chan.writelines(list_of_data, self._datatype)

    def write_eof(self) -> None:
        """Write EOF on this pipe"""

        self._chan.write_eof()
class SSHSubprocessProtocol(Generic[AnyStr]):
    """SSH subprocess protocol

       This class conforms to :class:`asyncio.SubprocessProtocol`, but with
       the following enhancement:

           * If encoding is set when the subprocess is created, all data
             passed to :meth:`pipe_data_received` will be string values
             containing Unicode data. However, for compatibility with
             :class:`asyncio.SubprocessProtocol`, encoding defaults to
             `None`, in which case all data is delivered as bytes.

       Every callback defined here is a no-op; subclasses override the
       ones they are interested in.

    """

    def connection_made(self,
                        transport: 'SSHSubprocessTransport[AnyStr]') -> None:
        """Called when a remote process is successfully started

           This method is called when a remote process is successfully
           started. The transport parameter should be stored if needed
           for later use.

           :param transport:
               The transport to use to communicate with the remote process.
           :type transport: :class:`SSHSubprocessTransport`

        """

    def pipe_data_received(self, fd: int, data: AnyStr) -> None:
        """Called when data is received from the remote process

           This method is called when data is received from the remote
           process. If an encoding was specified when the process was
           started, the data will be delivered as a string after decoding
           with the requested encoding. Otherwise, the data will be
           delivered as bytes.

           :param fd:
               The integer file descriptor of the pipe data was received
               on. This will be 1 for stdout or 2 for stderr.
           :param data:
               The data received from the remote process
           :type fd: `int`
           :type data: `str` or `bytes`

        """

    def pipe_connection_lost(self, fd: int,
                             exc: Optional[Exception]) -> None:
        """Called when the pipe to a remote process is closed

           This method is called when a pipe to a remote process is
           closed. If the channel is shut down cleanly, *exc* will be
           `None`. Otherwise, it will be an exception explaining the
           reason the pipe was closed.

           :param fd:
               The integer file descriptor of the pipe which was closed.
               This will be 1 for stdout or 2 for stderr.
           :param exc:
               The exception which caused the channel to close, or
               `None` if the channel closed cleanly.
           :type fd: `int`
           :type exc: :class:`Exception` or `None`

        """

    def process_exited(self) -> None:
        """Called when a remote process has exited

           This method is called when the remote process has exited.
           Exit status information can be retrieved by calling
           :meth:`get_returncode() <SSHSubprocessTransport.get_returncode>`
           on the transport provided in :meth:`connection_made`.

        """
class SSHSubprocessTransport(SSHClientProcess[AnyStr]):
    """SSH subprocess transport

       This class conforms to :class:`asyncio.SubprocessTransport`, but with
       the following enhancements:

           * All functionality available through :class:`SSHClientProcess`
             is also available here, such as the ability to dynamically
             redirect stdin, stdout, and stderr at any time during the
             lifetime of the process.

           * If encoding is set when the subprocess is created, all data
             written to the transports created by :meth:`get_pipe_transport`
             should be strings containing Unicode data. The encoding
             defaults to `None`, though, to preserve compatibility with
             :class:`asyncio.SubprocessTransport`, which expects data
             to be written as bytes.

    """

    _chan: 'SSHClientChannel[AnyStr]'

    def __init__(self, protocol_factory: SubprocessFactory):
        super().__init__()

        # Filled in by connection_made() once the channel is available
        self._pipes: Dict[int, SSHSubprocessPipe[AnyStr]] = {}
        self._protocol: SSHSubprocessProtocol[AnyStr] = protocol_factory()

    def get_protocol(self) -> SSHSubprocessProtocol[AnyStr]:
        """Return the subprocess protocol associated with this transport"""

        return self._protocol

    def connection_made(self, chan: 'SSHChannel[AnyStr]') -> None:
        """Handle a newly opened channel

           Creates the pipe objects for file descriptors 0-2 and then
           notifies the protocol that the remote process has started.

        """

        super().connection_made(chan)

        # Build the pipes before notifying the protocol, so that a
        # protocol which calls get_pipe_transport() from within its
        # connection_made() callback gets a pipe back rather than None.
        self._pipes = {
            0: SSHSubprocessWritePipe(self._chan),
            1: SSHSubprocessReadPipe(self._chan),
            2: SSHSubprocessReadPipe(self._chan, EXTENDED_DATA_STDERR)
        }

        self._protocol.connection_made(self)

    def session_started(self) -> None:
        """Override SSHClientProcess to avoid creating SSHReader/SSHWriter
           streams, since this class uses read/write pipe objects instead"""

    def connection_lost(self, exc: Optional[Exception]) -> None:
        """Handle an incoming channel close

           The protocol is told that both the stdout (1) and stderr (2)
           pipes were lost before the close is passed to the parent class.

        """

        self._protocol.pipe_connection_lost(1, exc)
        self._protocol.pipe_connection_lost(2, exc)
        super().connection_lost(exc)

    def data_received(self, data: AnyStr, datatype: DataType) -> None:
        """Handle incoming data from the remote process

           Data is written to the writer found for this data type in the
           inherited `_writers` mapping if there is one. Otherwise, it
           is delivered to the protocol as fd 2 for stderr data or fd 1
           for anything else.

        """

        writer = self._writers.get(datatype)

        if writer:
            writer.write(data)
        else:
            fd = 2 if datatype == EXTENDED_DATA_STDERR else 1
            self._protocol.pipe_data_received(fd, data)

    def exit_status_received(self, status: int) -> None:
        """Handle exit status for the remote process"""

        super().exit_status_received(status)
        self._protocol.process_exited()

    def exit_signal_received(self, signal: str, core_dumped: bool,
                             msg: str, lang: str) -> None:
        """Handle exit signal for the remote process"""

        super().exit_signal_received(signal, core_dumped, msg, lang)
        self._protocol.process_exited()

    def get_pid(self) -> Optional[int]:
        """Return the PID of the remote process

           This method always returns `None`, since SSH doesn't report
           remote PIDs.

        """

        # pylint: disable=no-self-use

        return None

    def get_pipe_transport(self, fd: int) -> \
            Optional[SSHSubprocessPipe[AnyStr]]:
        """Return a transport for the requested stream

           :param fd:
               The integer file descriptor (0-2) to return the
               transport for, where 0 means stdin, 1 means stdout,
               and 2 means stderr.
           :type fd: `int`

           :returns: an :class:`SSHSubprocessReadPipe` or
                     :class:`SSHSubprocessWritePipe`, or `None` if
                     there is no pipe for the requested fd

        """

        return self._pipes.get(fd)

    def get_returncode(self) -> Optional[int]:
        """Return the exit status or signal for the remote process

           This method returns the exit status of the session if one has
           been sent. If an exit signal was sent, this method returns
           the negative of the numeric value of that signal, matching
           the behavior of :meth:`asyncio.SubprocessTransport.get_returncode`.
           If neither has been sent, this method returns `None`.

           :returns: `int` or `None`

        """

        return self.returncode