diff --git a/CONTRIBUTORS.md b/CONTRIBUTORS.md new file mode 100644 index 0000000..a412821 --- /dev/null +++ b/CONTRIBUTORS.md @@ -0,0 +1,4 @@ +Contributors +============ +* James Kizer +* Alan Viars diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..d1b0c43 --- /dev/null +++ b/setup.py @@ -0,0 +1,93 @@ +from distutils.core import setup +from distutils.command.install_data import install_data +from distutils.command.install import INSTALL_SCHEMES +import os +import sys + + +class osx_install_data(install_data): + # On MacOS, the platform-specific lib dir is /System/Library/Framework/Python/.../ + # which is wrong. Python 2.5 supplied with MacOS 10.5 has an Apple-specific fix + # for this in distutils.command.install_data#306. It fixes install_lib but not + # install_data, which is why we roll our own install_data class. + + def finalize_options(self): + # By the time finalize_options is called, install.install_lib is set to the + # fixed directory, so we set the installdir to install_lib. The + # install_data class uses ('install_data', 'install_dir') instead. + self.set_undefined_options('install', ('install_lib', 'install_dir')) + install_data.finalize_options(self) + +if sys.platform == "darwin": + cmdclasses = {'install_data': osx_install_data} +else: + cmdclasses = {'install_data': install_data} + + +def fullsplit(path, result=None): + """ + Split a pathname into components (the opposite of os.path.join) in a + platform-neutral way. + """ + if result is None: + result = [] + head, tail = os.path.split(path) + if head == '': + return [tail] + result + if head == path: + return result + return fullsplit(head, [tail] + result) + +# Tell distutils to put the data_files in platform-specific installation +# locations. See here for an explanation: +# http://groups.google.com/group/comp.lang.python/browse_thread/thread/35ec7b2fed36eaec/2105ee4d9e8042cb +for scheme in INSTALL_SCHEMES.values(): + scheme['data'] = scheme['purelib'] + +# Compile the list of packages available, because distutils doesn't have +# an easy way to do this. +packages, data_files = [], [] +root_dir = os.path.dirname(__file__) +if root_dir != '': + os.chdir(root_dir) +shc_dir = '' + +for dirpath, dirnames, filenames in os.walk(shc_dir): + # Ignore dirnames that start with '.' + for i, dirname in enumerate(dirnames): + if dirname.startswith('.'): + del dirnames[i] + if '__init__.py' in filenames: + packages.append('.'.join(fullsplit(dirpath))) + elif filenames: + data_files.append([dirpath, [os.path.join(dirpath, f) + for f in filenames]]) + +# Small hack for working with bdist_wininst. +# See http://mail.python.org/pipermail/distutils-sig/2004-August/004134.html +if len(sys.argv) > 1 and sys.argv[1] == 'bdist_wininst': + for file_info in data_files: + file_info[0] = '\\PURELIB\\%s' % file_info[0] + + +setup( + name="shc", + version="0.0.4", + description="SMART Health Card Tools", + long_description="""A collection of scripts and utilites for working with SMART Health Cards on the command line and in Python.""", + author="James Kizer, Alan Viars", + author_email="sales@videntity.com", + url="https://github.com/TransparentHealth/healthcards_python_sample_scripts", + download_url="https://github.com/TransparentHealth/healthcards_python_sample_scripts/tarball/master", + install_requires=[ + 'jwcrypto', 'canonicaljson', 'requests', 'py-multihash', 'PyLD', 'python-jose[cryptography]', 'qrcode[pil]' + ], + packages=['shc',], + scripts=[ + 'shc/decode_resource.py', + 'shc/decode_resource_local.py', + 'shc/encode_resource.py', + 'shc/generate_random_jwks.py', + 'shc/encode_resource_in_qr_code.py', + 'shc/encode_resource_file_in_qr_code.py', + ]) diff --git a/shc/__init__.py b/shc/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/decode_inline.py b/shc/decode_inline.py similarity index 77% rename from decode_inline.py rename to shc/decode_inline.py index e9271db..5fb2918 100644 --- a/decode_inline.py +++ b/shc/decode_inline.py @@ -1,7 +1,9 @@ +#!/usr/bin/env python import json import argparse import base64 -import utils +import shc.utils + def main(): parser = argparse.ArgumentParser(description='Decodes a vc') @@ -11,13 +13,14 @@ def main(): try: jws_raw = base64.standard_b64decode(args.input).decode(encoding='utf-8') print("Base 64 decoding succeeded") - except: - print("Base 64 decoding failed, assuming input is JWS") + except Exception as e: + print("Base 64 decoding failed, assuming input is JWS", e) jws_raw = args.input payload_dict = utils.decode_vc(jws_raw) print(json.dumps(payload_dict, indent=4)) + if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/decode_resource.py b/shc/decode_resource.py similarity index 89% rename from decode_resource.py rename to shc/decode_resource.py index d7dac59..87aadbb 100644 --- a/decode_resource.py +++ b/shc/decode_resource.py @@ -1,6 +1,8 @@ +#!/usr/bin/env python import json import argparse -import utils +import shc.utils + def main(): parser = argparse.ArgumentParser(description='Decodes a vc') @@ -13,5 +15,6 @@ def main(): print(json.dumps(payload_dict, indent=4)) + if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/decode_resource_local.py b/shc/decode_resource_local.py similarity index 91% rename from decode_resource_local.py rename to shc/decode_resource_local.py index 9538869..54becc0 100644 --- a/decode_resource_local.py +++ b/shc/decode_resource_local.py @@ -1,6 +1,8 @@ +#!/usr/bin/env python import json import argparse -import utils +import shc.utils + def main(): parser = argparse.ArgumentParser(description='Decodes a vc') @@ -14,5 +16,6 @@ def main(): print(json.dumps(payload_dict, indent=4)) + if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/encode_resource.py b/shc/encode_resource.py similarity index 80% rename from encode_resource.py rename to shc/encode_resource.py index 20352f2..3c4d855 100644 --- a/encode_resource.py +++ b/shc/encode_resource.py @@ -1,8 +1,10 @@ +#!/usr/bin/env python import argparse import json import time import secrets -import utils +import shc.utils + def main(): parser = argparse.ArgumentParser(description='Encodes a vc') @@ -21,19 +23,20 @@ def main(): with open(args.input_file, 'r') as input_file: payload = json.load(input_file) - ##since we're using a static file to form the payload - ## it needs to be modified a bit + # since we're using a static file to form the payload + # it needs to be modified a bit now = int(time.time()) payload['iss'] = args.issuer payload['iat'] = now vc_jws = utils.encode_vc(payload, private_signing_key, kid) - ## this is the general format for a FHIR backed vc file, this is subject to change + # this is the general format for a FHIR backed vc file, this is subject to change with open(args.output_file, 'w') as outfile: output_dict = { 'verifiableCredential': [vc_jws] } json.dump(output_dict, outfile) + if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/encode_resource_file_in_qr_code.py b/shc/encode_resource_file_in_qr_code.py similarity index 93% rename from encode_resource_file_in_qr_code.py rename to shc/encode_resource_file_in_qr_code.py index 2d75979..de0cee5 100644 --- a/encode_resource_file_in_qr_code.py +++ b/shc/encode_resource_file_in_qr_code.py @@ -1,8 +1,9 @@ +#!/usr/bin/env python import argparse import json import time import secrets -import utils +import shc.utils def main(): @@ -20,5 +21,6 @@ def main(): with open(args.output_file, 'wb') as outfile: qr_img.save(outfile) + if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/encode_resource_in_qr_code.py b/shc/encode_resource_in_qr_code.py similarity index 87% rename from encode_resource_in_qr_code.py rename to shc/encode_resource_in_qr_code.py index 87faa73..c416b2d 100644 --- a/encode_resource_in_qr_code.py +++ b/shc/encode_resource_in_qr_code.py @@ -1,8 +1,9 @@ +#!/usr/bin/env python import argparse import json import time import secrets -import utils +import shc.utils def main(): @@ -23,8 +24,8 @@ def main(): with open(args.input_file, 'r') as input_file: payload = json.load(input_file) - ##since we're using a static file to form the payload - ## it needs to be modified a bit + # since we're using a static file to form the payload + # it needs to be modified a bit now = int(time.time()) payload['iss'] = args.issuer payload['iat'] = now @@ -35,5 +36,6 @@ def main(): with open(args.output_file, 'wb') as outfile: qr_img.save(outfile) + if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/fixtures/transform-response-payload.json b/shc/fixtures/transform-response-payload.json similarity index 100% rename from fixtures/transform-response-payload.json rename to shc/fixtures/transform-response-payload.json diff --git a/fixtures/vc-c19-pcr-jwt-payload.json b/shc/fixtures/vc-c19-pcr-jwt-payload.json similarity index 100% rename from fixtures/vc-c19-pcr-jwt-payload.json rename to shc/fixtures/vc-c19-pcr-jwt-payload.json diff --git a/fixtures/vc-covid-immunization.json b/shc/fixtures/vc-covid-immunization.json similarity index 100% rename from fixtures/vc-covid-immunization.json rename to shc/fixtures/vc-covid-immunization.json diff --git a/generate_random_jwks.py b/shc/generate_random_jwks.py similarity index 91% rename from generate_random_jwks.py rename to shc/generate_random_jwks.py index 670b9f4..a646867 100644 --- a/generate_random_jwks.py +++ b/shc/generate_random_jwks.py @@ -1,23 +1,28 @@ +#!/usr/bin/env python from jwcrypto import jwk import argparse import json + def generate_signing_key(): key = jwk.JWK.generate(kty='EC', crv='P-256', alg='ES256', use='sig') - key._params['kid'] = key.thumbprint() + key.setdefault("kid",key.thumbprint()) return key + def generate_encryption_key(): key = jwk.JWK.generate(kty='EC', crv='P-256', alg='ECDH-ES', use='enc') - key._params['kid'] = key.thumbprint() + key.setdefault("kid",key.thumbprint()) return key + def generate_keyset(keys): keyset = jwk.JWKSet() for key in keys: keyset.add(key) return keyset + def main(): parser = argparse.ArgumentParser(description='Generates a random JWK set') @@ -35,5 +40,6 @@ def main(): with open(args.public_file, 'w', newline='') as public_file: json.dump(keyset.export(private_keys=False, as_dict=True), public_file, indent=4) + if __name__ == "__main__": main() diff --git a/utils.py b/shc/utils.py similarity index 87% rename from utils.py rename to shc/utils.py index 971151f..5bd366b 100644 --- a/utils.py +++ b/shc/utils.py @@ -1,3 +1,4 @@ +#!/usr/bin/env python import zlib import requests from jose import jwk as jose_jwk, jws @@ -9,32 +10,35 @@ SMALLEST_B64_CHAR_CODE = ord('-') SMART_HEALTH_CARD_PREFIX = 'shc:/' -## https://stackoverflow.com/a/1089787 +# https://stackoverflow.com/a/1089787 + + def deflate(data, compresslevel=9): compress = zlib.compressobj( compresslevel, # level: 0-9 zlib.DEFLATED, # method: must be DEFLATED -zlib.MAX_WBITS, # window size in bits: - # -15..-8: negate, suppress header - # 8..15: normal - # 16..30: subtract 16, gzip header + # -15..-8: negate, suppress header + # 8..15: normal + # 16..30: subtract 16, gzip header zlib.DEF_MEM_LEVEL, # mem level: 1..8/9 0 # strategy: - # 0 = Z_DEFAULT_STRATEGY - # 1 = Z_FILTERED - # 2 = Z_HUFFMAN_ONLY - # 3 = Z_RLE - # 4 = Z_FIXED + # 0 = Z_DEFAULT_STRATEGY + # 1 = Z_FILTERED + # 2 = Z_HUFFMAN_ONLY + # 3 = Z_RLE + # 4 = Z_FIXED ) deflated = compress.compress(data) deflated += compress.flush() return deflated + def inflate(data): - ## needed to add `-zlib.MAX_WBITS` here due to - ## zlib.error: Error -3 while decompressing data: incorrect header check + # needed to add `-zlib.MAX_WBITS` here due to + # zlib.error: Error -3 while decompressing data: incorrect header check decompress = zlib.decompressobj( - -zlib.MAX_WBITS # see above + -zlib.MAX_WBITS # see above ) inflated = decompress.decompress(data) inflated += decompress.flush() @@ -54,7 +58,7 @@ def resolve(iss, kid, algorithm): return key # TODO - the following line causes an exception to occur during verficiation # There's a fix on master for this, but for now, it does not work - # + # # File "/usr/local/lib/python3.7/site-packages/jose/jws.py", line 233, in _get_keys # if 'keys' in key: # TypeError: argument of type 'CryptographyECKey' is not iterable @@ -64,6 +68,7 @@ def resolve(iss, kid, algorithm): return resolve + def resolve_key_from_file(jwks_filename): def resolve(iss, kid, algorithm): with open(jwks_filename, 'r', newline='') as jwks_file: @@ -74,7 +79,7 @@ def resolve(iss, kid, algorithm): return key # TODO - the following line causes an exception to occur during verficiation # There's a fix on master for this, but for now, it does not work - # + # # File "/usr/local/lib/python3.7/site-packages/jose/jws.py", line 233, in _get_keys # if 'keys' in key: # TypeError: argument of type 'CryptographyECKey' is not iterable @@ -84,6 +89,7 @@ def resolve(iss, kid, algorithm): return resolve + def load_private_key_from_file(jwks_filename, use, algorithm): with open(jwks_filename, 'r', newline='') as jwks_file: jwks = json.load(jwks_file) @@ -93,7 +99,7 @@ def load_private_key_from_file(jwks_filename, use, algorithm): return (key.get('kid'), key) # TODO - the following line causes an exception to occur during verficiation # There's a fix on master for this, but for now, it does not work - # + # # File "/usr/local/lib/python3.7/site-packages/jose/jws.py", line 233, in _get_keys # if 'keys' in key: # TypeError: argument of type 'CryptographyECKey' is not iterable @@ -102,18 +108,19 @@ def load_private_key_from_file(jwks_filename, use, algorithm): raise Exception(f'Key with use = {use} algorithm = {algorithm} not found') + def _decode_vc(jws_raw, key_resolver): - ## before we can verify the vc, we first need to resolve the key - ## the key ID is stored in the header - ## Per the health cards IG, + # before we can verify the vc, we first need to resolve the key + # the key ID is stored in the header + # Per the health cards IG, ## "Issuers SHALL publish keys as JSON Web Key Sets (see RFC7517), available at <> + .well-known/jwks.json" - ## therefore, we need decode the claims to get the iss value in order to resolve the key - ## The claims are compressed via Deflate, so decompress the data - ## then, extract the iss claim to get access to the base URL, use that to resolve key with id = kid - ## then, verify the jws + # therefore, we need decode the claims to get the iss value in order to resolve the key + # The claims are compressed via Deflate, so decompress the data + # then, extract the iss claim to get access to the base URL, use that to resolve key with id = kid + # then, verify the jws unverified_headers = jws.get_unverified_headers(jws_raw) - ## we expect data to be zipped, so deflate the data + # we expect data to be zipped, so deflate the data if unverified_headers.get('zip') == 'DEF': unverfied_claims_zip = jws.get_unverified_claims(jws_raw) raw_data = inflate(unverfied_claims_zip) @@ -123,21 +130,24 @@ def _decode_vc(jws_raw, key_resolver): iss = data['iss'] kid = unverified_headers['kid'] - + key = key_resolver(iss, kid, 'ES256') verified_jws = jws.verify(jws_raw, key, algorithms='ES256') payload = json.loads(inflate(verified_jws)) return payload + def decode_vc(jws_raw): resolver = resolve_key_from_issuer() return _decode_vc(jws_raw, resolver) + def decode_vc_from_local_issuer(jws_raw, jwks_file): resolver = resolve_key_from_file(jwks_file) return _decode_vc(jws_raw, resolver) + def encode_vc(payload, private_signing_key, kid): payload_bytes = json.dumps(payload, separators=(',', ':')).encode('utf-8') @@ -146,13 +156,16 @@ def encode_vc(payload, private_signing_key, kid): headers = {"kid": kid, 'zip': 'DEF'} return jws.sign(compressed_payload, private_signing_key, headers=headers, algorithm='ES256') + def encode_char_to_numeric(ch): numeric_value = ord(ch) - SMALLEST_B64_CHAR_CODE return '%02d' % (numeric_value) + def encode_to_numeric(payload): return ''.join([encode_char_to_numeric(ch) for ch in payload]) + def create_qr_code(numeric_encoded_payload): qr = qrcode.QRCode() qr.add_data(SMART_HEALTH_CARD_PREFIX)