From da1ad77dab14d59477be0db4a2f8c96dffe26dd7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20Fra=C3=9F?= Date: Sat, 1 Jun 2024 18:41:52 +0200 Subject: [PATCH] [res] --- roles/tlscert_acme_inwx/files/inwx | 588 +++++++++++++++++++++-------- 1 file changed, 434 insertions(+), 154 deletions(-) diff --git a/roles/tlscert_acme_inwx/files/inwx b/roles/tlscert_acme_inwx/files/inwx index 2fdeb41..53b950c 100755 --- a/roles/tlscert_acme_inwx/files/inwx +++ b/roles/tlscert_acme_inwx/files/inwx @@ -10,6 +10,32 @@ import argparse as _argparse import pathlib as _pathlib import time as _time +def convey(x, fs): + y = x + for f in fs: + y = f(y) + return y + + +def string_coin( + template : str, + arguments : dict +): + result = template + for (key, value, ) in arguments.items(): + result = result.replace("{{%s}}" % key, value) + return result + + +def file_read( + path : str +): + handle = open(path, "r") + content = handle.read() + handle.close() + return content + + def log( messsage : str ): @@ -28,30 +54,6 @@ def path_read( return position -def path_write( - thing, - steps : List[str], - value -): - steps_first = steps[:-1] - step_last = steps[-1] - position = thing - for step in steps_first: - if (not (step in position)): - position[step] = {} - position = position[step] - position[step_last] = value - - -def merge( - core, - mantle -): - result = core.copy() - result.update(mantle) - return result - - def http_call( request : dict, ) -> dict: @@ -76,41 +78,124 @@ def http_call( return response -_conf_data = { - "url": { - "test": { - "scheme": "https", - "host": "api.ote.domrobot.com", - "port": 443, - "path": "jsonrpc/" - }, - "production": { - "scheme": "https", - "host": "api.domrobot.com", - "port": 443, - "path": "jsonrpc/" - } - }, - "environment": "production", - "account": { - "username": None, - "password": None - } -} +_conf_data = None +def conf_schema( +): + return { + "type": "object", + "properties": { + "url": { + "type": "object", + "properties": { + }, + "additionalProperties": { + "type": "object", + "properties": { + "scheme": { + "type": "string", + }, + "host": { + "type": "string", + }, + "port": { + "type": "number", + }, + "path": { + "type": "string", + }, + }, + "additionalProperties": False, + "required": [ + "host", + ] + }, + "required": [ + ] + }, + "environment": { + "type": "string", + }, + "account": { + "type": "object", + "properties": { + "username": { + "type": "string", + }, + "password": { + "type": "string", + }, + }, + "additionalProperties": False, + "required": [ + ] + } + }, + "additionalProperties": False, + "required": [ + ], + } + + def conf_load( path : str ): global _conf_data - if (not _os.path.exists(path)): - pass - else: - handle = open(path, "r") - content = handle.read() - handle.close() - data = _json.loads(content) - _conf_data = merge(_conf_data, data) + conf_data_raw = ( + _json.loads(file_read(path)) + if _os.path.exists(path) else + {} + ) + for pair in conf_data_raw.get("url", {}).items(): + if ("host" in pair[1]): + pass + else: + raise ValueError("flawed conf: missing mandatory value 'host' for url entry '%s'" % pair[0]) + _conf_data = { + "url": convey( + ( + { + "test": { + "scheme": "https", + "host": "api.ote.domrobot.com", + "port": 443, + "path": "jsonrpc/" + }, + "production": { + "scheme": "https", + "host": "api.domrobot.com", + "port": 443, + "path": "jsonrpc/" + } + } + | + conf_data_raw.get("url", {}) + ), + [ + lambda x: x.items(), + lambda pairs: map( + lambda pair: ( + pair[0], + { + "scheme": pair[1].get("scheme", "https"), + "host": pair[1]["host"], + "port": pair[1].get("port", 443), + "path": pair[1].get("path", "jsonrpc/"), + } + ), + pairs + ), + dict, + ] + ), + "environment": conf_data_raw.get("environment", "production"), + "account": { + "username": conf_data_raw.get("account", {}).get("username", None), + "password": conf_data_raw.get("account", {}).get("password", None), + } + } + # print(_json.dumps(_conf_data, indent = "\t")) def conf_get( @@ -119,15 +204,6 @@ def conf_get( global _conf_data return path_read(_conf_data, path.split(".")) - -def conf_set( - path : str, - value -): - global _conf_data - path_write(_conf_data, path.split("."), value) - - def api_call( environment : str, accesstoken : str, @@ -172,6 +248,9 @@ def api_call( return result +''' +@see https://www.inwx.de/de/help/apidoc/f/ch02.html#account.login +''' def api_macro_login( environment : str, username : str, @@ -195,6 +274,9 @@ def api_macro_login( return response["_accesstoken"] +''' +@see https://www.inwx.de/de/help/apidoc/f/ch02.html#account.logout +''' def api_macro_logout( environment : str, accesstoken : str @@ -210,6 +292,9 @@ def api_macro_logout( return None +''' +@see https://www.inwx.de/de/help/apidoc/f/ch02.html#account.info +''' def api_macro_info( environment : str, username : str, @@ -228,6 +313,9 @@ def api_macro_info( return info +''' +@see https://www.inwx.de/de/help/apidoc/f/ch02s15.html#nameserver.info +''' def api_macro_list( environment : str, username : str, @@ -248,12 +336,17 @@ def api_macro_list( return info +''' +@see https://www.inwx.de/de/help/apidoc/f/ch02s15.html#nameserver.info +@see https://www.inwx.de/de/help/apidoc/f/ch02s15.html#nameserver.createRecord +@see https://www.inwx.de/de/help/apidoc/f/ch02s15.html#nameserver.updateRecord +''' def api_macro_save( environment : str, username : str, password : str, - domain : str, - name : str, + domain_base : str, + domain_path, type_ : str, content : str ): @@ -264,12 +357,28 @@ def api_macro_save( "nameserver", "info", { - "domain": domain, + "domain": domain_base, } ) matching = list( filter( - lambda record: ((record["name"] == (name + "." + domain)) and (record["type"] == type_)), + lambda record: ( + ( + ( + (domain_path is None) + and + (record["name"] == domain_base) + ) + or + ( + (domain_path is not None) + and + (record["name"] == (domain_path + "." + domain_base)) + ) + ) + and + (record["type"] == type_) + ), info["record"] ) ) @@ -281,8 +390,8 @@ def api_macro_save( "nameserver", "createRecord", { - "domain": domain, - "name": name, + "domain": domain_base, + "name": domain_path, "type": type_, "content": content, } @@ -308,153 +417,324 @@ def api_macro_save( -def args( +''' +@see https://www.inwx.de/de/help/apidoc/f/ch02s15.html#nameserver.info +@see https://www.inwx.de/de/help/apidoc/f/ch02s15.html#nameserver.deleteRecord +''' +def api_macro_delete( + environment : str, + username : str, + password : str, + domain_base : str, + domain_path, + type_ ): - argumentparser = _argparse.ArgumentParser( + accesstoken = api_macro_login(environment, username, password) + info = api_call( + environment, + accesstoken, + "nameserver", + "info", + { + "domain": domain_base, + } + ) + matching = list( + filter( + lambda record: ( + ( + ( + (domain_path is None) + and + (record["name"] == domain_base) + ) + or + ( + (domain_path is not None) + and + (record["name"] == (domain_path + "." + domain_base)) + ) + ) + and + ( + (type_ is None) + or + (record["type"] == type_) + ) + ), + info["record"] + ) + ) + for entry in matching: + id_ = entry["id"] + result = api_call( + environment, + accesstoken, + "nameserver", + "deleteRecord", + { + "id": id_, + } + ) + api_macro_logout(environment, accesstoken) + + +def main( +): + ## args + argument_parser = _argparse.ArgumentParser( description = "INWX CLI Frontend" ) - argumentparser.add_argument( + argument_parser.add_argument( "-c", "--conf", + type = str, dest = "conf", default = _os.path.join(str(_pathlib.Path.home()), ".inwx-conf.json"), metavar = "", help = "path to configuration file", ) - argumentparser.add_argument( + argument_parser.add_argument( "-e", "--environment", + type = str, dest = "environment", metavar = "", default = None, - help = "environment to use; one of the keys in the 'url' filed of the configuration; overwrites the configuration value", + help = "environment to use; one of the keys in the 'url' node of the configuration; overwrites the configuration value", ) - argumentparser.add_argument( + argument_parser.add_argument( "-u", "--username", + type = str, dest = "username", metavar = "", default = None, help = "username; overwrites the configuration value", ) - argumentparser.add_argument( + argument_parser.add_argument( "-p", "--password", + type = str, dest = "password", metavar = "", default = None, help = "password; overwrites the configuration value", ) - ''' - argumentparser.add_argument( + argument_parser.add_argument( "-d", "--domain", + type = str, dest = "domain", default = None, metavar = "", help = "the domain to work with" ) - ''' - argumentparser.add_argument( + argument_parser.add_argument( + "-t", + "--type", + type = str, + dest = "type", + default = None, + metavar = "", + help = "the record type (A, AAAA, TXT, …)" + ) + argument_parser.add_argument( + "-v", + "--value", + type = str, + dest = "value", + default = None, + metavar = "", + help = "value for the record" + ) + argument_parser.add_argument( "-x", "--challenge-prefix", + type = str, dest = "challenge_prefix", metavar = "", default = "_acme-challenge", help = "which subdomain to use for ACME challanges", ) - argumentparser.add_argument( + argument_parser.add_argument( "-w", "--delay", - dest = "delay", type = float, + dest = "delay", default = 60.0, metavar = "", help = "seconds to wait at end of certbot auth hook", ) - argumentparser.add_argument( - "action", + argument_parser.add_argument( type = str, - choices = ["info", "list", "save", "certbot-hook"], + dest = "action", + choices = [ + "conf-schema", + "info", + "list", + "save", + "delete", + "certbot-hook", + ], metavar = "", - help = "action to execute", + help = string_coin( + "action to execute; options:\n{{options}}", + { + "options": convey( + [ + {"name": "conf-schema", "requirements": []}, + {"name": "info", "requirements": []}, + {"name": "list", "requirements": [""]}, + {"name": "save", "requirements": ["", "", ""]}, + {"name": "delete", "requirements": [""]}, + {"name": "certbot-hook", "requirements": []}, + ], + [ + lambda x: map( + lambda entry: string_coin( + "{{name}}{{macro_requirements}}", + { + "name": entry["name"], + "macro_requirements": ( + "" + if (len(entry["requirements"]) <= 0) else + string_coin( + " (requires: {{requirements}})", + { + "requirements": ",".join(entry["requirements"]), + } + ) + ), + } + ), + x + ), + " | ".join, + ] + ) + } + ), ) - argumentparser.add_argument( - "parameter", - nargs = "*", - type = str, - metavar = "", - help = "action specific parameters", - ) - arguments = argumentparser.parse_args() - return arguments + args = argument_parser.parse_args() - -def main( -): - arguments = args() + ## conf + conf_load(args.conf) - conf_load(arguments.conf) - if (not (arguments.environment is None)): conf_set("environment", arguments.environment) - if (not (arguments.username is None)): conf_set("account.username", arguments.username) - if (not (arguments.password is None)): conf_set("account.password", arguments.password) + ## vars + environment = (args.environment or conf_get("environment")) + account_username = (args.username or conf_get("account.username")) + account_password = (args.password or conf_get("account.password")) + domain_parts = (None if (args.domain is None) else args.domain.split(".")) + domain_base = (None if (domain_parts is None) else ".".join(domain_parts[-2:])) + domain_path = (None if ((domain_parts is None) or (len(domain_parts[:-2]) <= 0)) else ".".join(domain_parts[:-2])) - if (arguments.action == "info"): - result = api_macro_info( - conf_get("environment"), - conf_get("account.username"), - conf_get("account.password") - ) - print(_json.dumps(result, indent = "\t")) - elif (arguments.action == "list"): - domain = arguments.parameter[0] - result = api_macro_list( - conf_get("environment"), - conf_get("account.username"), - conf_get("account.password"), - domain - ) - print(_json.dumps(result, indent = "\t")) - elif (arguments.action == "save"): - domain = arguments.parameter[0] - name = arguments.parameter[1] - type_ = arguments.parameter[2] - content = arguments.parameter[3] - api_macro_save( - conf_get("environment"), - conf_get("account.username"), - conf_get("account.password"), - domain, - name, - type_, - content - ) - # print(_json.dumps(result, indent = "\t")) - elif (arguments.action == "certbot-hook"): - domain_full_parts = _os.environ["CERTBOT_DOMAIN"].split(".") - account = ".".join(domain_full_parts[-2:]) - concern = ".".join(domain_full_parts[:-2]) - domain = account - name = (arguments.challenge_prefix + "." + concern) - type_ = "TXT" - content = _os.environ["CERTBOT_VALIDATION"] - api_macro_save( - conf_get("environment"), - conf_get("account.username"), - conf_get("account.password"), - domain, - name, - type_, - content - ) - _time.sleep(arguments.delay) - # print(_json.dumps(result, indent = "\t")) + ## exec + if (args.action == "conf-schema"): + print(_json.dumps(conf_schema(), indent = "\t")) + elif (args.action == "info"): + if (account_username is None): + raise ValueError("account username required") + else: + if (account_password is None): + raise ValueError("account password required") + else: + result = api_macro_info( + environment, + account_username, + account_password + ) + print(_json.dumps(result, indent = "\t")) + elif (args.action == "list"): + if (account_username is None): + raise ValueError("account username required") + else: + if (account_password is None): + raise ValueError("account password required") + else: + if (args.domain is None): + raise ValueError("domain required") + else: + result = api_macro_list( + environment, + account_username, + account_password, + domain_base + ) + print(_json.dumps(result, indent = "\t")) + elif (args.action == "save"): + if (account_username is None): + raise ValueError("account username required") + else: + if (account_password is None): + raise ValueError("account password required") + else: + if (args.domain is None): + raise ValueError("domain required") + else: + if (args.type is None): + raise ValueError("type required") + else: + if (args.value is None): + raise ValueError("value required") + else: + api_macro_save( + environment, + account_username, + account_password, + domain_base, + domain_path, + args.type, + args.value + ) + elif (args.action == "delete"): + if (account_username is None): + raise ValueError("account username required") + else: + if (account_password is None): + raise ValueError("account password required") + else: + if (args.domain is None): + raise ValueError("domain required") + else: + api_macro_delete( + environment, + account_username, + account_password, + domain_base, + domain_path, + args.type + ) + elif (args.action == "certbot-hook"): + if (account_username is None): + raise ValueError("account username required") + else: + if (account_password is None): + raise ValueError("account password required") + else: + domain_full_parts = _os.environ["CERTBOT_DOMAIN"].split(".") + domain_base = ".".join(domain_full_parts[-2:]) + domain_path_stripped = ".".join(domain_full_parts[:-2]) + domain_path = (args.challenge_prefix + "." + domain_path_stripped) + type_ = "TXT" + content = _os.environ["CERTBOT_VALIDATION"] + api_macro_save( + environment, + account_username, + account_password, + domain_base, + domain_path, + type_, + content + ) + _time.sleep(args.delay) + # print(_json.dumps(result, indent = "\t")) else: - log("unhandled action '%s'" % (arguments.action, )) + log("unhandled action '%s'" % (args.action, )) try: main() except ValueError as error: - _sys.stderr.write(str(error) + "\n") + _sys.stderr.write("-- %s\n" % str(error))