diff --git a/lang/python/tests/support.py b/lang/python/tests/support.py index 80c3a4bf..8f9d6452 100644 --- a/lang/python/tests/support.py +++ b/lang/python/tests/support.py @@ -1,107 +1,114 @@ # Copyright (C) 2016 g10 Code GmbH # # This file is part of GPGME. # # GPGME is free software; you can redistribute it and/or modify it # under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # GPGME is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General # Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this program; if not, see . from __future__ import absolute_import, print_function, unicode_literals del absolute_import, print_function, unicode_literals import contextlib import shutil import sys import os import tempfile import time import gpg +def assert_gpg_version(version=(2, 1, 0)): + with gpg.Context() as c: + if tuple(map(int, c.engine_info.version.split('.'))) < version: + print("GnuPG too old: have {0}, need {1}.".format( + c.engine_info.version, '.'.join(version))) + sys.exit(77) + # known keys alpha = "A0FF4590BB6122EDEF6E3C542D727CC768697734" bob = "D695676BDCEDCC2CDD6152BCFE180B1DA9E3B0B2" encrypt_only = "F52770D5C4DB41408D918C9F920572769B9FE19C" sign_only = "7CCA20CCDE5394CEE71C9F0BFED153F12F18F45D" no_such_key = "A" * 40 def make_filename(name): return os.path.join(os.environ['top_srcdir'], 'tests', 'gpg', name) def in_srcdir(name): return os.path.join(os.environ['srcdir'], name) verbose = int(os.environ.get('verbose', 0)) > 1 def print_data(data): if verbose: try: # See if it is a file-like object. data.seek(0, os.SEEK_SET) data = data.read() except: # Hope for the best. pass if hasattr(sys.stdout, "buffer"): sys.stdout.buffer.write(data) else: sys.stdout.write(data) def mark_key_trusted(ctx, key): class Editor(object): def __init__(self): self.steps = ["trust", "save"] def edit(self, status, args, out): if args == "keyedit.prompt": result = self.steps.pop(0) elif args == "edit_ownertrust.value": result = "5" elif args == "edit_ownertrust.set_ultimate.okay": result = "Y" elif args == "keyedit.save.okay": result = "Y" else: result = None return result with gpg.Data() as sink: ctx.op_edit(key, Editor().edit, sink, sink) # Python3.2 and up has tempfile.TemporaryDirectory, but we cannot use # that, because there shutil.rmtree is used without # ignore_errors=True, and that races against gpg-agent deleting its # sockets. class TemporaryDirectory(object): def __enter__(self): self.path = tempfile.mkdtemp() return self.path def __exit__(self, *args): shutil.rmtree(self.path, ignore_errors=True) @contextlib.contextmanager def EphemeralContext(): with TemporaryDirectory() as tmp: home = os.environ['GNUPGHOME'] shutil.copy(os.path.join(home, "gpg.conf"), tmp) shutil.copy(os.path.join(home, "gpg-agent.conf"), tmp) with gpg.Context(home_dir=tmp) as ctx: yield ctx # Ask the agent to quit. agent_socket = os.path.join(tmp, "S.gpg-agent") ctx.protocol = gpg.constants.protocol.ASSUAN ctx.set_engine_info(ctx.protocol, file_name=agent_socket) ctx.assuan_transact(["KILLAGENT"]) # Block until it is really gone. while os.path.exists(agent_socket): time.sleep(.01) diff --git a/lang/python/tests/t-callbacks.py b/lang/python/tests/t-callbacks.py index ae157878..94cf11ef 100755 --- a/lang/python/tests/t-callbacks.py +++ b/lang/python/tests/t-callbacks.py @@ -1,255 +1,257 @@ #!/usr/bin/env python # Copyright (C) 2016 g10 Code GmbH # # This file is part of GPGME. # # GPGME is free software; you can redistribute it and/or modify it # under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # GPGME is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General # Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this program; if not, see . from __future__ import absolute_import, print_function, unicode_literals del absolute_import, print_function, unicode_literals import os import gpg import support +support.assert_gpg_version() + c = gpg.Context() c.set_pinentry_mode(gpg.constants.PINENTRY_MODE_LOOPBACK) source = gpg.Data("Hallo Leute\n") sink = gpg.Data() # Valid passphrases, both as string and bytes. for passphrase in ('foo', b'foo'): def passphrase_cb(hint, desc, prev_bad, hook=None): assert hook == passphrase return hook c.set_passphrase_cb(passphrase_cb, passphrase) c.op_encrypt([], 0, source, sink) # Returning an invalid type. def passphrase_cb(hint, desc, prev_bad, hook=None): return 0 c.set_passphrase_cb(passphrase_cb, None) try: c.op_encrypt([], 0, source, sink) except Exception as e: assert type(e) == TypeError assert str(e) == "expected str or bytes from passphrase callback, got int" else: assert False, "Expected an error, got none" # Raising an exception inside callback. myException = Exception() def passphrase_cb(hint, desc, prev_bad, hook=None): raise myException c.set_passphrase_cb(passphrase_cb, None) try: c.op_encrypt([], 0, source, sink) except Exception as e: assert e == myException else: assert False, "Expected an error, got none" # Wrong kind of callback function. def bad_passphrase_cb(): pass c.set_passphrase_cb(bad_passphrase_cb, None) try: c.op_encrypt([], 0, source, sink) except Exception as e: assert type(e) == TypeError else: assert False, "Expected an error, got none" # Test the progress callback. parms = """ Key-Type: RSA Key-Length: 1024 Name-Real: Joe Tester Name-Comment: with stupid passphrase Name-Email: joe+gpg@example.org Passphrase: Crypt0R0cks Expire-Date: 2020-12-31 """ messages = [] def progress_cb(what, typ, current, total, hook=None): assert hook == messages messages.append( "PROGRESS UPDATE: what = {}, type = {}, current = {}, total = {}" .format(what, typ, current, total)) c = gpg.Context() c.set_progress_cb(progress_cb, messages) c.op_genkey(parms, None, None) assert len(messages) > 0 # Test exception handling. def progress_cb(what, typ, current, total, hook=None): raise myException c = gpg.Context() c.set_progress_cb(progress_cb, None) try: c.op_genkey(parms, None, None) except Exception as e: assert e == myException else: assert False, "Expected an error, got none" # Test the edit callback. c = gpg.Context() c.set_pinentry_mode(gpg.constants.PINENTRY_MODE_LOOPBACK) c.set_passphrase_cb(lambda *args: "abc") sink = gpg.Data() alpha = c.get_key("A0FF4590BB6122EDEF6E3C542D727CC768697734", False) cookie = object() edit_cb_called = False def edit_cb(status, args, hook): global edit_cb_called edit_cb_called = True assert hook == cookie return "quit" if args == "keyedit.prompt" else None c.op_edit(alpha, edit_cb, cookie, sink) assert edit_cb_called # Test exceptions. c = gpg.Context() c.set_pinentry_mode(gpg.constants.PINENTRY_MODE_LOOPBACK) c.set_passphrase_cb(lambda *args: "abc") sink = gpg.Data() def edit_cb(status, args): raise myException try: c.op_edit(alpha, edit_cb, None, sink) except Exception as e: assert e == myException else: assert False, "Expected an error, got none" # Test the status callback. source = gpg.Data("Hallo Leute\n") sink = gpg.Data() status_cb_called = False def status_cb(keyword, args, hook=None): global status_cb_called status_cb_called = True assert hook == cookie c = gpg.Context() c.set_status_cb(status_cb, cookie) c.set_ctx_flag("full-status", "1") c.op_encrypt([alpha], gpg.constants.ENCRYPT_ALWAYS_TRUST, source, sink) assert status_cb_called # Test exceptions. source = gpg.Data("Hallo Leute\n") sink = gpg.Data() def status_cb(keyword, args): raise myException c = gpg.Context() c.set_status_cb(status_cb, None) c.set_ctx_flag("full-status", "1") try: c.op_encrypt([alpha], gpg.constants.ENCRYPT_ALWAYS_TRUST, source, sink) except Exception as e: assert e == myException else: assert False, "Expected an error, got none" # Test the data callbacks. def read_cb(amount, hook=None): assert hook == cookie return 0 def release_cb(hook=None): assert hook == cookie data = gpg.Data(cbs=(read_cb, None, None, release_cb, cookie)) try: data.read() except Exception as e: assert type(e) == TypeError else: assert False, "Expected an error, got none" def read_cb(amount): raise myException data = gpg.Data(cbs=(read_cb, None, None, lambda: None)) try: data.read() except Exception as e: assert e == myException else: assert False, "Expected an error, got none" def write_cb(what, hook=None): assert hook == cookie return "wrong type" data = gpg.Data(cbs=(None, write_cb, None, release_cb, cookie)) try: data.write(b'stuff') except Exception as e: assert type(e) == TypeError else: assert False, "Expected an error, got none" def write_cb(what): raise myException data = gpg.Data(cbs=(None, write_cb, None, lambda: None)) try: data.write(b'stuff') except Exception as e: assert e == myException else: assert False, "Expected an error, got none" def seek_cb(offset, whence, hook=None): assert hook == cookie return "wrong type" data = gpg.Data(cbs=(None, None, seek_cb, release_cb, cookie)) try: data.seek(0, os.SEEK_SET) except Exception as e: assert type(e) == TypeError else: assert False, "Expected an error, got none" def seek_cb(offset, whence): raise myException data = gpg.Data(cbs=(None, None, seek_cb, lambda: None)) try: data.seek(0, os.SEEK_SET) except Exception as e: assert e == myException else: assert False, "Expected an error, got none" diff --git a/lang/python/tests/t-edit.py b/lang/python/tests/t-edit.py index 7ac3626f..ffc32965 100755 --- a/lang/python/tests/t-edit.py +++ b/lang/python/tests/t-edit.py @@ -1,70 +1,72 @@ #!/usr/bin/env python # Copyright (C) 2005 Igor Belyi # Copyright (C) 2016 g10 Code GmbH # # This file is part of GPGME. # # GPGME is free software; you can redistribute it and/or modify it # under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # GPGME is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General # Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this program; if not, see . from __future__ import absolute_import, print_function, unicode_literals del absolute_import, print_function, unicode_literals import sys import os import gpg import support +support.assert_gpg_version() + class KeyEditor(object): def __init__(self): self.steps = ["fpr", "expire", "1", "primary", "quit"] self.step = 0 self.done = False self.verbose = int(os.environ.get('verbose', 0)) > 1 def edit_fnc(self, status, args, out=None): if args == "keyedit.prompt": result = self.steps[self.step] self.step += 1 elif args == "keyedit.save.okay": result = "Y" self.done = self.step == len(self.steps) elif args == "keygen.valid": result = "0" else: result = None if self.verbose: sys.stderr.write("Code: {}, args: {!r}, Returning: {!r}\n" .format(status, args, result)) return result c = gpg.Context() c.set_pinentry_mode(gpg.constants.PINENTRY_MODE_LOOPBACK) c.set_passphrase_cb(lambda *args: "abc") c.set_armor(True) # The deprecated interface. editor = KeyEditor() c.interact(c.get_key("A0FF4590BB6122EDEF6E3C542D727CC768697734", False), editor.edit_fnc) assert editor.done # The deprecated interface. sink = gpg.Data() editor = KeyEditor() c.op_edit(c.get_key("A0FF4590BB6122EDEF6E3C542D727CC768697734", False), editor.edit_fnc, sink, sink) assert editor.done diff --git a/lang/python/tests/t-encrypt-sym.py b/lang/python/tests/t-encrypt-sym.py index c15955a9..8ee9cd6b 100755 --- a/lang/python/tests/t-encrypt-sym.py +++ b/lang/python/tests/t-encrypt-sym.py @@ -1,83 +1,85 @@ #!/usr/bin/env python # Copyright (C) 2016 g10 Code GmbH # # This file is part of GPGME. # # GPGME is free software; you can redistribute it and/or modify it # under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # GPGME is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General # Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this program; if not, see . from __future__ import absolute_import, print_function, unicode_literals del absolute_import, print_function, unicode_literals import os import gpg import support +support.assert_gpg_version() + for passphrase in ("abc", b"abc"): c = gpg.Context() c.set_armor(True) c.set_pinentry_mode(gpg.constants.PINENTRY_MODE_LOOPBACK) source = gpg.Data("Hallo Leute\n") cipher = gpg.Data() passphrase_cb_called = 0 def passphrase_cb(hint, desc, prev_bad, hook=None): global passphrase_cb_called passphrase_cb_called += 1 return passphrase c.set_passphrase_cb(passphrase_cb, None) c.op_encrypt([], 0, source, cipher) assert passphrase_cb_called == 1, \ "Callback called {} times".format(passphrase_cb_called) support.print_data(cipher) c = gpg.Context() c.set_armor(True) c.set_pinentry_mode(gpg.constants.PINENTRY_MODE_LOOPBACK) c.set_passphrase_cb(passphrase_cb, None) plain = gpg.Data() cipher.seek(0, os.SEEK_SET) c.op_decrypt(cipher, plain) # Seems like the passphrase is cached. #assert passphrase_cb_called == 2, \ # "Callback called {} times".format(passphrase_cb_called) support.print_data(plain) plain.seek(0, os.SEEK_SET) plaintext = plain.read() assert plaintext == b"Hallo Leute\n", \ "Wrong plaintext {!r}".format(plaintext) # Idiomatic interface. for passphrase in ("abc", b"abc"): with gpg.Context(armor=True) as c: # Check that the passphrase callback is not altered. def f(*args): assert False c.set_passphrase_cb(f) message = "Hallo Leute\n".encode() ciphertext, _, _ = c.encrypt(message, passphrase=passphrase, sign=False) assert ciphertext.find(b'BEGIN PGP MESSAGE') > 0, 'Marker not found' plaintext, _, _ = c.decrypt(ciphertext, passphrase=passphrase) assert plaintext == message, 'Message body not recovered' assert c._passphrase_cb[1] == f, "Passphrase callback not restored" diff --git a/lang/python/tests/t-quick-key-creation.py b/lang/python/tests/t-quick-key-creation.py index c642c5b4..8b7372e7 100755 --- a/lang/python/tests/t-quick-key-creation.py +++ b/lang/python/tests/t-quick-key-creation.py @@ -1,139 +1,140 @@ #!/usr/bin/env python # Copyright (C) 2017 g10 Code GmbH # # This file is part of GPGME. # # GPGME is free software; you can redistribute it and/or modify it # under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # GPGME is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General # Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this program; if not, see . from __future__ import absolute_import, print_function, unicode_literals del absolute_import, print_function, unicode_literals import gpg import itertools import time import support +support.assert_gpg_version((2, 1, 2)) alpha = "Alpha " with support.EphemeralContext() as ctx: res = ctx.create_key(alpha) keys = list(ctx.keylist()) assert len(keys) == 1, "Weird number of keys created" key = keys[0] assert key.fpr == res.fpr assert len(key.subkeys) == 2, "Expected one primary key and one subkey" assert key.subkeys[0].expires > 0, "Expected primary key to expire" # Try to create a key with the same UID try: ctx.create_key(alpha) assert False, "Expected an error but got none" except gpg.errors.GpgError as e: pass # Try to create a key with the same UID, now with force! res2 = ctx.create_key(alpha, force=True) assert res.fpr != res2.fpr # From here on, we use one context, and create unique UIDs uid_counter = 0 def make_uid(): global uid_counter uid_counter += 1 return "user{0}@invalid.example.org".format(uid_counter) with support.EphemeralContext() as ctx: # Check gpg.constants.create.NOEXPIRE... res = ctx.create_key(make_uid(), expires=False) key = ctx.get_key(res.fpr, secret=True) assert key.fpr == res.fpr assert len(key.subkeys) == 2, "Expected one primary key and one subkey" assert key.subkeys[0].expires == 0, "Expected primary key not to expire" t = 2 * 24 * 60 * 60 slack = 5 * 60 res = ctx.create_key(make_uid(), expires_in=t) key = ctx.get_key(res.fpr, secret=True) assert key.fpr == res.fpr assert len(key.subkeys) == 2, "Expected one primary key and one subkey" assert abs(time.time() + t - key.subkeys[0].expires) < slack, \ "Primary keys expiration time is off" # Check capabilities for sign, encrypt, certify, authenticate in itertools.product([False, True], [False, True], [False, True], [False, True]): # Filter some out if not (sign or encrypt or certify or authenticate): # This triggers the default capabilities tested before. continue if (sign or encrypt or authenticate) and not certify: # The primary key always certifies. continue res = ctx.create_key(make_uid(), algorithm="rsa", sign=sign, encrypt=encrypt, certify=certify, authenticate=authenticate) key = ctx.get_key(res.fpr, secret=True) assert key.fpr == res.fpr assert len(key.subkeys) == 1, \ "Expected no subkey for non-default capabilities" p = key.subkeys[0] assert sign == p.can_sign assert encrypt == p.can_encrypt assert certify == p.can_certify assert authenticate == p.can_authenticate # Check algorithm res = ctx.create_key(make_uid(), algorithm="rsa") key = ctx.get_key(res.fpr, secret=True) assert key.fpr == res.fpr for k in key.subkeys: assert k.pubkey_algo == 1 # Check algorithm with size res = ctx.create_key(make_uid(), algorithm="rsa1024") key = ctx.get_key(res.fpr, secret=True) assert key.fpr == res.fpr for k in key.subkeys: assert k.pubkey_algo == 1 assert k.length == 1024 # Check algorithm future-default ctx.create_key(make_uid(), algorithm="future-default") # Check passphrase protection recipient = make_uid() passphrase = "streng geheim" res = ctx.create_key(recipient, passphrase=passphrase) ciphertext, _, _ = ctx.encrypt(b"hello there", recipients=[ctx.get_key(res.fpr)]) cb_called = False def cb(*args): global cb_called cb_called = True return passphrase ctx.pinentry_mode = gpg.constants.PINENTRY_MODE_LOOPBACK ctx.set_passphrase_cb(cb) plaintext, _, _ = ctx.decrypt(ciphertext) assert plaintext == b"hello there" assert cb_called diff --git a/lang/python/tests/t-quick-key-manipulation.py b/lang/python/tests/t-quick-key-manipulation.py index 45e4c0e2..0f47006f 100755 --- a/lang/python/tests/t-quick-key-manipulation.py +++ b/lang/python/tests/t-quick-key-manipulation.py @@ -1,124 +1,125 @@ #!/usr/bin/env python # Copyright (C) 2017 g10 Code GmbH # # This file is part of GPGME. # # GPGME is free software; you can redistribute it and/or modify it # under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # GPGME is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General # Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this program; if not, see . from __future__ import absolute_import, print_function, unicode_literals del absolute_import, print_function, unicode_literals import os import gpg import support +support.assert_gpg_version((2, 1, 14)) alpha = "Alpha " bravo = "Bravo " with support.EphemeralContext() as ctx: res = ctx.create_key(alpha, certify=True) key = ctx.get_key(res.fpr) assert len(key.subkeys) == 1, "Expected one primary key and no subkeys" assert len(key.uids) == 1, "Expected exactly one UID" def get_uid(uid): key = ctx.get_key(res.fpr) for u in key.uids: if u.uid == uid: return u return None # sanity check uid = get_uid(alpha) assert uid, "UID alpha not found" assert uid.revoked == 0 # add bravo ctx.key_add_uid(key, bravo) uid = get_uid(bravo) assert uid, "UID bravo not found" assert uid.revoked == 0 # revoke alpha ctx.key_revoke_uid(key, alpha) uid = get_uid(alpha) assert uid, "UID alpha not found" assert uid.revoked == 1 uid = get_uid(bravo) assert uid, "UID bravo not found" assert uid.revoked == 0 # try to revoke the last UID try: ctx.key_revoke_uid(key, alpha) # IMHO this should fail. issue2961. # assert False, "Expected an error but got none" except gpg.errors.GpgError: pass # Everything should be the same uid = get_uid(alpha) assert uid, "UID alpha not found" assert uid.revoked == 1 uid = get_uid(bravo) assert uid, "UID bravo not found" assert uid.revoked == 0 # try to revoke a non-existent UID try: ctx.key_revoke_uid(key, "i dont exist") # IMHO this should fail. issue2963. # assert False, "Expected an error but got none" except gpg.errors.GpgError: pass # try to add an pre-existent UID try: ctx.key_add_uid(key, bravo) assert False, "Expected an error but got none" except gpg.errors.GpgError: pass # Check setting the TOFU policy. with open(os.path.join(ctx.home_dir, "gpg.conf"), "a") as handle: handle.write("trust-model tofu+pgp\n") for name, policy in [(name, getattr(gpg.constants.tofu.policy, name)) for name in filter(lambda x: not x.startswith('__'), dir(gpg.constants.tofu.policy))]: if policy == gpg.constants.tofu.policy.NONE: # We must not set the policy to NONE. continue ctx.key_tofu_policy(key, policy) keys = list(ctx.keylist(key.uids[0].uid, mode=(gpg.constants.keylist.mode.LOCAL |gpg.constants.keylist.mode.WITH_TOFU))) assert len(keys) == 1 if policy == gpg.constants.tofu.policy.AUTO: # We cannot check that it is set to AUTO. continue for uid in keys[0].uids: if uid.uid == alpha: # TOFU information of revoked UIDs is not updated. # XXX: Is that expected? continue assert uid.tofu[0].policy == policy, \ "Expected policy {0} ({1}), got {2}".format(policy, name, uid.tofu[0].policy) diff --git a/lang/python/tests/t-quick-key-signing.py b/lang/python/tests/t-quick-key-signing.py index f9778a33..3d648c5b 100755 --- a/lang/python/tests/t-quick-key-signing.py +++ b/lang/python/tests/t-quick-key-signing.py @@ -1,120 +1,121 @@ #!/usr/bin/env python # Copyright (C) 2017 g10 Code GmbH # # This file is part of GPGME. # # GPGME is free software; you can redistribute it and/or modify it # under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # GPGME is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General # Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this program; if not, see . from __future__ import absolute_import, print_function, unicode_literals del absolute_import, print_function, unicode_literals import gpg import itertools import time import support +support.assert_gpg_version((2, 1, 1)) with support.EphemeralContext() as ctx: uid_counter = 0 def make_uid(): global uid_counter uid_counter += 1 return "user{0}@invalid.example.org".format(uid_counter) def make_key(): uids = [make_uid() for i in range(3)] res = ctx.create_key(uids[0], certify=True) key = ctx.get_key(res.fpr) for u in uids[1:]: ctx.key_add_uid(key, u) return key, uids def check_sigs(key, expected_sigs): keys = list(ctx.keylist(key.fpr, mode=(gpg.constants.keylist.mode.LOCAL |gpg.constants.keylist.mode.SIGS))) assert len(keys) == 1 key_uids = {uid.uid: [s for s in uid.signatures] for uid in keys[0].uids} expected = list(expected_sigs) while key_uids and expected: uid, signing_key, func = expected[0] match = False for i, s in enumerate(key_uids[uid]): if signing_key.fpr.endswith(s.keyid): if func: func(s) match = True break if match: expected.pop(0) key_uids[uid].pop(i) if not key_uids[uid]: del key_uids[uid] assert not key_uids, "Superfluous signatures: {0}".format(key_uids) assert not expected, "Missing signatures: {0}".format(expected) # Simplest case. Sign without any options. key_a, uids_a = make_key() key_b, uids_b = make_key() ctx.signers = [key_a] def exportable_non_expiring(s): assert s.exportable assert s.expires == 0 check_sigs(key_b, itertools.product(uids_b, [key_b], [exportable_non_expiring])) ctx.key_sign(key_b) check_sigs(key_b, itertools.product(uids_b, [key_b, key_a], [exportable_non_expiring])) # Create a non-exportable signature, and explicitly name all uids. key_c, uids_c = make_key() ctx.signers = [key_a, key_b] def non_exportable_non_expiring(s): assert s.exportable == 0 assert s.expires == 0 ctx.key_sign(key_c, local=True, uids=uids_c) check_sigs(key_c, list(itertools.product(uids_c, [key_c], [exportable_non_expiring])) + list(itertools.product(uids_c, [key_b, key_a], [non_exportable_non_expiring]))) # Create a non-exportable, expiring signature for a single uid. key_d, uids_d = make_key() ctx.signers = [key_c] expires_in = 600 slack = 10 def non_exportable_expiring(s): assert s.exportable == 0 assert abs(time.time() + expires_in - s.expires) < slack ctx.key_sign(key_d, local=True, expires_in=expires_in, uids=uids_d[0]) check_sigs(key_d, list(itertools.product(uids_d, [key_d], [exportable_non_expiring])) + list(itertools.product(uids_d[:1], [key_c], [non_exportable_expiring]))) # Now sign the second in the same fashion, but use a singleton list. ctx.key_sign(key_d, local=True, expires_in=expires_in, uids=uids_d[1:2]) check_sigs(key_d, list(itertools.product(uids_d, [key_d], [exportable_non_expiring])) + list(itertools.product(uids_d[:2], [key_c], [non_exportable_expiring])))