diff --git a/lang/python/docs/GPGMEpythonHOWTOen.org b/lang/python/docs/GPGMEpythonHOWTOen.org index 3325c086..31929fa3 100644 --- a/lang/python/docs/GPGMEpythonHOWTOen.org +++ b/lang/python/docs/GPGMEpythonHOWTOen.org @@ -1,1421 +1,1500 @@ #+TITLE: GNU Privacy Guard (GnuPG) Made Easy Python Bindings HOWTO (English) #+LATEX_COMPILER: xelatex #+LATEX_CLASS: article #+LATEX_CLASS_OPTIONS: [12pt] #+LATEX_HEADER: \usepackage{xltxtra} #+LATEX_HEADER: \usepackage[margin=1in]{geometry} #+LATEX_HEADER: \setmainfont[Ligatures={Common}]{Times New Roman} #+LATEX_HEADER: \author{Ben McGinnes } #+HTML_HEAD_EXTRA: * Introduction :PROPERTIES: :CUSTOM_ID: intro :END: | Version: | 0.1.1 | | Author: | Ben McGinnes | | Author GPG Key: | DB4724E6FA4286C92B4E55C4321E4E2373590E5D | | Language: | Australian English, British English | | xml:lang: | en-AU, en-GB, en | This document provides basic instruction in how to use the GPGME Python bindings to programmatically leverage the GPGME library. ** Python 2 versus Python 3 :PROPERTIES: :CUSTOM_ID: py2-vs-py3 :END: Though the GPGME Python bindings themselves provide support for both Python 2 and 3, the focus is unequivocally on Python 3 and specifically from Python 3.4 and above. As a consequence all the examples and instructions in this guide use Python 3 code. Much of it will work with Python 2, but much of it also deals with Python 3 byte literals, particularly when reading and writing data. Developers concentrating on Python 2.7, and possibly even 2.6, will need to make the appropriate modifications to support the older string and unicode types as opposed to bytes. There are multiple reasons for concentrating on Python 3; some of which relate to the immediate integration of these bindings, some of which relate to longer term plans for both GPGME and the python bindings and some of which relate to the impending EOL period for Python 2.7. Essentially, though, there is little value in tying the bindings to a version of the language which is a dead end and the advantages offered by Python 3 over Python 2 make handling the data types with which GPGME deals considerably easier. ** Examples :PROPERTIES: :CUSTOM_ID: howto-python3-examples :END: All of the examples found in this document can be found as Python 3 scripts in the =lang/python/examples/howto= directory. * GPGME Concepts :PROPERTIES: :CUSTOM_ID: gpgme-concepts :END: ** A C API :PROPERTIES: :CUSTOM_ID: gpgme-c-api :END: Unlike many modern APIs with which programmers will be more familiar with these days, the GPGME API is a C API. The API is intended for use by C coders who would be able to access its features by including the =gpgme.h= header file with their own C source code and then access its functions just as they would any other C headers. This is a very effective method of gaining complete access to the API and in the most efficient manner possible. It does, however, have the drawback that it cannot be directly used by other languages without some means of providing an interface to those languages. This is where the need for bindings in various languages stems. ** Python bindings :PROPERTIES: :CUSTOM_ID: gpgme-python-bindings :END: The Python bindings for GPGME provide a higher level means of accessing the complete feature set of GPGME itself. It also provides a more pythonic means of calling these API functions. The bindings are generated dynamically with SWIG and the copy of =gpgme.h= generated when GPGME is compiled. This means that a version of the Python bindings is fundamentally tied to the exact same version of GPGME used to generate that copy of =gpgme.h=. ** Difference between the Python bindings and other GnuPG Python packages :PROPERTIES: :CUSTOM_ID: gpgme-python-bindings-diffs :END: There have been numerous attempts to add GnuPG support to Python over the years. Some of the most well known are listed here, along with what differentiates them. *** The python-gnupg package maintained by Vinay Sajip :PROPERTIES: :CUSTOM_ID: diffs-python-gnupg :END: This is arguably the most popular means of integrating GPG with Python. The package utilises the =subprocess= module to implement wrappers for the =gpg= and =gpg2= executables normally invoked on the command line (=gpg.exe= and =gpg2.exe= on Windows). The popularity of this package stemmed from its ease of use and capability in providing the most commonly required features. Unfortunately it has been beset by a number of security issues in the past; most of which stemmed from using unsafe methods of accessing the command line via the =subprocess= calls. While some effort has been made over the last two to three years (as of 2018) to mitigate this, particularly by no longer providing shell access through those subprocess calls, the wrapper is still somewhat limited in the scope of its GnuPG features coverage. The python-gnupg package is available under the MIT license. *** The gnupg package created and maintained by Isis Lovecruft :PROPERTIES: :CUSTOM_ID: diffs-isis-gnupg :END: In 2015 Isis Lovecruft from the Tor Project forked and then re-implemented the python-gnupg package as just gnupg. This new package also relied on subprocess to call the =gpg= or =gpg2= binaries, but did so somewhat more securely. The naming and version numbering selected for this package, however, resulted in conflicts with the original python-gnupg and since its functions were called in a different manner to python-gnupg, the release of this package also resulted in a great deal of consternation when people installed what they thought was an upgrade that subsequently broke the code relying on it. The gnupg package is available under the GNU General Public License version 3.0 (or any later version). *** The PyME package maintained by Martin Albrecht :PROPERTIES: :CUSTOM_ID: diffs-pyme :END: This package is the origin of these bindings, though they are somewhat different now. For details of when and how the PyME package was folded back into GPGME itself see the /Short History/ document[fn:1] in the Python bindings =docs= directory.[fn:2] The PyME package was first released in 2002 and was also the first attempt to implement a low level binding to GPGME. In doing so it provided access to considerably more functionality than either the =python-gnupg= or =gnupg= packages. The PyME package is only available for Python 2.6 and 2.7. Porting the PyME package to Python 3.4 in 2015 is what resulted in it being folded into the GPGME project and the current bindings are the end result of that effort. The PyME package is available under the same dual licensing as GPGME itself: the GNU General Public License version 2.0 (or any later version) and the GNU Lesser General Public License version 2.1 (or any later version). * GPGME Python bindings installation :PROPERTIES: :CUSTOM_ID: gpgme-python-install :END: ** No PyPI :PROPERTIES: :CUSTOM_ID: do-not-use-pypi :END: Most third-party Python packages and modules are available and distributed through the Python Package Installer, known as PyPI. Due to the nature of what these bindings are and how they work, it is infeasible to install the GPGME Python bindings in the same way. This is because the bindings use SWIG to dynamically generate C bindings against =gpgme.h= and =gpgme.h= is generated from =gpgme.h.in= at compile time when GPGME is built from source. Thus to include a package in PyPI which actually built correctly would require either statically built libraries for every architecture bundled with it or a full implementation of C for each architecture. ** Requirements :PROPERTIES: :CUSTOM_ID: gpgme-python-requirements :END: The GPGME Python bindings only have three requirements: 1. A suitable version of Python 2 or Python 3. With Python 2 that means Python 2.7 and with Python 3 that means Python 3.4 or higher. 2. SWIG. 3. GPGME itself. Which also means that all of GPGME's dependencies must be installed too. ** Installation :PROPERTIES: :CUSTOM_ID: installation :END: Installing the Python bindings is effectively achieved by compiling and installing GPGME itself. Once SWIG is installed with Python and all the dependencies for GPGME are installed you only need to confirm that the version(s) of Python you want the bindings installed for are in your =$PATH=. By default GPGME will attempt to install the bindings for the most recent or highest version number of Python 2 and Python 3 it detects in =$PATH=. It specifically checks for the =python= and =python3= executables first and then checks for specific version numbers. For Python 2 it checks for these executables in this order: =python=, =python2= and =python2.7=. For Python 3 it checks for these executables in this order: =python3=, =python3.6=, =python3.5= and =python3.4=. *** Installing GPGME :PROPERTIES: :CUSTOM_ID: install-gpgme :END: See the GPGME =README= file for details of how to install GPGME from source. * Fundamentals :PROPERTIES: :CUSTOM_ID: howto-fund-a-mental :END: Before we can get to the fun stuff, there are a few matters regarding GPGME's design which hold true whether you're dealing with the C code directly or these Python bindings. ** No REST :PROPERTIES: :CUSTOM_ID: no-rest-for-the-wicked :END: The first part of which is or will be fairly blatantly obvious upon viewing the first example, but it's worth reiterating anyway. That being that this API is /*not*/ a REST API. Nor indeed could it ever be one. Most, if not all, Python programmers (and not just Python programmers) know how easy it is to work with a RESTful API. In fact they've become so popular that many other APIs attempt to emulate REST-like behaviour as much as they are able. Right down to the use of JSON formatted output to facilitate the use of their API without having to retrain developers. This API does not do that. It would not be able to do that and also provide access to the entire C API on which it's built. It does, however, provide a very pythonic interface on top of the direct bindings and it's this pythonic layer with which this HOWTO deals with. ** Context :PROPERTIES: :CUSTOM_ID: howto-get-context :END: One of the reasons which prevents this API from being RESTful is that most operations require more than one instruction to the API to perform the task. Sure, there are certain functions which can be performed simultaneously, particularly if the result known or strongly anticipated (e.g. selecting and encrypting to a key known to be in the public keybox). There are many more, however, which cannot be manipulated so readily: they must be performed in a specific sequence and the result of one operation has a direct bearing on the outcome of subsequent operations. Not merely by generating an error either. When dealing with this type of persistent state on the web, full of both the RESTful and REST-like, it's most commonly referred to as a session. In GPGME, however, it is called a context and every operation type has one. * Working with keys :PROPERTIES: :CUSTOM_ID: howto-keys :END: ** Key selection :PROPERTIES: :CUSTOM_ID: howto-keys-selection :END: Selecting keys to encrypt to or to sign with will be a common occurrence when working with GPGMe and the means available for doing so are quite simple. They do depend on utilising a Context; however once the data is recorded in another variable, that Context does not need to be the same one which subsequent operations are performed. The easiest way to select a specific key is by searching for that key's key ID or fingerprint, preferably the full fingerprint without any spaces in it. A long key ID will probably be okay, but is not advised and short key IDs are already a problem with some being generated to match specific patterns. It does not matter whether the pattern is upper or lower case. So this is the best method: #+begin_src python import gpg k = gpg.Context().keylist(pattern="258E88DCBD3CD44D8E7AB43F6ECB6AF0DEADBEEF") keys = list(k) #+end_src This is passable and very likely to be common: #+begin_src python import gpg k = gpg.Context().keylist(pattern="0x6ECB6AF0DEADBEEF") keys = list(k) #+end_src And this is a really bad idea: #+begin_src python import gpg k = gpg.Context().keylist(pattern="0xDEADBEEF") keys = list(k) #+end_src Alternatively it may be that the intention is to create a list of keys which all match a particular search string. For instance all the addresses at a particular domain, like this: #+begin_src python import gpg ncsc = gpg.Context().keylist(pattern="ncsc.mil") nsa = list(ncsc) #+end_src *** Counting keys :PROPERTIES: :CUSTOM_ID: howto-keys-counting :END: Counting the number of keys in your public keybox (=pubring.kbx=), the format which has superseded the old keyring format (=pubring.gpg= and =secring.gpg=), or the number of secret keys is a very simple task. #+begin_src python import gpg c = gpg.Context() seckeys = c.keylist(pattern=None, secret=True) pubkeys = c.keylist(pattern=None, secret=False) seclist = list(seckeys) secnum = len(seclist) publist = list(pubkeys) pubnum = len(publist) print(""" Number of secret keys: {0} Number of public keys: {1} """.format(secnum, pubnum)) #+end_src ** Get key :PROPERTIES: :CUSTOM_ID: howto-get-key :END: An alternative method of getting a single key via its fingerprint is available directly within a Context with =Context().get_key=. This is the preferred method of selecting a key in order to modify it, sign or certify it and for obtaining relevant data about a single key as a part of other functions; when verifying a signature made by that key, for instance. By default this method will select public keys, but it can select secret keys as well. This first example demonstrates selecting the current key of Werner Koch, which is due to expire at the end of 2018: #+begin_src python import gpg fingerprint = "80615870F5BAD690333686D0F2AD85AC1E42B367" key = gpg.Context().get_key(fingerprint) #+end_src Whereas this example demonstrates selecting the author's current key with the =secret= key word argument set to =True=: #+begin_src python import gpg fingerprint = "DB4724E6FA4286C92B4E55C4321E4E2373590E5D" key = gpg.Context().get_key(fingerprint, secret=True) #+end_src It is, of course, quite possible to select expired, disabled and revoked keys with this function, but only to effectively display information about those keys. It is also possible to use both unicode or string literals and byte literals with the fingerprint when getting a key in this way. +** Importing keys + :PROPERTIES: + :CUSTOM_ID: howto-import-key + :END: + + Importing keys is possible with the =key_import()= method and takes + one argument which is a bytes literal object containing either the + binary or ASCII armoured key data for one or more keys. + + In the following example a key will be retrieved from the SKS + keyservers via the web using the requests module. Since requests + returns the content as a bytes literal object, we can then use that + directly to import the resulting data into our keybox. In order to + demonstrate multiple imports this example searches for all the keys + of users at a particular domain name; in this case the EFF. + + #+begin_src python + import gpg + import os.path + import requests + + c = gpg.Context() + + homedir = input("Enter the GPG configuration directory path (optional): ") + pattern = input("The pattern to search for in key or user IDs: ") + url = "https://sks-keyservers.net/pks/lookup" + payload = { "op": "get", "search": pattern } + + if homedir.startswith("~"): + if os.path.exists(os.path.expanduser(homedir)) is True: + c.home_dir = os.path.expanduser(homedir) + else: + pass + elif os.path.exists(homedir) is True: + c.home_dir = homedir + else: + pass + + r = requests.get(url, verify=True, params=payload) + incoming = c.key_import(r.content) + + summary = """ + Total number of keys: {0} + Total number imported: {1} + Number of version 3 keys ignored: {2} + + Number of imported key objects or updates: {3} + Number of unchanged keys: {4} + Number of new signatures: {5} + Number of revoked keys: {6} + """.format(incoming.considered, len(incoming.imports), + incoming.skipped_v3_keys, incoming.imported, incoming.unchanged, + incoming.new_signatures, incoming.new_revocations) + + print(summary) + #+end_src + + The resulting output in that case, where the search pattern entered + was =@eff.org= was: + + #+begin_src shell + Total number of keys: 272 + Total number imported: 249 + Number of version 3 keys ignored: 23 + + Number of imported key objects or updates: 180 + Number of unchanged keys: 66 + Number of new signatures: 7 + Number of revoked keys: 0 + #+end_src + + The examples for this document in =lang/python/examples/howto/ now + include to variations of this; one for searching the SKS keyserver + pool and the other for importing from a local file. + + The example above and the corresponding executable script included + in the examples requires Kenneth Reitz's excellent [[http://docs.python-requests.org/en/master/][Requests module]]. + + * Basic Functions :PROPERTIES: :CUSTOM_ID: howto-the-basics :END: The most frequently called features of any cryptographic library will be the most fundamental tasks for encryption software. In this section we will look at how to programmatically encrypt data, decrypt it, sign it and verify signatures. ** Encryption :PROPERTIES: :CUSTOM_ID: howto-basic-encryption :END: Encrypting is very straight forward. In the first example below the message, =text=, is encrypted to a single recipient's key. In the second example the message will be encrypted to multiple recipients. *** Encrypting to one key :PROPERTIES: :CUSTOM_ID: howto-basic-encryption-single :END: Once the the Context is set the main issues with encrypting data is essentially reduced to key selection and the keyword arguments specified in the =gpg.Context().encrypt()= method. Those keyword arguments are: =recipients=, a list of keys encrypted to (covered in greater detail in the following section); =sign=, whether or not to sign the plaintext data, see subsequent sections on signing and verifying signatures below (defaults to =True=); =sink=, to write results or partial results to a secure sink instead of returning it (defaults to =None=); =passphrase=, only used when utilising symmetric encryption (defaults to =None=); =always_trust=, used to override the trust model settings for recipient keys (defaults to =False=); =add_encrypt_to=, utilises any preconfigured =encrypt-to= or =default-key= settings in the user's =gpg.conf= file (defaults to =False=); =prepare=, prepare for encryption (defaults to =False=); =expect_sign=, prepare for signing (defaults to =False=); =compress=, compresses the plaintext prior to encryption (defaults to =True=). #+begin_src python import gpg a_key = "0x12345678DEADBEEF" text = b"""Some text to test with. Since the text in this case must be bytes, it is most likely that the input form will be a separate file which is opened with "rb" as this is the simplest method of obtaining the correct data format. """ c = gpg.Context(armor=True) rkey = list(c.keylist(pattern=a_key, secret=False)) ciphertext, result, sign_result = c.encrypt(text, recipients=rkey, sign=False) with open("secret_plans.txt.asc", "wb") as afile: afile.write(ciphertext) #+end_src Though this is even more likely to be used like this; with the plaintext input read from a file, the recipient keys used for encryption regardless of key trust status and the encrypted output also encrypted to any preconfigured keys set in the =gpg.conf= file: #+begin_src python import gpg a_key = "0x12345678DEADBEEF" with open("secret_plans.txt", "rb") as afile: text = afile.read() c = gpg.Context(armor=True) rkey = list(c.keylist(pattern=a_key, secret=False)) ciphertext, result, sign_result = c.encrypt(text, recipients=rkey, sign=True, always_trust=True, add_encrypt_to=True) with open("secret_plans.txt.asc", "wb") as afile: afile.write(ciphertext) #+end_src If the =recipients= paramater is empty then the plaintext is encrypted symmetrically. If no =passphrase= is supplied as a parameter or via a callback registered with the =Context()= then an out-of-band prompt for the passphrase via pinentry will be invoked. *** Encrypting to multiple keys :PROPERTIES: :CUSTOM_ID: howto-basic-encryption-multiple :END: Encrypting to multiple keys essentially just expands upon the key selection process and the recipients from the previous examples. The following example encrypts a message (=text=) to everyone with an email address on the =gnupg.org= domain,[fn:3] but does /not/ encrypt to a default key or other key which is configured to normally encrypt to. #+begin_src python import gpg text = b"""Oh look, another test message. The same rules apply as with the previous example and more likely than not, the message will actually be drawn from reading the contents of a file or, maybe, from entering data at an input() prompt. Since the text in this case must be bytes, it is most likely that the input form will be a separate file which is opened with "rb" as this is the simplest method of obtaining the correct data format. """ c = gpg.Context(armor=True) rpattern = list(c.keylist(pattern="@gnupg.org", secret=False)) logrus = [] for i in range(len(rpattern)): if rpattern[i].can_encrypt == 1: logrus.append(rpattern[i]) ciphertext, result, sign_result = c.encrypt(text, recipients=logrus, sign=False, always_trust=True) with open("secret_plans.txt.asc", "wb") as afile: afile.write(ciphertext) #+end_src All it would take to change the above example to sign the message and also encrypt the message to any configured default keys would be to change the =c.encrypt= line to this: #+begin_src python ciphertext, result, sign_result = c.encrypt(text, recipients=logrus, always_trust=True, add_encrypt_to=True) #+end_src The only keyword arguments requiring modification are those for which the default values are changing. The default value of =sign= is =True=, the default of =always_trust= is =False=, the default of =add_encrypt_to= is =False=. If =always_trust= is not set to =True= and any of the recipient keys are not trusted (e.g. not signed or locally signed) then the encryption will raise an error. It is possible to mitigate this somewhat with something more like this: #+begin_src python import gpg with open("secret_plans.txt.asc", "rb") as afile: text = afile.read() c = gpg.Context(armor=True) rpattern = list(c.keylist(pattern="@gnupg.org", secret=False)) logrus = [] for i in range(len(rpattern)): if rpattern[i].can_encrypt == 1: logrus.append(rpattern[i]) try: ciphertext, result, sign_result = c.encrypt(text, recipients=logrus, add_encrypt_to=True) except gpg.errors.InvalidRecipients as e: for i in range(len(e.recipients)): for n in range(len(logrus)): if logrus[n].fpr == e.recipients[i].fpr: logrus.remove(logrus[n]) else: pass try: ciphertext, result, sign_result = c.encrypt(text, recipients=logrus, add_encrypt_to=True) with open("secret_plans.txt.asc", "wb") as afile: afile.write(ciphertext) except: pass #+end_src This will attempt to encrypt to all the keys searched for, then remove invalid recipients if it fails and try again. ** Decryption :PROPERTIES: :CUSTOM_ID: howto-basic-decryption :END: Decrypting something encrypted to a key in one's secret keyring is fairly straight forward. In this example code, however, preconfiguring either =gpg.Context()= or =gpg.core.Context()= as =c= is unnecessary because there is no need to modify the Context prior to conducting the decryption and since the Context is only used once, setting it to =c= simply adds lines for no gain. #+begin_src python import gpg ciphertext = input("Enter path and filename of encrypted file: ") newfile = input("Enter path and filename of file to save decrypted data to: ") with open(ciphertext, "rb") as cfile: try: plaintext, result, verify_result = gpg.Context().decrypt(cfile) except gpg.errors.GPGMEError as e: plaintext = None print(e) if plaintext is not None: with open(newfile, "wb") as nfile: nfile.write(plaintext) else: pass #+end_src The data available in =plaintext= in this example is the decrypted content as a byte object, the recipient key IDs and algorithms in =result= and the results of verifying any signatures of the data in =verify_result=. ** Signing text and files :PROPERTIES: :CUSTOM_ID: howto-basic-signing :END: The following sections demonstrate how to specify keys to sign with. *** Signing key selection :PROPERTIES: :CUSTOM_ID: howto-basic-signing-signers :END: By default GPGME and the Python bindings will use the default key configured for the user invoking the GPGME API. If there is no default key specified and there is more than one secret key available it may be necessary to specify the key or keys with which to sign messages and files. #+begin_src python import gpg logrus = input("Enter the email address or string to match signing keys to: ") hancock = gpg.Context().keylist(pattern=logrus, secret=True) sig_src = list(hancock) #+end_src The signing examples in the following sections include the explicitly designated =signers= parameter in two of the five examples; once where the resulting signature would be ASCII armoured and once where it would not be armoured. While it would be possible to enter a key ID or fingerprint here to match a specific key, it is not possible to enter two fingerprints and match two keys since the patten expects a string, bytes or None and not a list. A string with two fingerprints won't match any single key. *** Normal or default signing messages or files :PROPERTIES: :CUSTOM_ID: howto-basic-signing-normal :END: The normal or default signing process is essentially the same as is most often invoked when also encrypting a message or file. So when the encryption component is not utilised, the result is to produce an encoded and signed output which may or may not be ASCII armoured and which may or may not also be compressed. By default compression will be used unless GnuPG detects that the plaintext is already compressed. ASCII armouring will be determined according to the value of =gpg.Context().armor=. The compression algorithm is selected in much the same way as the symmetric encryption algorithm or the hash digest algorithm is when multiple keys are involved; from the preferences saved into the key itself or by comparison with the preferences with all other keys involved. #+begin_src python import gpg text0 = """Declaration of ... something. """ text = text0.encode() c = gpg.Context(armor=True, signers=sig_src) signed_data, result = c.sign(text, mode=gpg.constants.sig.mode.NORMAL) with open("/path/to/statement.txt.asc", "w") as afile: afile.write(signed_data.decode()) #+end_src Though everything in this example is accurate, it is more likely that reading the input data from another file and writing the result to a new file will be performed more like the way it is done in the next example. Even if the output format is ASCII armoured. #+begin_src python import gpg with open("/path/to/statement.txt", "rb") as tfile: text = tfile.read() c = gpg.Context() signed_data, result = c.sign(text, mode=gpg.constants.sig.mode.NORMAL) with open("/path/to/statement.txt.sig", "wb") as afile: afile.write(signed_data) #+end_src *** Detached signing messages and files :PROPERTIES: :CUSTOM_ID: howto-basic-signing-detached :END: Detached signatures will often be needed in programmatic uses of GPGME, either for signing files (e.g. tarballs of code releases) or as a component of message signing (e.g. PGP/MIME encoded email). #+begin_src python import gpg text0 = """Declaration of ... something. """ text = text0.encode() c = gpg.Context(armor=True) signed_data, result = c.sign(text, mode=gpg.constants.sig.mode.DETACH) with open("/path/to/statement.txt.asc", "w") as afile: afile.write(signed_data.decode()) #+end_src As with normal signatures, detached signatures are best handled as byte literals, even when the output is ASCII armoured. #+begin_src python import gpg with open("/path/to/statement.txt", "rb") as tfile: text = tfile.read() c = gpg.Context(signers=sig_src) signed_data, result = c.sign(text, mode=gpg.constants.sig.mode.DETACH) with open("/path/to/statement.txt.sig", "wb") as afile: afile.write(signed_data) #+end_src *** Clearsigning messages or text :PROPERTIES: :CUSTOM_ID: howto-basic-signing-clear :END: Though PGP/in-line messages are no longer encouraged in favour of PGP/MIME, there is still sometimes value in utilising in-line signatures. This is where clear-signed messages or text is of value. #+begin_src python import gpg text0 = """Declaration of ... something. """ text = text0.encode() c = gpg.Context() signed_data, result = c.sign(text, mode=gpg.constants.sig.mode.CLEAR) with open("/path/to/statement.txt.asc", "w") as afile: afile.write(signed_data.decode()) #+end_src In spite of the appearance of a clear-signed message, the data handled by GPGME in signing it must still be byte literals. #+begin_src python import gpg with open("/path/to/statement.txt", "rb") as tfile: text = tfile.read() c = gpg.Context() signed_data, result = c.sign(text, mode=gpg.constants.sig.mode.CLEAR) with open("/path/to/statement.txt.asc", "wb") as afile: afile.write(signed_data) #+end_src ** Signature verification :PROPERTIES: :CUSTOM_ID: howto-basic-verification :END: Essentially there are two principal methods of verification of a signature. The first of these is for use with the normal or default signing method and for clear-signed messages. The second is for use with files and data with detached signatures. The following example is intended for use with the default signing method where the file was not ASCII armoured: #+begin_src python import gpg import time filename = "statement.txt" gpg_file = "statement.txt.gpg" c = gpg.Context() try: data, result = c.verify(open(gpg_file)) verified = True except gpg.errors.BadSignatures as e: verified = False print(e) if verified is True: for i in range(len(result.signatures)): sign = result.signatures[i] print("""Good signature from: {0} with key {1} made at {2} """.format(c.get_key(sign.fpr).uids[0].uid, sign.fpr, time.ctime(sign.timestamp))) else: pass #+end_src Whereas this next example, which is almost identical would work with normal ASCII armoured files and with clear-signed files: #+begin_src python import gpg import time filename = "statement.txt" asc_file = "statement.txt.asc" c = gpg.Context() try: data, result = c.verify(open(asc_file)) verified = True except gpg.errors.BadSignatures as e: verified = False print(e) if verified is True: for i in range(len(result.signatures)): sign = result.signatures[i] print("""Good signature from: {0} with key {1} made at {2} """.format(c.get_key(sign.fpr).uids[0].uid, sign.fpr, time.ctime(sign.timestamp))) else: pass #+end_src In both of the previous examples it is also possible to compare the original data that was signed against the signed data in =data= to see if it matches with something like this: #+begin_src python with open(filename, "rb") as afile: text = afile.read() if text == data: print("Good signature.") else: pass #+end_src The following two examples, however, deal with detached signatures. With his method of verification the data that was signed does not get returned since it is already being explicitly referenced in the first argument of =c.verify=. So =data= is =None= and only the information in =result= is available. #+begin_src python import gpg import time filename = "statement.txt" sig_file = "statement.txt.sig" c = gpg.Context() try: data, result = c.verify(open(filename), open(sig_file)) verified = True except gpg.errors.BadSignatures as e: verified = False print(e) if verified is True: for i in range(len(result.signatures)): sign = result.signatures[i] print("""Good signature from: {0} with key {1} made at {2} """.format(c.get_key(sign.fpr).uids[0].uid, sign.fpr, time.ctime(sign.timestamp))) else: pass #+end_src #+begin_src python import gpg import time filename = "statement.txt" asc_file = "statement.txt.asc" c = gpg.Context() try: data, result = c.verify(open(filename), open(asc_file)) verified = True except gpg.errors.BadSignatures as e: verified = False print(e) if verified is True: for i in range(len(result.signatures)): sign = result.signatures[i] print("""Good signature from: {0} with key {1} made at {2} """.format(c.get_key(sign.fpr).uids[0].uid, sign.fpr, time.ctime(sign.timestamp))) else: pass #+end_src * Creating keys and subkeys :PROPERTIES: :CUSTOM_ID: key-generation :END: The one thing, aside from GnuPG itself, that GPGME depends on, of course, is the keys themselves. So it is necessary to be able to generate them and modify them by adding subkeys, revoking or disabling them, sometimes deleting them and doing the same for user IDs. In the following examples a key will be created for the world's greatest secret agent, Danger Mouse. Since Danger Mouse is a secret agent he needs to be able to protect information to =SECRET= level clearance, so his keys will be 3072-bit keys. The pre-configured =gpg.conf= file which sets cipher, digest and other preferences contains the following configuration parameters: #+begin_src conf expert allow-freeform-uid allow-secret-key-import trust-model tofu+pgp tofu-default-policy unknown enable-large-rsa enable-dsa2 cert-digest-algo SHA512 default-preference-list TWOFISH CAMELLIA256 AES256 CAMELLIA192 AES192 CAMELLIA128 AES BLOWFISH IDEA CAST5 3DES SHA512 SHA384 SHA256 SHA224 RIPEMD160 SHA1 ZLIB BZIP2 ZIP Uncompressed personal-cipher-preferences TWOFISH CAMELLIA256 AES256 CAMELLIA192 AES192 CAMELLIA128 AES BLOWFISH IDEA CAST5 3DES personal-digest-preferences SHA512 SHA384 SHA256 SHA224 RIPEMD160 SHA1 personal-compress-preferences ZLIB BZIP2 ZIP Uncompressed #+end_src ** Primary key :PROPERTIES: :CUSTOM_ID: keygen-primary :END: Generating a primary key uses the =create_key= method in a Context. It contains multiple arguments and keyword arguments, including: =userid=, =algorithm=, =expires_in=, =expires=, =sign=, =encrypt=, =certify=, =authenticate=, =passphrase= and =force=. The defaults for all of those except =userid=, =algorithm=, =expires_in=, =expires= and =passphrase= is =False=. The defaults for =algorithm= and =passphrase= is =None=. The default for =expires_in= is =0=. The default for =expires= is =True=. There is no default for =userid=. If =passphrase= is left as =None= then the key will not be generated with a passphrase, if =passphrase= is set to a string then that will be the passphrase and if =passphrase= is set to =True= then gpg-agent will launch pinentry to prompt for a passphrase. For the sake of convenience, these examples will keep =passphrase= set to =None=. #+begin_src python import gpg c = gpg.Context() c.home_dir = "~/.gnupg-dm" userid = "Danger Mouse " dmkey = c.create_key(userid, algorithm="rsa3072", expires_in=31536000, sign=True, certify=True) #+end_src One thing to note here is the use of setting the =c.home_dir= parameter. This enables generating the key or keys in a different location. In this case to keep the new key data created for this example in a separate location rather than adding it to existing and active key store data. As with the default directory, =~/.gnupg=, any temporary or separate directory needs the permissions set to only permit access by the directory owner. On posix systems this means setting the directory permissions to 700. The =temp-homedir-config.py= script in the HOWTO examples directory will create an alternative homedir with these configuration options already set and the correct directory and file permissions. The successful generation of the key can be confirmed via the returned =GenkeyResult= object, which includes the following data: #+begin_src python print(""" Fingerprint: {0} Primary Key: {1} Public Key: {2} Secret Key: {3} Sub Key: {4} User IDs: {5} """.format(dmkey.fpr, dmkey.primary, dmkey.pubkey, dmkey.seckey, dmkey.sub, dmkey.uid)) #+end_src Alternatively the information can be confirmed using the command line program: #+begin_src shell bash-4.4$ gpg --homedir ~/.gnupg-dm -K ~/.gnupg-dm/pubring.kbx ---------------------- sec rsa3072 2018-03-15 [SC] [expires: 2019-03-15] 177B7C25DB99745EE2EE13ED026D2F19E99E63AA uid [ultimate] Danger Mouse bash-4.4$ #+end_src As with generating keys manually, to preconfigure expanded preferences for the cipher, digest and compression algorithms, the =gpg.conf= file must contain those details in the home directory in which the new key is being generated. I used a cut down version of my own =gpg.conf= file in order to be able to generate this: #+begin_src shell bash-4.4$ gpg --homedir ~/.gnupg-dm --edit-key 177B7C25DB99745EE2EE13ED026D2F19E99E63AA showpref quit Secret key is available. sec rsa3072/026D2F19E99E63AA created: 2018-03-15 expires: 2019-03-15 usage: SC trust: ultimate validity: ultimate [ultimate] (1). Danger Mouse [ultimate] (1). Danger Mouse Cipher: TWOFISH, CAMELLIA256, AES256, CAMELLIA192, AES192, CAMELLIA128, AES, BLOWFISH, IDEA, CAST5, 3DES Digest: SHA512, SHA384, SHA256, SHA224, RIPEMD160, SHA1 Compression: ZLIB, BZIP2, ZIP, Uncompressed Features: MDC, Keyserver no-modify bash-4.4$ #+end_src ** Subkeys :PROPERTIES: :CUSTOM_ID: keygen-subkeys :END: Adding subkeys to a primary key is fairly similar to creating the primary key with the =create_subkey= method. Most of the arguments are the same, but not quite all. Instead of the =userid= argument there is now a =key= argument for selecting which primary key to add the subkey to. In the following example an encryption subkey will be added to the primary key. Since Danger Mouse is a security conscious secret agent, this subkey will only be valid for about six months, half the length of the primary key. #+begin_src python import gpg c = gpg.Context() c.home_dir = "~/.gnupg-dm" key = c.get_key(dmkey.fpr, secret=True) dmsub = c.create_subkey(key, algorithm="rsa3072", expires_in=15768000, encrypt=True) #+end_src As with the primary key, the results here can be checked with: #+begin_src python print(""" Fingerprint: {0} Primary Key: {1} Public Key: {2} Secret Key: {3} Sub Key: {4} User IDs: {5} """.format(dmsub.fpr, dmsub.primary, dmsub.pubkey, dmsub.seckey, dmsub.sub, dmsub.uid)) #+end_src As well as on the command line with: #+begin_src shell bash-4.4$ gpg --homedir ~/.gnupg-dm -K ~/.gnupg-dm/pubring.kbx ---------------------- sec rsa3072 2018-03-15 [SC] [expires: 2019-03-15] 177B7C25DB99745EE2EE13ED026D2F19E99E63AA uid [ultimate] Danger Mouse ssb rsa3072 2018-03-15 [E] [expires: 2018-09-13] bash-4.4$ #+end_src ** User IDs :PROPERTIES: :CUSTOM_ID: keygen-uids :END: *** Adding User IDs :PROPERTIES: :CUSTOM_ID: keygen-uids-add :END: By comparison to creating primary keys and subkeys, adding a new user ID to an existing key is much simpler. The method used to do this is =key_add_uid= and the only arguments it takes are for the =key= and the new =uid=. #+begin_src python import gpg c = gpg.Context() c.home_dir = "~/.gnupg-dm" dmfpr = "177B7C25DB99745EE2EE13ED026D2F19E99E63AA" key = c.get_key(dmfpr, secret=True) uid = "Danger Mouse " c.key_add_uid(key, uid) #+end_src Unsurprisingly the result of this is: #+begin_src shell bash-4.4$ gpg --homedir ~/.gnupg-dm -K ~/.gnupg-dm/pubring.kbx ---------------------- sec rsa3072 2018-03-15 [SC] [expires: 2019-03-15] 177B7C25DB99745EE2EE13ED026D2F19E99E63AA uid [ultimate] Danger Mouse uid [ultimate] Danger Mouse ssb rsa3072 2018-03-15 [E] [expires: 2018-09-13] bash-4.4$ #+end_src *** Revokinging User IDs :PROPERTIES: :CUSTOM_ID: keygen-uids-revoke :END: Revoking a user ID is a fairly similar process, except that it uses the =key_revoke_uid= method. #+begin_src python import gpg c = gpg.Context() c.home_dir = "~/.gnupg-dm" dmfpr = "177B7C25DB99745EE2EE13ED026D2F19E99E63AA" key = c.get_key(dmfpr, secret=True) uid = "Danger Mouse " c.key_revoke_uid(key, uid) #+end_src ** Key certification :PROPERTIES: :CUSTOM_ID: key-sign :END: Since key certification is more frequently referred to as key signing, the method used to perform this function is =key_sign=. The =key_sign= method takes four arguments: =key=, =uids=, =expires_in= and =local=. The default value of =uids= is =None= and which results in all user IDs being selected. The default value of both =expires_in= and =local= is =False=; which results in the signature never expiring and being able to be exported. The =key= is the key being signed rather than the key doing the signing. To change the key doing the signing refer to the signing key selection above for signing messages and files. If the =uids= value is not =None= then it must either be a string to match a single user ID or a list of strings to match multiple user IDs. In this case the matching of those strings must be precise and it is case sensitive. To sign Danger Mouse's key for just the initial user ID with a signature which will last a little over a month, do this: #+begin_src python import gpg c = gpg.Context() uid = "Danger Mouse " dmfpr = "177B7C25DB99745EE2EE13ED026D2F19E99E63AA" key = c.get_key(dmfpr, secret=True) c.key_sign(key, uids=uid, expires_in=2764800) #+end_src * Miscellaneous work-arounds :PROPERTIES: :CUSTOM_ID: cheats-and-hacks :END: ** Group lines :PROPERTIES: :CUSTOM_ID: group-lines :END: There is not yet an easy way to access groups configured in the gpg.conf file from within GPGME. As a consequence these central groupings of keys cannot be shared amongst multiple programs, such as MUAs readily. The following code, however, provides a work-around for obtaining this information in Python. #+begin_src python import subprocess lines = subprocess.getoutput("gpgconf --list-options gpg").splitlines() for i in range(len(lines)): if lines[i].startswith("group") is True: line = lines[i] else: pass groups = line.split(":")[-1].replace('"', '').split(',') group_lines = [] group_lists = [] for i in range(len(groups)): group_lines.append(groups[i].split("=")) group_lists.append(groups[i].split("=")) for i in range(len(group_lists)): group_lists[i][1] = group_lists[i][1].split() #+end_src The result of that code is that =group_lines= is a list of lists where =group_lines[i][0]= is the name of the group and =group_lines[i][1]= is the key IDs of the group as a string. The =group_lists= result is very similar in that it is a list of lists. The first part, =group_lists[i][0]= matches =group_lines[i][0]= as the name of the group, but =group_lists[i][1]= is the key IDs of the group as a string. A demonstration of using the =groups.py= module is also available in the form of the executable =mutt-groups.py= script. This second script reads all the group entries in a user's =gpg.conf= file and converts them into crypt-hooks suitable for use with the Mutt and Neomutt mail clients. * Copyright and Licensing :PROPERTIES: :CUSTOM_ID: copyright-and-license :END: ** Copyright (C) The GnuPG Project, 2018 :PROPERTIES: :CUSTOM_ID: copyright :END: Copyright © The GnuPG Project, 2018. ** License GPL compatible :PROPERTIES: :CUSTOM_ID: license :END: This file is free software; as a special exception the author gives unlimited permission to copy and/or distribute it, with or without modifications, as long as this notice is preserved. This file is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY, to the extent permitted by law; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. * Footnotes [fn:1] =Short_History.org= and/or =Short_History.html=. [fn:2] The =lang/python/docs/= directory in the GPGME source. [fn:3] You probably don't really want to do this. Searching the keyservers for "gnupg.org" produces over 400 results, the majority of which aren't actually at the gnupg.org domain, but just included a comment regarding the project in their key somewhere. diff --git a/lang/python/examples/howto/import-key-file.py b/lang/python/examples/howto/import-key-file.py new file mode 100755 index 00000000..26ec5242 --- /dev/null +++ b/lang/python/examples/howto/import-key-file.py @@ -0,0 +1,70 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from __future__ import absolute_import, division, unicode_literals + +# Copyright (C) 2018 Ben McGinnes +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free Software +# Foundation; either version 2 of the License, or (at your option) any later +# version. +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU Lesser General Public License as published by the Free +# Software Foundation; either version 2.1 of the License, or (at your option) +# any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License and the GNU +# Lesser General Public Licensefor more details. +# +# You should have received a copy of the GNU General Public License and the GNU +# Lesser General Public along with this program; if not, see +# . + +import gpg +import os.path + +print(""" +This script imports a key or keys matching a pattern from the SKS keyserver +pool. +""") + +c = gpg.Context() + +homedir = input("Enter the GPG configuration directory path (optional): ") +keyfile = input("Enter the path and filename to the file of key(s): ") + +if homedir.startswith("~"): + if os.path.exists(os.path.expanduser(homedir)) is True: + c.home_dir = os.path.expanduser(homedir) + else: + pass +elif os.path.exists(homedir) is True: + c.home_dir = homedir +else: + pass + +with open(keyfile, "rb") as f: + kdata = f.read() + +incoming = c.key_import(kdata) + +summary = """ +Total number of keys: {0} +Total number imported: {1} +Number of version 3 keys ignored: {2} + +Number of imported key objects or updates: {3} +Number of unchanged keys: {4} +Number of new signatures: {5} +Number of revoked keys: {6} +""".format(incoming.considered, len(incoming.imports), + incoming.skipped_v3_keys, incoming.imported, incoming.unchanged, + incoming.new_signatures, incoming.new_revocations) + +print(summary) + +# EOF diff --git a/lang/python/examples/howto/import-keys.py b/lang/python/examples/howto/import-keys.py new file mode 100755 index 00000000..455b8e0c --- /dev/null +++ b/lang/python/examples/howto/import-keys.py @@ -0,0 +1,78 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from __future__ import absolute_import, division, unicode_literals + +# Copyright (C) 2018 Ben McGinnes +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free Software +# Foundation; either version 2 of the License, or (at your option) any later +# version. +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU Lesser General Public License as published by the Free +# Software Foundation; either version 2.1 of the License, or (at your option) +# any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License and the GNU +# Lesser General Public Licensefor more details. +# +# You should have received a copy of the GNU General Public License and the GNU +# Lesser General Public along with this program; if not, see +# . + +import gpg +import os.path +import requests + +print(""" +This script imports a key or keys matching a pattern from the SKS keyserver +pool. + +Uses the requests module. +""") + +c = gpg.Context() + +homedir = input("Enter the GPG configuration directory path (optional): ") +pattern = input("The pattern to search for in key or user IDs: ") +url = "https://sks-keyservers.net/pks/lookup" +payload = { "op": "get", "search": pattern } +hexload = { "op": "get", "search": "0x{0}".format(pattern) } + +if homedir.startswith("~"): + if os.path.exists(os.path.expanduser(homedir)) is True: + c.home_dir = os.path.expanduser(homedir) + else: + pass +elif os.path.exists(homedir) is True: + c.home_dir = homedir +else: + pass + +resp = requests.get(url, verify=True, params=payload) +if resp.ok is False: + rhex = requests.get(url, verify=True, params=hexload) + incoming = c.key_import(rhex.content) +else: + incoming = c.key_import(resp.content) + +summary = """ +Total number of keys: {0} +Total number imported: {1} +Number of version 3 keys ignored: {2} + +Number of imported key objects or updates: {3} +Number of unchanged keys: {4} +Number of new signatures: {5} +Number of revoked keys: {6} +""".format(incoming.considered, len(incoming.imports), + incoming.skipped_v3_keys, incoming.imported, incoming.unchanged, + incoming.new_signatures, incoming.new_revocations) + +print(summary) + +# EOF diff --git a/lang/python/src/core.py b/lang/python/src/core.py index bd95d231..10db3d69 100644 --- a/lang/python/src/core.py +++ b/lang/python/src/core.py @@ -1,1516 +1,1538 @@ # Copyright (C) 2016-2017 g10 Code GmbH # Copyright (C) 2004,2008 Igor Belyi # Copyright (C) 2002 John Goerzen # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2.1 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA """Core functionality Core functionality of GPGME wrapped in a object-oriented fashion. Provides the 'Context' class for performing cryptographic operations, and the 'Data' class describing buffers of data. """ from __future__ import absolute_import, print_function, unicode_literals del absolute_import, print_function, unicode_literals import re import os import warnings import weakref from . import gpgme from .errors import errorcheck, GPGMEError from . import constants from . import errors from . import util class GpgmeWrapper(object): """Base wrapper class Not to be instantiated directly. """ def __init__(self, wrapped): self._callback_excinfo = None self.wrapped = wrapped def __repr__(self): return '<{}/{!r}>'.format(super(GpgmeWrapper, self).__repr__(), self.wrapped) def __str__(self): acc = ['{}.{}'.format(__name__, self.__class__.__name__)] flags = [f for f in self._boolean_properties if getattr(self, f)] if flags: acc.append('({})'.format(' '.join(flags))) return '<{}>'.format(' '.join(acc)) def __hash__(self): return hash(repr(self.wrapped)) def __eq__(self, other): if other == None: return False else: return repr(self.wrapped) == repr(other.wrapped) @property def _ctype(self): """The name of the c type wrapped by this class Must be set by child classes. """ raise NotImplementedError() @property def _cprefix(self): """The common prefix of c functions wrapped by this class Must be set by child classes. """ raise NotImplementedError() def _errorcheck(self, name): """Must be implemented by child classes. This function must return a trueish value for all c functions returning gpgme_error_t.""" raise NotImplementedError() """The set of all boolean properties""" _boolean_properties = set() def __wrap_boolean_property(self, key, do_set=False, value=None): get_func = getattr(gpgme, "{}get_{}".format(self._cprefix, key)) set_func = getattr(gpgme, "{}set_{}".format(self._cprefix, key)) def get(slf): return bool(get_func(slf.wrapped)) def set_(slf, value): set_func(slf.wrapped, bool(value)) p = property(get, set_, doc="{} flag".format(key)) setattr(self.__class__, key, p) if do_set: set_(self, bool(value)) else: return get(self) _munge_docstring = re.compile(r'gpgme_([^(]*)\(([^,]*), (.*\) -> .*)') def __getattr__(self, key): """On-the-fly generation of wrapper methods and properties""" if key[0] == '_' or self._cprefix == None: return None if key in self._boolean_properties: return self.__wrap_boolean_property(key) name = self._cprefix + key func = getattr(gpgme, name) if self._errorcheck(name): def _funcwrap(slf, *args): result = func(slf.wrapped, *args) if slf._callback_excinfo: gpgme.gpg_raise_callback_exception(slf) return errorcheck(result, name) else: def _funcwrap(slf, *args): result = func(slf.wrapped, *args) if slf._callback_excinfo: gpgme.gpg_raise_callback_exception(slf) return result doc = self._munge_docstring.sub(r'\2.\1(\3', getattr(func, "__doc__")) _funcwrap.__doc__ = doc # Monkey-patch the class. setattr(self.__class__, key, _funcwrap) # Bind the method to 'self'. def wrapper(*args): return _funcwrap(self, *args) wrapper.__doc__ = doc return wrapper def __setattr__(self, key, value): """On-the-fly generation of properties""" if key in self._boolean_properties: self.__wrap_boolean_property(key, True, value) else: super(GpgmeWrapper, self).__setattr__(key, value) class Context(GpgmeWrapper): """Context for cryptographic operations All cryptographic operations in GPGME are performed within a context, which contains the internal state of the operation as well as configuration parameters. By using several contexts you can run several cryptographic operations in parallel, with different configuration. Access to a context must be synchronized. """ def __init__(self, armor=False, textmode=False, offline=False, signers=[], pinentry_mode=constants.PINENTRY_MODE_DEFAULT, protocol=constants.PROTOCOL_OpenPGP, wrapped=None, home_dir=None): """Construct a context object Keyword arguments: armor -- enable ASCII armoring (default False) textmode -- enable canonical text mode (default False) offline -- do not contact external key sources (default False) signers -- list of keys used for signing (default []) pinentry_mode -- pinentry mode (default PINENTRY_MODE_DEFAULT) protocol -- protocol to use (default PROTOCOL_OpenPGP) home_dir -- state directory (default is the engine default) """ if wrapped: self.own = False else: tmp = gpgme.new_gpgme_ctx_t_p() errorcheck(gpgme.gpgme_new(tmp)) wrapped = gpgme.gpgme_ctx_t_p_value(tmp) gpgme.delete_gpgme_ctx_t_p(tmp) self.own = True super(Context, self).__init__(wrapped) self.armor = armor self.textmode = textmode self.offline = offline self.signers = signers self.pinentry_mode = pinentry_mode self.protocol = protocol self.home_dir = home_dir def __read__(self, sink, data): """Read helper Helper function to retrieve the results of an operation, or None if SINK is given. """ if sink or data == None: return None data.seek(0, os.SEEK_SET) return data.read() def __repr__(self): return ( "Context(armor={0.armor}, " "textmode={0.textmode}, offline={0.offline}, " "signers={0.signers}, pinentry_mode={0.pinentry_mode}, " "protocol={0.protocol}, home_dir={0.home_dir}" ")").format(self) def encrypt(self, plaintext, recipients=[], sign=True, sink=None, passphrase=None, always_trust=False, add_encrypt_to=False, prepare=False, expect_sign=False, compress=True): """Encrypt data Encrypt the given plaintext for the given recipients. If the list of recipients is empty, the data is encrypted symmetrically with a passphrase. The passphrase can be given as parameter, using a callback registered at the context, or out-of-band via pinentry. Keyword arguments: recipients -- list of keys to encrypt to sign -- sign plaintext (default True) sink -- write result to sink instead of returning it passphrase -- for symmetric encryption always_trust -- always trust the keys (default False) add_encrypt_to -- encrypt to configured additional keys (default False) prepare -- (ui) prepare for encryption (default False) expect_sign -- (ui) prepare for signing (default False) compress -- compress plaintext (default True) Returns: ciphertext -- the encrypted data (or None if sink is given) result -- additional information about the encryption sign_result -- additional information about the signature(s) Raises: InvalidRecipients -- if encryption using a particular key failed InvalidSigners -- if signing using a particular key failed GPGMEError -- as signaled by the underlying library """ ciphertext = sink if sink else Data() flags = 0 flags |= always_trust * constants.ENCRYPT_ALWAYS_TRUST flags |= (not add_encrypt_to) * constants.ENCRYPT_NO_ENCRYPT_TO flags |= prepare * constants.ENCRYPT_PREPARE flags |= expect_sign * constants.ENCRYPT_EXPECT_SIGN flags |= (not compress) * constants.ENCRYPT_NO_COMPRESS if passphrase != None: old_pinentry_mode = self.pinentry_mode old_passphrase_cb = getattr(self, '_passphrase_cb', None) self.pinentry_mode = constants.PINENTRY_MODE_LOOPBACK def passphrase_cb(hint, desc, prev_bad, hook=None): return passphrase self.set_passphrase_cb(passphrase_cb) try: if sign: self.op_encrypt_sign(recipients, flags, plaintext, ciphertext) else: self.op_encrypt(recipients, flags, plaintext, ciphertext) except errors.GPGMEError as e: result = self.op_encrypt_result() sig_result = self.op_sign_result() if sign else None results = (self.__read__(sink, ciphertext), result, sig_result) if e.getcode() == errors.UNUSABLE_PUBKEY: if result.invalid_recipients: raise errors.InvalidRecipients(result.invalid_recipients, error=e.error, results=results) if e.getcode() == errors.UNUSABLE_SECKEY: sig_result = self.op_sign_result() if sig_result.invalid_signers: raise errors.InvalidSigners(sig_result.invalid_signers, error=e.error, results=results) # Otherwise, just raise the error, but attach the results # first. e.results = results raise e finally: if passphrase != None: self.pinentry_mode = old_pinentry_mode if old_passphrase_cb: self.set_passphrase_cb(*old_passphrase_cb[1:]) result = self.op_encrypt_result() assert not result.invalid_recipients sig_result = self.op_sign_result() if sign else None assert not sig_result or not sig_result.invalid_signers return self.__read__(sink, ciphertext), result, sig_result def decrypt(self, ciphertext, sink=None, passphrase=None, verify=True): """Decrypt data Decrypt the given ciphertext and verify any signatures. If VERIFY is an iterable of keys, the ciphertext must be signed by all those keys, otherwise an error is raised. If the ciphertext is symmetrically encrypted using a passphrase, that passphrase can be given as parameter, using a callback registered at the context, or out-of-band via pinentry. Keyword arguments: sink -- write result to sink instead of returning it passphrase -- for symmetric decryption verify -- check signatures (default True) Returns: plaintext -- the decrypted data (or None if sink is given) result -- additional information about the decryption verify_result -- additional information about the signature(s) Raises: UnsupportedAlgorithm -- if an unsupported algorithm was used BadSignatures -- if a bad signature is encountered MissingSignatures -- if expected signatures are missing or bad GPGMEError -- as signaled by the underlying library """ plaintext = sink if sink else Data() if passphrase != None: old_pinentry_mode = self.pinentry_mode old_passphrase_cb = getattr(self, '_passphrase_cb', None) self.pinentry_mode = constants.PINENTRY_MODE_LOOPBACK def passphrase_cb(hint, desc, prev_bad, hook=None): return passphrase self.set_passphrase_cb(passphrase_cb) try: if verify: self.op_decrypt_verify(ciphertext, plaintext) else: self.op_decrypt(ciphertext, plaintext) except errors.GPGMEError as e: result = self.op_decrypt_result() verify_result = self.op_verify_result() if verify else None # Just raise the error, but attach the results first. e.results = (self.__read__(sink, plaintext), result, verify_result) raise e finally: if passphrase != None: self.pinentry_mode = old_pinentry_mode if old_passphrase_cb: self.set_passphrase_cb(*old_passphrase_cb[1:]) result = self.op_decrypt_result() verify_result = self.op_verify_result() if verify else None results = (self.__read__(sink, plaintext), result, verify_result) if result.unsupported_algorithm: raise errors.UnsupportedAlgorithm(result.unsupported_algorithm, results=results) if verify: if any(s.status != errors.NO_ERROR for s in verify_result.signatures): raise errors.BadSignatures(verify_result, results=results) if verify and verify != True: missing = list() for key in verify: ok = False for subkey in key.subkeys: for sig in verify_result.signatures: if sig.summary & constants.SIGSUM_VALID == 0: continue if subkey.can_sign and subkey.fpr == sig.fpr: ok = True break if ok: break if not ok: missing.append(key) if missing: raise errors.MissingSignatures(verify_result, missing, results=results) return results def sign(self, data, sink=None, mode=constants.SIG_MODE_NORMAL): """Sign data Sign the given data with either the configured default local key, or the 'signers' keys of this context. Keyword arguments: mode -- signature mode (default: normal, see below) sink -- write result to sink instead of returning it Returns: either signed_data -- encoded data and signature (normal mode) signature -- only the signature data (detached mode) cleartext -- data and signature as text (cleartext mode) (or None if sink is given) result -- additional information about the signature(s) Raises: InvalidSigners -- if signing using a particular key failed GPGMEError -- as signaled by the underlying library """ signeddata = sink if sink else Data() try: self.op_sign(data, signeddata, mode) except errors.GPGMEError as e: results = (self.__read__(sink, signeddata), self.op_sign_result()) if e.getcode() == errors.UNUSABLE_SECKEY: if results[1].invalid_signers: raise errors.InvalidSigners(results[1].invalid_signers, error=e.error, results=results) e.results = results raise e result = self.op_sign_result() assert not result.invalid_signers return self.__read__(sink, signeddata), result def verify(self, signed_data, signature=None, sink=None, verify=[]): """Verify signatures Verify signatures over data. If VERIFY is an iterable of keys, the ciphertext must be signed by all those keys, otherwise an error is raised. Keyword arguments: signature -- detached signature data sink -- write result to sink instead of returning it Returns: data -- the plain data (or None if sink is given, or we verified a detached signature) result -- additional information about the signature(s) Raises: BadSignatures -- if a bad signature is encountered MissingSignatures -- if expected signatures are missing or bad GPGMEError -- as signaled by the underlying library """ if signature: # Detached signature, we don't return the plain text. data = None else: data = sink if sink else Data() try: if signature: self.op_verify(signature, signed_data, None) else: self.op_verify(signed_data, None, data) except errors.GPGMEError as e: # Just raise the error, but attach the results first. e.results = (self.__read__(sink, data), self.op_verify_result()) raise e results = (self.__read__(sink, data), self.op_verify_result()) if any(s.status != errors.NO_ERROR for s in results[1].signatures): raise errors.BadSignatures(results[1], results=results) missing = list() for key in verify: ok = False for subkey in key.subkeys: for sig in results[1].signatures: if sig.summary & constants.SIGSUM_VALID == 0: continue if subkey.can_sign and subkey.fpr == sig.fpr: ok = True break if ok: break if not ok: missing.append(key) if missing: raise errors.MissingSignatures(results[1], missing, results=results) return results + def key_import(self, keydata): + """Importing keys + + Arguments: + keydata -- Binary or ASCII armored key(s) to be imported + + Returns + -- an object describing the results of keys imported or + updated + + Raises: + GPGMEError -- as signaled by the underlying library + """ + if keydata is not None: + try: + self.op_import(keydata) + result = self.op_import_result() + except GPGMEError as e: + result = e + else: + result = "No keys found." + def keylist(self, pattern=None, secret=False, mode=constants.keylist.mode.LOCAL, source=None): """List keys Keyword arguments: pattern -- return keys matching pattern (default: all keys) secret -- return only secret keys (default: False) mode -- keylist mode (default: list local keys) source -- read keys from source instead from the keyring (all other options are ignored in this case) Returns: -- an iterator returning key objects Raises: GPGMEError -- as signaled by the underlying library """ if not source: self.set_keylist_mode(mode) self.op_keylist_start(pattern, secret) else: # Automatic wrapping of SOURCE is not possible here, # because the object must not be deallocated until the # iteration over the results ends. if not isinstance(source, Data): source = Data(file=source) self.op_keylist_from_data_start(source, 0) key = self.op_keylist_next() while key: yield key key = self.op_keylist_next() self.op_keylist_end() def create_key(self, userid, algorithm=None, expires_in=0, expires=True, sign=False, encrypt=False, certify=False, authenticate=False, passphrase=None, force=False): """Create a primary key Create a primary key for the user id USERID. ALGORITHM may be used to specify the public key encryption algorithm for the new key. By default, a reasonable default is chosen. You may use "future-default" to select an algorithm that will be the default in a future implementation of the engine. ALGORITHM may be a string like "rsa", or "rsa2048" to explicitly request an algorithm and a key size. EXPIRES_IN specifies the expiration time of the key in number of seconds since the keys creation. By default, a reasonable expiration time is chosen. If you want to create a key that does not expire, use the keyword argument EXPIRES. SIGN, ENCRYPT, CERTIFY, and AUTHENTICATE can be used to request the capabilities of the new key. If you don't request any, a reasonable set of capabilities is selected, and in case of OpenPGP, a subkey with a reasonable set of capabilities is created. If PASSPHRASE is None (the default), then the key will not be protected with a passphrase. If PASSPHRASE is a string, it will be used to protect the key. If PASSPHRASE is True, the passphrase must be supplied using a passphrase callback or out-of-band with a pinentry. Keyword arguments: algorithm -- public key algorithm, see above (default: reasonable) expires_in -- expiration time in seconds (default: reasonable) expires -- whether or not the key should expire (default: True) sign -- request the signing capability (see above) encrypt -- request the encryption capability (see above) certify -- request the certification capability (see above) authenticate -- request the authentication capability (see above) passphrase -- protect the key with a passphrase (default: no passphrase) force -- force key creation even if a key with the same userid exists (default: False) Returns: -- an object describing the result of the key creation Raises: GPGMEError -- as signaled by the underlying library """ if util.is_a_string(passphrase): old_pinentry_mode = self.pinentry_mode old_passphrase_cb = getattr(self, '_passphrase_cb', None) self.pinentry_mode = constants.PINENTRY_MODE_LOOPBACK def passphrase_cb(hint, desc, prev_bad, hook=None): return passphrase self.set_passphrase_cb(passphrase_cb) try: self.op_createkey(userid, algorithm, 0, # reserved expires_in, None, # extrakey ((constants.create.SIGN if sign else 0) | (constants.create.ENCR if encrypt else 0) | (constants.create.CERT if certify else 0) | (constants.create.AUTH if authenticate else 0) | (constants.create.NOPASSWD if passphrase == None else 0) | (0 if expires else constants.create.NOEXPIRE) | (constants.create.FORCE if force else 0))) finally: if util.is_a_string(passphrase): self.pinentry_mode = old_pinentry_mode if old_passphrase_cb: self.set_passphrase_cb(*old_passphrase_cb[1:]) return self.op_genkey_result() def create_subkey(self, key, algorithm=None, expires_in=0, expires=True, sign=False, encrypt=False, authenticate=False, passphrase=None): """Create a subkey Create a subkey for the given KEY. As subkeys are a concept of OpenPGP, calling this is only valid for the OpenPGP protocol. ALGORITHM may be used to specify the public key encryption algorithm for the new subkey. By default, a reasonable default is chosen. You may use "future-default" to select an algorithm that will be the default in a future implementation of the engine. ALGORITHM may be a string like "rsa", or "rsa2048" to explicitly request an algorithm and a key size. EXPIRES_IN specifies the expiration time of the subkey in number of seconds since the subkeys creation. By default, a reasonable expiration time is chosen. If you want to create a subkey that does not expire, use the keyword argument EXPIRES. SIGN, ENCRYPT, and AUTHENTICATE can be used to request the capabilities of the new subkey. If you don't request any, an encryption subkey is generated. If PASSPHRASE is None (the default), then the subkey will not be protected with a passphrase. If PASSPHRASE is a string, it will be used to protect the subkey. If PASSPHRASE is True, the passphrase must be supplied using a passphrase callback or out-of-band with a pinentry. Keyword arguments: algorithm -- public key algorithm, see above (default: reasonable) expires_in -- expiration time in seconds (default: reasonable) expires -- whether or not the subkey should expire (default: True) sign -- request the signing capability (see above) encrypt -- request the encryption capability (see above) authenticate -- request the authentication capability (see above) passphrase -- protect the subkey with a passphrase (default: no passphrase) Returns: -- an object describing the result of the subkey creation Raises: GPGMEError -- as signaled by the underlying library """ if util.is_a_string(passphrase): old_pinentry_mode = self.pinentry_mode old_passphrase_cb = getattr(self, '_passphrase_cb', None) self.pinentry_mode = constants.PINENTRY_MODE_LOOPBACK def passphrase_cb(hint, desc, prev_bad, hook=None): return passphrase self.set_passphrase_cb(passphrase_cb) try: self.op_createsubkey(key, algorithm, 0, # reserved expires_in, ((constants.create.SIGN if sign else 0) | (constants.create.ENCR if encrypt else 0) | (constants.create.AUTH if authenticate else 0) | (constants.create.NOPASSWD if passphrase == None else 0) | (0 if expires else constants.create.NOEXPIRE))) finally: if util.is_a_string(passphrase): self.pinentry_mode = old_pinentry_mode if old_passphrase_cb: self.set_passphrase_cb(*old_passphrase_cb[1:]) return self.op_genkey_result() def key_add_uid(self, key, uid): """Add a UID Add the uid UID to the given KEY. Calling this function is only valid for the OpenPGP protocol. Raises: GPGMEError -- as signaled by the underlying library """ self.op_adduid(key, uid, 0) def key_revoke_uid(self, key, uid): """Revoke a UID Revoke the uid UID from the given KEY. Calling this function is only valid for the OpenPGP protocol. Raises: GPGMEError -- as signaled by the underlying library """ self.op_revuid(key, uid, 0) def key_sign(self, key, uids=None, expires_in=False, local=False): """Sign a key Sign a key with the current set of signing keys. Calling this function is only valid for the OpenPGP protocol. If UIDS is None (the default), then all UIDs are signed. If it is a string, then only the matching UID is signed. If it is a list of strings, then all matching UIDs are signed. Note that a case-sensitive exact string comparison is done. EXPIRES_IN specifies the expiration time of the signature in seconds. If EXPIRES_IN is False, the signature does not expire. Keyword arguments: uids -- user ids to sign, see above (default: sign all) expires_in -- validity period of the signature in seconds (default: do not expire) local -- create a local, non-exportable signature (default: False) Raises: GPGMEError -- as signaled by the underlying library """ flags = 0 if uids == None or util.is_a_string(uids): pass#through unchanged else: flags |= constants.keysign.LFSEP uids = "\n".join(uids) if not expires_in: flags |= constants.keysign.NOEXPIRE if local: flags |= constants.keysign.LOCAL self.op_keysign(key, uids, expires_in, flags) def key_tofu_policy(self, key, policy): """Set a keys' TOFU policy Set the TOFU policy associated with KEY to POLICY. Calling this function is only valid for the OpenPGP protocol. Raises: GPGMEError -- as signaled by the underlying library """ self.op_tofu_policy(key, policy) def assuan_transact(self, command, data_cb=None, inquire_cb=None, status_cb=None): """Issue a raw assuan command This function can be used to issue a raw assuan command to the engine. If command is a string or bytes, it will be used as-is. If it is an iterable of strings, it will be properly escaped and joined into an well-formed assuan command. Keyword arguments: data_cb -- a callback receiving data lines inquire_cb -- a callback providing more information status_cb -- a callback receiving status lines Returns: result -- the result of command as GPGMEError Raises: GPGMEError -- as signaled by the underlying library """ if util.is_a_string(command) or isinstance(command, bytes): cmd = command else: cmd = " ".join(util.percent_escape(f) for f in command) errptr = gpgme.new_gpgme_error_t_p() err = gpgme.gpgme_op_assuan_transact_ext( self.wrapped, cmd, (weakref.ref(self), data_cb) if data_cb else None, (weakref.ref(self), inquire_cb) if inquire_cb else None, (weakref.ref(self), status_cb) if status_cb else None, errptr) if self._callback_excinfo: gpgme.gpg_raise_callback_exception(self) errorcheck(err) status = gpgme.gpgme_error_t_p_value(errptr) gpgme.delete_gpgme_error_t_p(errptr) return GPGMEError(status) if status != 0 else None def interact(self, key, func, sink=None, flags=0, fnc_value=None): """Interact with the engine This method can be used to edit keys and cards interactively. KEY is the key to edit, FUNC is called repeatedly with two unicode arguments, 'keyword' and 'args'. See the GPGME manual for details. Keyword arguments: sink -- if given, additional output is written here flags -- use constants.INTERACT_CARD to edit a card Raises: GPGMEError -- as signaled by the underlying library """ if key == None: raise ValueError("First argument cannot be None") if sink == None: sink = Data() if fnc_value: opaquedata = (weakref.ref(self), func, fnc_value) else: opaquedata = (weakref.ref(self), func) result = gpgme.gpgme_op_interact(self.wrapped, key, flags, opaquedata, sink) if self._callback_excinfo: gpgme.gpg_raise_callback_exception(self) errorcheck(result) @property def signers(self): """Keys used for signing""" return [self.signers_enum(i) for i in range(self.signers_count())] @signers.setter def signers(self, signers): old = self.signers self.signers_clear() try: for key in signers: self.signers_add(key) except: self.signers = old raise @property def pinentry_mode(self): """Pinentry mode""" return self.get_pinentry_mode() @pinentry_mode.setter def pinentry_mode(self, value): self.set_pinentry_mode(value) @property def protocol(self): """Protocol to use""" return self.get_protocol() @protocol.setter def protocol(self, value): errorcheck(gpgme.gpgme_engine_check_version(value)) self.set_protocol(value) @property def home_dir(self): """Engine's home directory""" return self.engine_info.home_dir @home_dir.setter def home_dir(self, value): self.set_engine_info(self.protocol, home_dir=value) _ctype = 'gpgme_ctx_t' _cprefix = 'gpgme_' def _errorcheck(self, name): """This function should list all functions returning gpgme_error_t""" # The list of functions is created using: # # $ grep '^gpgme_error_t ' obj/lang/python/python3.5-gpg/gpgme.h \ # | grep -v _op_ | awk "/\(gpgme_ctx/ { printf (\"'%s',\\n\", \$2) } " return ((name.startswith('gpgme_op_') and not name.endswith('_result')) or name in { 'gpgme_new', 'gpgme_set_ctx_flag', 'gpgme_set_protocol', 'gpgme_set_sub_protocol', 'gpgme_set_keylist_mode', 'gpgme_set_pinentry_mode', 'gpgme_set_locale', 'gpgme_ctx_set_engine_info', 'gpgme_signers_add', 'gpgme_sig_notation_add', 'gpgme_set_sender', 'gpgme_cancel', 'gpgme_cancel_async', 'gpgme_get_key', }) _boolean_properties = {'armor', 'textmode', 'offline'} def __del__(self): if not gpgme: # At interpreter shutdown, gpgme is set to NONE. return self._free_passcb() self._free_progresscb() self._free_statuscb() if self.own and self.wrapped and gpgme.gpgme_release: gpgme.gpgme_release(self.wrapped) self.wrapped = None # Implement the context manager protocol. def __enter__(self): return self def __exit__(self, type, value, tb): self.__del__() def op_keylist_all(self, *args, **kwargs): self.op_keylist_start(*args, **kwargs) key = self.op_keylist_next() while key: yield key key = self.op_keylist_next() self.op_keylist_end() def op_keylist_next(self): """Returns the next key in the list created by a call to op_keylist_start(). The object returned is of type Key.""" ptr = gpgme.new_gpgme_key_t_p() try: errorcheck(gpgme.gpgme_op_keylist_next(self.wrapped, ptr)) key = gpgme.gpgme_key_t_p_value(ptr) except errors.GPGMEError as excp: key = None if excp.getcode() != errors.EOF: raise excp gpgme.delete_gpgme_key_t_p(ptr) if key: key.__del__ = lambda self: gpgme.gpgme_key_unref(self) return key def get_key(self, fpr, secret=False): """Get a key given a fingerprint Keyword arguments: secret -- to request a secret key Returns: -- the matching key Raises: KeyError -- if the key was not found GPGMEError -- as signaled by the underlying library """ ptr = gpgme.new_gpgme_key_t_p() try: errorcheck(gpgme.gpgme_get_key(self.wrapped, fpr, ptr, secret)) except errors.GPGMEError as e: if e.getcode() == errors.EOF: raise errors.KeyNotFound(fpr) raise e key = gpgme.gpgme_key_t_p_value(ptr) gpgme.delete_gpgme_key_t_p(ptr) assert key key.__del__ = lambda self: gpgme.gpgme_key_unref(self) return key def op_trustlist_all(self, *args, **kwargs): self.op_trustlist_start(*args, **kwargs) trust = self.op_trustlist_next() while trust: yield trust trust = self.op_trustlist_next() self.op_trustlist_end() def op_trustlist_next(self): """Returns the next trust item in the list created by a call to op_trustlist_start(). The object returned is of type TrustItem.""" ptr = gpgme.new_gpgme_trust_item_t_p() try: errorcheck(gpgme.gpgme_op_trustlist_next(self.wrapped, ptr)) trust = gpgme.gpgme_trust_item_t_p_value(ptr) except errors.GPGMEError as excp: trust = None if excp.getcode() != errors.EOF: raise gpgme.delete_gpgme_trust_item_t_p(ptr) return trust def set_passphrase_cb(self, func, hook=None): """Sets the passphrase callback to the function specified by func. When the system needs a passphrase, it will call func with three args: hint, a string describing the key it needs the passphrase for; desc, a string describing the passphrase it needs; prev_bad, a boolean equal True if this is a call made after unsuccessful previous attempt. If hook has a value other than None it will be passed into the func as a forth argument. Please see the GPGME manual for more information. """ if func == None: hookdata = None else: if hook == None: hookdata = (weakref.ref(self), func) else: hookdata = (weakref.ref(self), func, hook) gpgme.gpg_set_passphrase_cb(self, hookdata) def _free_passcb(self): if gpgme.gpg_set_passphrase_cb: self.set_passphrase_cb(None) def set_progress_cb(self, func, hook=None): """Sets the progress meter callback to the function specified by FUNC. If FUNC is None, the callback will be cleared. This function will be called to provide an interactive update of the system's progress. The function will be called with three arguments, type, total, and current. If HOOK is not None, it will be supplied as fourth argument. Please see the GPGME manual for more information. """ if func == None: hookdata = None else: if hook == None: hookdata = (weakref.ref(self), func) else: hookdata = (weakref.ref(self), func, hook) gpgme.gpg_set_progress_cb(self, hookdata) def _free_progresscb(self): if gpgme.gpg_set_progress_cb: self.set_progress_cb(None) def set_status_cb(self, func, hook=None): """Sets the status callback to the function specified by FUNC. If FUNC is None, the callback will be cleared. The function will be called with two arguments, keyword and args. If HOOK is not None, it will be supplied as third argument. Please see the GPGME manual for more information. """ if func == None: hookdata = None else: if hook == None: hookdata = (weakref.ref(self), func) else: hookdata = (weakref.ref(self), func, hook) gpgme.gpg_set_status_cb(self, hookdata) def _free_statuscb(self): if gpgme.gpg_set_status_cb: self.set_status_cb(None) @property def engine_info(self): """Configuration of the engine currently in use""" p = self.protocol infos = [i for i in self.get_engine_info() if i.protocol == p] assert len(infos) == 1 return infos[0] def get_engine_info(self): """Get engine configuration Returns information about all configured and installed engines. Returns: infos -- a list of engine infos """ return gpgme.gpgme_ctx_get_engine_info(self.wrapped) def set_engine_info(self, proto, file_name=None, home_dir=None): """Change engine configuration Changes the configuration of the crypto engine implementing the protocol 'proto' for the context. Keyword arguments: file_name -- engine program file name (unchanged if None) home_dir -- configuration directory (unchanged if None) """ self.ctx_set_engine_info(proto, file_name, home_dir) def wait(self, hang): """Wait for asynchronous call to finish. Wait forever if hang is True. Raises an exception on errors. Please read the GPGME manual for more information. """ ptr = gpgme.new_gpgme_error_t_p() gpgme.gpgme_wait(self.wrapped, ptr, hang) status = gpgme.gpgme_error_t_p_value(ptr) gpgme.delete_gpgme_error_t_p(ptr) errorcheck(status) def op_edit(self, key, func, fnc_value, out): """Start key editing using supplied callback function Note: This interface is deprecated and will be removed with GPGME 1.8. Please use .interact instead. Furthermore, we implement this using gpgme_op_interact, so callbacks will get called with string keywords instead of numeric status messages. Code that is using constants.STATUS_X or constants.status.X will continue to work, whereas code using magic numbers will break as a result. """ warnings.warn("Call to deprecated method op_edit.", category=DeprecationWarning) return self.interact(key, func, sink=out, fnc_value=fnc_value) class Data(GpgmeWrapper): """Data buffer A lot of data has to be exchanged between the user and the crypto engine, like plaintext messages, ciphertext, signatures and information about the keys. The technical details about exchanging the data information are completely abstracted by GPGME. The user provides and receives the data via `gpgme_data_t' objects, regardless of the communication protocol between GPGME and the crypto engine in use. This Data class is the implementation of the GpgmeData objects. Please see the information about __init__ for instantiation. """ _ctype = 'gpgme_data_t' _cprefix = 'gpgme_data_' def _errorcheck(self, name): """This function should list all functions returning gpgme_error_t""" # This list is compiled using # # $ grep -v '^gpgme_error_t ' obj/lang/python/python3.5-gpg/gpgme.h \ # | awk "/\(gpgme_data_t/ { printf (\"'%s',\\n\", \$2) } " | sed "s/'\\*/'/" return name not in { 'gpgme_data_read', 'gpgme_data_write', 'gpgme_data_seek', 'gpgme_data_release', 'gpgme_data_release_and_get_mem', 'gpgme_data_get_encoding', 'gpgme_data_get_file_name', 'gpgme_data_identify', } def __init__(self, string=None, file=None, offset=None, length=None, cbs=None, copy=True): """Initialize a new gpgme_data_t object. If no args are specified, make it an empty object. If string alone is specified, initialize it with the data contained there. If file, offset, and length are all specified, file must be either a filename or a file-like object, and the object will be initialized by reading the specified chunk from the file. If cbs is specified, it MUST be a tuple of the form: (read_cb, write_cb, seek_cb, release_cb[, hook]) where the first four items are functions implementing reading, writing, seeking the data, and releasing any resources once the data object is deallocated. The functions must match the following prototypes: def read(amount, hook=None): return def write(data, hook=None): return def seek(offset, whence, hook=None): return def release(hook=None): The functions may be bound methods. In that case, you can simply use the 'self' reference instead of using a hook. If file is specified without any other arguments, then it must be a filename, and the object will be initialized from that file. """ super(Data, self).__init__(None) self.data_cbs = None if cbs != None: self.new_from_cbs(*cbs) elif string != None: self.new_from_mem(string, copy) elif file != None and offset != None and length != None: self.new_from_filepart(file, offset, length) elif file != None: if util.is_a_string(file): self.new_from_file(file, copy) else: self.new_from_fd(file) else: self.new() def __del__(self): if not gpgme: # At interpreter shutdown, gpgme is set to NONE. return if self.wrapped != None and gpgme.gpgme_data_release: gpgme.gpgme_data_release(self.wrapped) if self._callback_excinfo: gpgme.gpg_raise_callback_exception(self) self.wrapped = None self._free_datacbs() # Implement the context manager protocol. def __enter__(self): return self def __exit__(self, type, value, tb): self.__del__() def _free_datacbs(self): self._data_cbs = None def new(self): tmp = gpgme.new_gpgme_data_t_p() errorcheck(gpgme.gpgme_data_new(tmp)) self.wrapped = gpgme.gpgme_data_t_p_value(tmp) gpgme.delete_gpgme_data_t_p(tmp) def new_from_mem(self, string, copy=True): tmp = gpgme.new_gpgme_data_t_p() errorcheck(gpgme.gpgme_data_new_from_mem(tmp,string,len(string),copy)) self.wrapped = gpgme.gpgme_data_t_p_value(tmp) gpgme.delete_gpgme_data_t_p(tmp) def new_from_file(self, filename, copy=True): tmp = gpgme.new_gpgme_data_t_p() try: errorcheck(gpgme.gpgme_data_new_from_file(tmp, filename, copy)) except errors.GPGMEError as e: if e.getcode() == errors.INV_VALUE and not copy: raise ValueError("delayed reads are not yet supported") else: raise e self.wrapped = gpgme.gpgme_data_t_p_value(tmp) gpgme.delete_gpgme_data_t_p(tmp) def new_from_cbs(self, read_cb, write_cb, seek_cb, release_cb, hook=None): tmp = gpgme.new_gpgme_data_t_p() if hook != None: hookdata = (weakref.ref(self), read_cb, write_cb, seek_cb, release_cb, hook) else: hookdata = (weakref.ref(self), read_cb, write_cb, seek_cb, release_cb) gpgme.gpg_data_new_from_cbs(self, hookdata, tmp) self.wrapped = gpgme.gpgme_data_t_p_value(tmp) gpgme.delete_gpgme_data_t_p(tmp) def new_from_filepart(self, file, offset, length): """This wraps the GPGME gpgme_data_new_from_filepart() function. The argument "file" may be: * a string specifying a file name, or * a file-like object supporting the fileno() and the mode attribute. """ tmp = gpgme.new_gpgme_data_t_p() filename = None fp = None if util.is_a_string(file): filename = file else: fp = gpgme.fdopen(file.fileno(), file.mode) if fp == None: raise ValueError("Failed to open file from %s arg %s" % \ (str(type(file)), str(file))) errorcheck(gpgme.gpgme_data_new_from_filepart(tmp, filename, fp, offset, length)) self.wrapped = gpgme.gpgme_data_t_p_value(tmp) gpgme.delete_gpgme_data_t_p(tmp) def new_from_fd(self, file): """This wraps the GPGME gpgme_data_new_from_fd() function. The argument "file" must be a file-like object, supporting the fileno() method. """ tmp = gpgme.new_gpgme_data_t_p() errorcheck(gpgme.gpgme_data_new_from_fd(tmp, file.fileno())) self.wrapped = gpgme.gpgme_data_t_p_value(tmp) gpgme.delete_gpgme_data_t_p(tmp) def new_from_stream(self, file): """This wrap around gpgme_data_new_from_stream is an alias for new_from_fd() method since in python there's not difference between file stream and file descriptor""" self.new_from_fd(file) def write(self, buffer): """Write buffer given as string or bytes. If a string is given, it is implicitly encoded using UTF-8.""" written = gpgme.gpgme_data_write(self.wrapped, buffer) if written < 0: if self._callback_excinfo: gpgme.gpg_raise_callback_exception(self) else: raise GPGMEError.fromSyserror() return written def read(self, size = -1): """Read at most size bytes, returned as bytes. If the size argument is negative or omitted, read until EOF is reached. Returns the data read, or the empty string if there was no data to read before EOF was reached.""" if size == 0: return '' if size > 0: try: result = gpgme.gpgme_data_read(self.wrapped, size) except: if self._callback_excinfo: gpgme.gpg_raise_callback_exception(self) else: raise return result else: chunks = [] while True: try: result = gpgme.gpgme_data_read(self.wrapped, 4096) except: if self._callback_excinfo: gpgme.gpg_raise_callback_exception(self) else: raise if len(result) == 0: break chunks.append(result) return b''.join(chunks) def pubkey_algo_string(subkey): """Return short algorithm string Return a public key algorithm string (e.g. "rsa2048") for a given SUBKEY. Returns: algo - a string """ return gpgme.gpgme_pubkey_algo_string(subkey) def pubkey_algo_name(algo): """Return name of public key algorithm Return the name of the public key algorithm for a given numeric algorithm id ALGO (cf. RFC4880). Returns: algo - a string """ return gpgme.gpgme_pubkey_algo_name(algo) def hash_algo_name(algo): """Return name of hash algorithm Return the name of the hash algorithm for a given numeric algorithm id ALGO (cf. RFC4880). Returns: algo - a string """ return gpgme.gpgme_hash_algo_name(algo) def get_protocol_name(proto): """Get protocol description Get the string describing protocol PROTO. Returns: proto - a string """ return gpgme.gpgme_get_protocol_name(proto) def addrspec_from_uid(uid): """Return the address spec Return the addr-spec (cf. RFC2822 section 4.3) from a user id UID. Returns: addr_spec - a string """ return gpgme.gpgme_addrspec_from_uid(uid) def check_version(version=None): return gpgme.gpgme_check_version(version) # check_version also makes sure that several subsystems are properly # initialized, and it must be run at least once before invoking any # other function. We do it here so that the user does not have to do # it unless she really wants to check for a certain version. check_version() def engine_check_version (proto): try: errorcheck(gpgme.gpgme_engine_check_version(proto)) return True except errors.GPGMEError: return False def get_engine_info(): ptr = gpgme.new_gpgme_engine_info_t_p() try: errorcheck(gpgme.gpgme_get_engine_info(ptr)) info = gpgme.gpgme_engine_info_t_p_value(ptr) except errors.GPGMEError: info = None gpgme.delete_gpgme_engine_info_t_p(ptr) return info def set_engine_info(proto, file_name, home_dir=None): """Changes the default configuration of the crypto engine implementing the protocol 'proto'. 'file_name' is the file name of the executable program implementing this protocol. 'home_dir' is the directory name of the configuration directory (engine's default is used if omitted).""" errorcheck(gpgme.gpgme_set_engine_info(proto, file_name, home_dir)) def set_locale(category, value): """Sets the default locale used by contexts""" errorcheck(gpgme.gpgme_set_locale(None, category, value)) def wait(hang): """Wait for asynchronous call on any Context to finish. Wait forever if hang is True. For finished anynch calls it returns a tuple (status, context): status - status return by asnynchronous call. context - context which caused this call to return. Please read the GPGME manual of more information.""" ptr = gpgme.new_gpgme_error_t_p() context = gpgme.gpgme_wait(None, ptr, hang) status = gpgme.gpgme_error_t_p_value(ptr) gpgme.delete_gpgme_error_t_p(ptr) if context == None: errorcheck(status) else: context = Context(context) return (status, context)