Source code for asyncssh.agent

# Copyright (c) 2016-2023 by Ron Frederick <ronf@timeheart.net> and others.
#
# This program and the accompanying materials are made available under
# the terms of the Eclipse Public License v2.0 which accompanies this
# distribution and is available at:
#
#     http://www.eclipse.org/legal/epl-2.0/
#
# This program may also be made available under the following secondary
# licenses when the conditions for such availability set forth in the
# Eclipse Public License v2.0 are satisfied:
#
#    GNU General Public License, Version 2.0, or any later versions of
#    that license
#
# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
#
# Contributors:
#     Ron Frederick - initial implementation, API, and documentation

"""SSH agent client"""

import asyncio
import errno
import os
import sys
from types import TracebackType
from typing import TYPE_CHECKING, List, Optional, Sequence, Tuple, Type, Union
from typing_extensions import Protocol

from .listener import SSHForwardListener
from .misc import async_context_manager, maybe_wait_closed
from .packet import Byte, String, UInt32, PacketDecodeError, SSHPacket
from .public_key import KeyPairListArg, SSHCertificate, SSHKeyPair
from .public_key import load_default_keypairs, load_keypairs

if TYPE_CHECKING:
    from tempfile import TemporaryDirectory


class AgentReader(Protocol):
    """Protocol for reading from an SSH agent"""

    async def readexactly(self, n: int) -> bytes:
        """Read exactly n bytes from the SSH agent"""


class AgentWriter(Protocol):
    """Protocol for writing to an SSH agent"""

    def write(self, data: bytes) -> None:
        """Write bytes to the SSH agent"""

    def close(self) -> None:
        """Close connection to the SSH agent"""

    async def wait_closed(self) -> None:
        """Wait for the connection to the SSH agent to close"""


try:
    if sys.platform == 'win32': # pragma: no cover
        from .agent_win32 import open_agent
    else:
        from .agent_unix import open_agent
except ImportError as _exc: # pragma: no cover
    async def open_agent(agent_path: str) -> \
            Tuple[AgentReader, AgentWriter]:
        """Dummy function if we're unable to import agent support"""

        raise OSError(errno.ENOENT, 'Agent support unavailable: %s' % str(_exc))


class _SupportsOpenAgentConnection(Protocol):
    """A class that supports open_agent_connection"""

    async def open_agent_connection(self) -> Tuple[AgentReader, AgentWriter]:
        """Open a forwarded ssh-agent connection back to the client"""


_AgentPath = Union[str, _SupportsOpenAgentConnection]


# Client request message numbers
SSH_AGENTC_REQUEST_IDENTITIES            = 11
SSH_AGENTC_SIGN_REQUEST                  = 13
SSH_AGENTC_ADD_IDENTITY                  = 17
SSH_AGENTC_REMOVE_IDENTITY               = 18
SSH_AGENTC_REMOVE_ALL_IDENTITIES         = 19
SSH_AGENTC_ADD_SMARTCARD_KEY             = 20
SSH_AGENTC_REMOVE_SMARTCARD_KEY          = 21
SSH_AGENTC_LOCK                          = 22
SSH_AGENTC_UNLOCK                        = 23
SSH_AGENTC_ADD_ID_CONSTRAINED            = 25
SSH_AGENTC_ADD_SMARTCARD_KEY_CONSTRAINED = 26
SSH_AGENTC_EXTENSION                     = 27

# Agent response message numbers
SSH_AGENT_FAILURE                        = 5
SSH_AGENT_SUCCESS                        = 6
SSH_AGENT_IDENTITIES_ANSWER              = 12
SSH_AGENT_SIGN_RESPONSE                  = 14
SSH_AGENT_EXTENSION_FAILURE              = 28

# SSH agent constraint numbers
SSH_AGENT_CONSTRAIN_LIFETIME             = 1
SSH_AGENT_CONSTRAIN_CONFIRM              = 2
SSH_AGENT_CONSTRAIN_EXTENSION            = 255

# SSH agent signature flags
SSH_AGENT_RSA_SHA2_256                   = 2
SSH_AGENT_RSA_SHA2_512                   = 4


[docs] class SSHAgentKeyPair(SSHKeyPair): """Surrogate for a key managed by the SSH agent""" _key_type = 'agent' def __init__(self, agent: 'SSHAgentClient', algorithm: bytes, public_data: bytes, comment: bytes): is_cert = algorithm.endswith(b'-cert-v01@openssh.com') if is_cert: if algorithm.startswith(b'sk-'): sig_algorithm = algorithm[:-21] + b'@openssh.com' else: sig_algorithm = algorithm[:-21] else: sig_algorithm = algorithm # Neither Pageant nor the Win10 OpenSSH agent seems to support the # ssh-agent protocol flags used to request RSA SHA2 signatures yet if sig_algorithm == b'ssh-rsa' and sys.platform != 'win32': sig_algorithms: Sequence[bytes] = \ (b'rsa-sha2-256', b'rsa-sha2-512', b'ssh-rsa') else: sig_algorithms = (sig_algorithm,) if is_cert: host_key_algorithms: Sequence[bytes] = (algorithm,) else: host_key_algorithms = sig_algorithms super().__init__(algorithm, sig_algorithm, sig_algorithms, host_key_algorithms, public_data, comment) self._agent = agent self._is_cert = is_cert self._flags = 0 @property def has_cert(self) -> bool: """ Return if this key pair has an associated cert""" return self._is_cert @property def has_x509_chain(self) -> bool: """ Return if this key pair has an associated X.509 cert chain""" return False def set_certificate(self, cert: SSHCertificate) -> None: """Set certificate to use with this key""" super().set_certificate(cert) self._is_cert = True def set_sig_algorithm(self, sig_algorithm: bytes) -> None: """Set the signature algorithm to use when signing data""" super().set_sig_algorithm(sig_algorithm) if sig_algorithm in (b'rsa-sha2-256', b'x509v3-rsa2048-sha256'): self._flags |= SSH_AGENT_RSA_SHA2_256 elif sig_algorithm == b'rsa-sha2-512': self._flags |= SSH_AGENT_RSA_SHA2_512 async def sign_async(self, data: bytes) -> bytes: """Asynchronously sign a block of data with this private key""" return await self._agent.sign(self.key_public_data, data, self._flags)
[docs] async def remove(self) -> None: """Remove this key pair from the agent""" await self._agent.remove_keys([self])
[docs] class SSHAgentClient: """SSH agent client""" def __init__(self, agent_path: _AgentPath): self._agent_path = agent_path self._reader: Optional[AgentReader] = None self._writer: Optional[AgentWriter] = None self._lock = asyncio.Lock() async def __aenter__(self) -> 'SSHAgentClient': """Allow SSHAgentClient to be used as an async context manager""" return self async def __aexit__(self, exc_type: Optional[Type[BaseException]], exc_value: Optional[BaseException], traceback: Optional[TracebackType]) -> bool: """Wait for connection close when used as an async context manager""" await self._cleanup() return False async def _cleanup(self) -> None: """Clean up this SSH agent client""" self.close() await self.wait_closed() @staticmethod def encode_constraints(lifetime: Optional[int], confirm: bool) -> bytes: """Encode key constraints""" result = b'' if lifetime: result += Byte(SSH_AGENT_CONSTRAIN_LIFETIME) + UInt32(lifetime) if confirm: result += Byte(SSH_AGENT_CONSTRAIN_CONFIRM) return result async def connect(self) -> None: """Connect to the SSH agent""" if isinstance(self._agent_path, str): self._reader, self._writer = await open_agent(self._agent_path) else: self._reader, self._writer = \ await self._agent_path.open_agent_connection() async def _make_request(self, msgtype: int, *args: bytes) -> \ Tuple[int, SSHPacket]: """Send an SSH agent request""" async with self._lock: try: if not self._writer: await self.connect() reader = self._reader writer = self._writer assert reader is not None assert writer is not None payload = Byte(msgtype) + b''.join(args) writer.write(UInt32(len(payload)) + payload) resplen = int.from_bytes((await reader.readexactly(4)), 'big') resp = SSHPacket((await reader.readexactly(resplen))) resptype = resp.get_byte() return resptype, resp except (OSError, EOFError, PacketDecodeError) as exc: await self._cleanup() raise ValueError(str(exc)) from None
[docs] async def get_keys(self, identities: Optional[Sequence[bytes]] = None) -> \ Sequence[SSHKeyPair]: """Request the available client keys This method is a coroutine which returns a list of client keys available in the ssh-agent. :param identities: (optional) A list of allowed byte string identities to return. If empty, all identities on the SSH agent will be returned. :returns: A list of :class:`SSHKeyPair` objects """ resptype, resp = \ await self._make_request(SSH_AGENTC_REQUEST_IDENTITIES) if resptype == SSH_AGENT_IDENTITIES_ANSWER: result: List[SSHKeyPair] = [] num_keys = resp.get_uint32() for _ in range(num_keys): key_blob = resp.get_string() comment = resp.get_string() if identities and key_blob not in identities: continue packet = SSHPacket(key_blob) algorithm = packet.get_string() result.append(SSHAgentKeyPair(self, algorithm, key_blob, comment)) resp.check_end() return result else: raise ValueError('Unknown SSH agent response: %d' % resptype)
async def sign(self, key_blob: bytes, data: bytes, flags: int = 0) -> bytes: """Sign a block of data with the requested key""" resptype, resp = await self._make_request(SSH_AGENTC_SIGN_REQUEST, String(key_blob), String(data), UInt32(flags)) if resptype == SSH_AGENT_SIGN_RESPONSE: sig = resp.get_string() resp.check_end() return sig elif resptype == SSH_AGENT_FAILURE: raise ValueError('Unable to sign with requested key') else: raise ValueError('Unknown SSH agent response: %d' % resptype)
[docs] async def add_keys(self, keylist: KeyPairListArg = (), passphrase: Optional[str] = None, lifetime: Optional[int] = None, confirm: bool = False) -> None: """Add keys to the agent This method adds a list of local private keys and optional matching certificates to the agent. :param keylist: (optional) The list of keys to add. If not specified, an attempt will be made to load keys from the files :file:`.ssh/id_ed25519_sk`, :file:`.ssh/id_ecdsa_sk`, :file:`.ssh/id_ed448`, :file:`.ssh/id_ed25519`, :file:`.ssh/id_ecdsa`, :file:`.ssh/id_rsa` and :file:`.ssh/id_dsa` in the user's home directory with optional matching certificates loaded from the files :file:`.ssh/id_ed25519_sk-cert.pub`, :file:`.ssh/id_ecdsa_sk-cert.pub`, :file:`.ssh/id_ed448-cert.pub`, :file:`.ssh/id_ed25519-cert.pub`, :file:`.ssh/id_ecdsa-cert.pub`, :file:`.ssh/id_rsa-cert.pub`, and :file:`.ssh/id_dsa-cert.pub`. Failures when adding keys are ignored in this case, as the agent may not recognize some of these key types. :param passphrase: (optional) The passphrase to use to decrypt the keys. :param lifetime: (optional) The time in seconds after which the keys should be automatically deleted, or `None` to store these keys indefinitely (the default). :param confirm: (optional) Whether or not to require confirmation for each private key operation which uses these keys, defaulting to `False`. :type keylist: *see* :ref:`SpecifyingPrivateKeys` :type passphrase: `str` :type lifetime: `int` or `None` :type confirm: `bool` :raises: :exc:`ValueError` if the keys cannot be added """ if keylist: keypairs = load_keypairs(keylist, passphrase) ignore_failures = False else: keypairs = load_default_keypairs(passphrase) ignore_failures = True base_constraints = self.encode_constraints(lifetime, confirm) provider = os.environ.get('SSH_SK_PROVIDER') or 'internal' sk_constraints = Byte(SSH_AGENT_CONSTRAIN_EXTENSION) + \ String('sk-provider@openssh.com') + \ String(provider) for keypair in keypairs: constraints = base_constraints if keypair.algorithm.startswith(b'sk-'): constraints += sk_constraints msgtype = SSH_AGENTC_ADD_ID_CONSTRAINED if constraints else \ SSH_AGENTC_ADD_IDENTITY comment = keypair.get_comment_bytes() resptype, resp = \ await self._make_request(msgtype, keypair.get_agent_private_key(), String(comment or b''), constraints) if resptype == SSH_AGENT_SUCCESS: resp.check_end() elif resptype == SSH_AGENT_FAILURE: if not ignore_failures: raise ValueError('Unable to add key') else: raise ValueError('Unknown SSH agent response: %d' % resptype)
[docs] async def add_smartcard_keys(self, provider: str, pin: Optional[str] = None, lifetime: Optional[int] = None, confirm: bool = False) -> None: """Store keys associated with a smart card in the agent :param provider: The name of the smart card provider :param pin: (optional) The PIN to use to unlock the smart card :param lifetime: (optional) The time in seconds after which the keys should be automatically deleted, or `None` to store these keys indefinitely (the default). :param confirm: (optional) Whether or not to require confirmation for each private key operation which uses these keys, defaulting to `False`. :type provider: `str` :type pin: `str` or `None` :type lifetime: `int` or `None` :type confirm: `bool` :raises: :exc:`ValueError` if the keys cannot be added """ constraints = self.encode_constraints(lifetime, confirm) msgtype = SSH_AGENTC_ADD_SMARTCARD_KEY_CONSTRAINED \ if constraints else SSH_AGENTC_ADD_SMARTCARD_KEY resptype, resp = await self._make_request(msgtype, String(provider), String(pin or ''), constraints) if resptype == SSH_AGENT_SUCCESS: resp.check_end() elif resptype == SSH_AGENT_FAILURE: raise ValueError('Unable to add keys') else: raise ValueError('Unknown SSH agent response: %d' % resptype)
[docs] async def remove_keys(self, keylist: Sequence[SSHKeyPair]) -> None: """Remove a key stored in the agent :param keylist: The list of keys to remove. :type keylist: `list` of :class:`SSHKeyPair` :raises: :exc:`ValueError` if any keys are not found """ for keypair in keylist: resptype, resp = \ await self._make_request(SSH_AGENTC_REMOVE_IDENTITY, String(keypair.public_data)) if resptype == SSH_AGENT_SUCCESS: resp.check_end() elif resptype == SSH_AGENT_FAILURE: raise ValueError('Key not found') else: raise ValueError('Unknown SSH agent response: %d' % resptype)
[docs] async def remove_smartcard_keys(self, provider: str, pin: Optional[str] = None) -> None: """Remove keys associated with a smart card stored in the agent :param provider: The name of the smart card provider :param pin: (optional) The PIN to use to unlock the smart card :type provider: `str` :type pin: `str` or `None` :raises: :exc:`ValueError` if the keys are not found """ resptype, resp = \ await self._make_request(SSH_AGENTC_REMOVE_SMARTCARD_KEY, String(provider), String(pin or '')) if resptype == SSH_AGENT_SUCCESS: resp.check_end() elif resptype == SSH_AGENT_FAILURE: raise ValueError('Keys not found') else: raise ValueError('Unknown SSH agent response: %d' % resptype)
[docs] async def remove_all(self) -> None: """Remove all keys stored in the agent :raises: :exc:`ValueError` if the keys can't be removed """ resptype, resp = \ await self._make_request(SSH_AGENTC_REMOVE_ALL_IDENTITIES) if resptype == SSH_AGENT_SUCCESS: resp.check_end() elif resptype == SSH_AGENT_FAILURE: raise ValueError('Unable to remove all keys') else: raise ValueError('Unknown SSH agent response: %d' % resptype)
[docs] async def lock(self, passphrase: str) -> None: """Lock the agent using the specified passphrase .. note:: The lock and unlock actions don't appear to be supported on the Windows 10 OpenSSH agent. :param passphrase: The passphrase required to later unlock the agent :type passphrase: `str` :raises: :exc:`ValueError` if the agent can't be locked """ resptype, resp = await self._make_request(SSH_AGENTC_LOCK, String(passphrase)) if resptype == SSH_AGENT_SUCCESS: resp.check_end() elif resptype == SSH_AGENT_FAILURE: raise ValueError('Unable to lock SSH agent') else: raise ValueError('Unknown SSH agent response: %d' % resptype)
[docs] async def unlock(self, passphrase: str) -> None: """Unlock the agent using the specified passphrase .. note:: The lock and unlock actions don't appear to be supported on the Windows 10 OpenSSH agent. :param passphrase: The passphrase to use to unlock the agent :type passphrase: `str` :raises: :exc:`ValueError` if the agent can't be unlocked """ resptype, resp = await self._make_request(SSH_AGENTC_UNLOCK, String(passphrase)) if resptype == SSH_AGENT_SUCCESS: resp.check_end() elif resptype == SSH_AGENT_FAILURE: raise ValueError('Unable to unlock SSH agent') else: raise ValueError('Unknown SSH agent response: %d' % resptype)
[docs] async def query_extensions(self) -> Sequence[str]: """Return a list of extensions supported by the agent :returns: A list of strings of supported extension names """ resptype, resp = await self._make_request(SSH_AGENTC_EXTENSION, String('query')) if resptype == SSH_AGENT_SUCCESS: result = [] while resp: exttype = resp.get_string() try: exttype_str = exttype.decode('utf-8') except UnicodeDecodeError: raise ValueError('Invalid extension type name') from None result.append(exttype_str) return result elif resptype == SSH_AGENT_FAILURE: return [] else: raise ValueError('Unknown SSH agent response: %d' % resptype)
[docs] def close(self) -> None: """Close the SSH agent connection This method closes the connection to the ssh-agent. Any attempts to use this :class:`SSHAgentClient` or the key pairs it previously returned will result in an error. """ if self._writer: self._writer.close()
[docs] async def wait_closed(self) -> None: """Wait for this agent connection to close This method is a coroutine which can be called to block until the connection to the agent has finished closing. """ if self._writer: await maybe_wait_closed(self._writer) self._reader = None self._writer = None
class SSHAgentListener: """Listener used to forward agent connections""" def __init__(self, tempdir: 'TemporaryDirectory[str]', path: str, unix_listener: SSHForwardListener): self._tempdir = tempdir self._path = path self._unix_listener = unix_listener def get_path(self) -> str: """Return the path being listened on""" return self._path def close(self) -> None: """Close the agent listener""" self._unix_listener.close() self._tempdir.cleanup()
[docs] @async_context_manager async def connect_agent(agent_path: _AgentPath = '') -> 'SSHAgentClient': """Make a connection to the SSH agent This function attempts to connect to an ssh-agent process listening on a UNIX domain socket at `agent_path`. If not provided, it will attempt to get the path from the `SSH_AUTH_SOCK` environment variable. If the connection is successful, an :class:`SSHAgentClient` object is returned that has methods on it you can use to query the ssh-agent. If no path is specified and the environment variable is not set or the connection to the agent fails, an error is raised. :param agent_path: (optional) The path to use to contact the ssh-agent process, or the :class:`SSHServerConnection` to forward the agent request over. :type agent_path: `str` or :class:`SSHServerConnection` :returns: An :class:`SSHAgentClient` :raises: :exc:`OSError` or :exc:`ChannelOpenError` if the connection to the agent can't be opened """ if not agent_path: agent_path = os.environ.get('SSH_AUTH_SOCK', '') agent = SSHAgentClient(agent_path) await agent.connect() return agent