Source code for asyncssh.sshsig

# Copyright (c) 2026 by Ron Frederick <ronf@timeheart.net> and others.
#
# This program and the accompanying materials are made available under
# the terms of the Eclipse Public License v2.0 which accompanies this
# distribution and is available at:
#
#     http://www.eclipse.org/legal/epl-2.0/
#
# This program may also be made available under the following secondary
# licenses when the conditions for such availability set forth in the
# Eclipse Public License v2.0 are satisfied:
#
#    GNU General Public License, Version 2.0, or any later versions of
#    that license
#
# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
#
# Contributors:
#     Ron Frederick - initial implementation, API, and documentation

"""Support for creating and validating SSH signatures"""


import binascii
import time

from hashlib import sha256, sha512
from pathlib import PurePath
from typing import List, Optional, Sequence, Union, cast

from .misc import BytesOrFilePath, FilePath, OptionsParser
from .misc import open_file, read_file, match_base64, wrap_base64, parse_time
from .packet import String, UInt32, PacketDecodeError, SSHPacket
from .pattern import WildcardPatternList
from .public_key import CERT_TYPE_ANY, KeyImportError, KeyPairListArg
from .public_key import SSHKey, SSHOpenSSHCertificate
from .public_key import decode_ssh_public_key, decode_ssh_certificate
from .public_key import import_public_key, load_keypairs


_SSHSIG_MAGIC = b'SSHSIG'
_SSHSIG_VERSION = 1

SSHAllowedSignersArg = Union[bytes, str, Sequence[str], 'SSHAllowedSigners']

_hashes = {b'sha256': sha256, b'sha512': sha512}


class SSHAllowedSignersEntry(OptionsParser):
    """An entry in an SSH allowed_senders list"""

    def __init__(self, line: str):
        super().__init__()

        try:
            principals, line = line.split(None, 1)
        except ValueError:
            raise ValueError('Missing public key in allowed_signers') from None

        self.principals = WildcardPatternList(principals)

        try:
            self.key = import_public_key(line)
            return
        except KeyImportError:
            pass

        line = self._parse_options(line)
        self.key = import_public_key(line)

    def _set_pattern(self, option: str, value: str) -> None:
        """Set an option with a wildcard pattern value"""

        self.options[option] = WildcardPatternList(value)

    def _set_time(self, option: str, value: str) -> None:
        """Set an option with a time value"""

        self.options[option] = parse_time(value)

    _handlers = {
        'namespaces':   _set_pattern,
        'valid-after':  _set_time,
        'valid-before': _set_time
    }

    def match_options(self, principal: str, namespace: str):
        """Match options in entry"""

        if not self.principals.matches(principal):
            return False

        namespaces = cast(Optional[WildcardPatternList],
                          self.options.get('namespaces'))

        if namespaces is not None and not namespaces.matches(namespace):
            return False

        now = time.time()

        valid_after = cast(Optional[int], self.options.get('valid-after'))
        if valid_after is not None and now < valid_after:
            return False

        valid_before = cast(Optional[int], self.options.get('valid-before'))
        if valid_before is not None and now >= valid_before:
            return False

        return True


[docs] class SSHAllowedSigners: """An OpenSSH-compatible allowed signers list""" def __init__(self, allowed_signers: Optional[str] = None): self._key_entries: List[SSHAllowedSignersEntry] = [] self._cert_entries: List[SSHAllowedSignersEntry] = [] if allowed_signers: self.load(allowed_signers) def load(self, allowed_signers: str) -> None: """Load allowed signers data into this object""" for line in allowed_signers.splitlines(): line = line.strip() if not line or line.startswith('#'): continue try: entry = SSHAllowedSignersEntry(line) except KeyImportError: continue if 'cert-authority' in entry.options: self._cert_entries.append(entry) else: self._key_entries.append(entry) if not self._key_entries and not self._cert_entries: raise ValueError('No valid entries found in ' 'allowed_signers') def validate(self, key: SSHKey, principal: str, namespace: str, ca: bool = False) -> bool: """Return whether a public key or CA is valid for SSHSIG signing""" for entry in self._cert_entries if ca else self._key_entries: if entry.key == key and entry.match_options(principal, namespace): return True return False
def _signed_data(data: BytesOrFilePath, is_hashed: bool, hash_name: bytes, namespace: str) -> bytes: """Return the data to be signed/verified""" try: hash_alg = _hashes[hash_name] except KeyError: raise ValueError('Unsupported hash algorithm') from None if not namespace: raise ValueError('Namespace must be a non-empty string') if isinstance(data, (PurePath, str)): h = hash_alg() with open_file(data, 'rb') as f: while chunk := f.read(8192): h.update(chunk) data = h.digest() elif not is_hashed: data = hash_alg(data).digest() elif len(data) != hash_alg().digest_size: raise ValueError('Incorrect hash size') return _SSHSIG_MAGIC + String(namespace) + String(b'') + \ String(hash_name) + String(data)
[docs] def import_allowed_signers(data: str) -> SSHAllowedSigners: """Import SSH allowed signers for SSHSIG This function imports public keys and associated options in OpenSSH allowed signers format. :param data: The allowed signers data to import. :type data: `str` :returns: An :class:`SSHAllowedSigners` object """ return SSHAllowedSigners(data)
[docs] def read_allowed_signers(filelist: Union[FilePath, Sequence[FilePath]]) -> \ SSHAllowedSigners: """Read SSH allowed signers for SSHSIG from a file or list of files This function reads public keys and associated options in OpenSSH allowed signers format from a file or list of files. :param filelist: The file or list of files to read allowed signers from. :type filelist: `PurePath`, `str`, or a list of these :returns: An :class:`SSHAllowedSigners` object """ allowed_signers = SSHAllowedSigners() if isinstance(filelist, (PurePath, str)): files: Sequence[FilePath] = [filelist] else: files = filelist for filename in files: allowed_signers.load(read_file(filename, 'r')) return allowed_signers
[docs] def create_sshsig(key: KeyPairListArg, data: BytesOrFilePath, *, is_hashed: bool = False, hash_name: str = 'sha512', namespace: str = 'file', raw: bool = False) -> bytes: """Create an SSHSIG signature This function creates and returns an SSHSIG for a block of data signed with the requested private key. :param key: The signing key to use. If this corresponds to multiple keys, the first one will be used. :param data: The data bytes to sign, or a string filename of where to read the data from. :param is_hashed: (optional) Whether or not hashing has already been performed on the data passed in. This can be useful when signing large blocks of data which have already had a hash calculated on them. If set to `True`, the `data` argument must be a byte string of the length required by the specified `hash_name`. This defualts to `False`, meaning that hashing will be performed on the data before signing. :param hash_name: (optional) The name of the hash algorithm to use. This can currently be `'sha256'` or `'sha512'`, defaulting to `'sha512'`. :param namespace: (optional) The namespace the hash should be created for, defaulting to `'file'`. :param raw: (optional) Whether to return the signature as a raw SSH blob or as a "armored" base64 encoded data with a PEM-style header and footer. This defaults to `False`, which returns the signature in standard "amored" format. :type key: *see* :ref:`SpecifyingPrivateKeys` :type data: `bytes` or `str` :type is_hashed: `bool` :type hash_name: `str` :type namespace: `str` :type raw: `bool` :returns: `bytes` """ try: keypair = load_keypairs(key)[0] except IndexError: raise ValueError('No signing key specified') from None if keypair.has_x509_chain: # pragma: no cover raise ValueError('X.509 certificates not supported') hash_name = hash_name.encode('utf-8') if keypair.sig_algorithm == b'ssh-rsa': keypair.set_sig_algorithm(b'rsa-sha2-' + hash_name[-3:]) data_to_sign = _signed_data(data, is_hashed, hash_name, namespace) sig = _SSHSIG_MAGIC + UInt32(_SSHSIG_VERSION) + \ String(keypair.public_data) + String(namespace) + String(b'') + \ String(hash_name) + String(keypair.sign(data_to_sign)) if not raw: sig = wrap_base64(sig, b'SSH SIGNATURE') return sig
[docs] def validate_sshsig(data: BytesOrFilePath, sig: BytesOrFilePath, principal: str, allowed_signers: SSHAllowedSignersArg, *, is_hashed: bool = False) -> bool: """Validate an SSHSIG signature This function validates whether an SSHSIG signature on a block of data is valid and that the signing key used matches an entry in the allowed signers associated with the requested principal. If present, namespace and validity period restrictions on the allowed signers entry are also enforced. In the case where an SSH certificate is used to sign the block, the specified principal must also match the certificate principals and the validity period of the certificate is also enforced. The `allowed_signers` argument can be any of the following: * a string or list of strings containing filenames to load allowed signers from * a byte string containing allowed signer data to match against * an already loaded :class:`SSHAllowedSigners` object containing allowed signers to match against :param data: The data bytes to validate, or a string filename of where to read the data from. :param sig: The signature bytes to validate, or a string filename of where to read the signature from. :param principal: The principal to look up in the allowed signers list. :param allowed_signers: A mapping from principals to allowed public keys or certificates. :param is_hashed: (optional) Whether or not hashing has already been performed on the data passed in. This can be useful when signing large blocks of data which have already had a hash calculated on them. If set to `True`, the `data` argument must be a byte string of the length required by the specified `hash_name`. This defualts to `False`, meaning that hashing will be performed on the data before signing. :type data: `PurePath`, `str`, or `bytes` :type sig: `PurePath`, `str`, or `bytes` :type principal: `str` :type allowed_signers: *see* :ref:`SpecifyingAllowedSigners` :type is_hashed: `bool` :returns: `bool` """ if isinstance(sig, (PurePath, str)): sig = read_file(sig) if sig[:1] == b'-': end = sig.find(b'\n') + 1 if not end: return False line = sig[:end].rstrip() if line != b'-----BEGIN SSH SIGNATURE-----': return False try: sig, end = match_base64(sig[end:], 0, line) sig = binascii.a2b_base64(sig) except ValueError: return False try: packet = SSHPacket(sig) if (packet.get_bytes(6) != _SSHSIG_MAGIC or packet.get_uint32() != _SSHSIG_VERSION): return False pubdata = packet.get_string() try: cert = decode_ssh_certificate(pubdata) if cert.is_x509: # pragma: no cover raise ValueError('X.509 certificates not supported') cert = cast(SSHOpenSSHCertificate, cert) key = cert.key except KeyImportError: try: cert = None key = decode_ssh_public_key(pubdata) except KeyImportError: return False try: namespace = packet.get_string().decode('utf-8') except UnicodeDecodeError: return False _ = packet.get_string() # reserved hash_name = packet.get_string() sig = packet.get_string() packet.check_end() except PacketDecodeError: return False data_to_verify = _signed_data(data, is_hashed, hash_name, namespace) if not key.verify(data_to_verify, sig): return False if isinstance(allowed_signers, bytes): allowed_signers = import_allowed_signers(allowed_signers.decode()) elif not isinstance(allowed_signers, SSHAllowedSigners): allowed_signers = read_allowed_signers(allowed_signers) allowed_signers: SSHAllowedSigners result = allowed_signers.validate(key, principal, namespace) if cert and not result: result = allowed_signers.validate(cert.signing_key, principal, namespace, ca=True) if result: try: cert.validate(CERT_TYPE_ANY, principal) except ValueError: return False return result