#!/usr/bin/env python3

"""INWX CLI frontend.

Small command line client for the INWX DomRobot JSON-RPC API. It can print
account information, list/save/delete nameserver records and act as a certbot
DNS authentication hook (action ``certbot-hook``).
"""

from typing import List
import os as _os
import sys as _sys
import json as _json
import http.client as _http_client
import argparse as _argparse
import pathlib as _pathlib
import time as _time
import contextlib as _contextlib


def convey(x, fs):
    """Pipe the value `x` through the functions `fs`, left to right.

    Returns `x` unchanged when `fs` is empty.
    """
    y = x
    for f in fs:
        y = f(y)
    return y


def string_coin(template: str, arguments: dict):
    """Replace every ``{{key}}`` placeholder in `template`.

    `arguments` maps placeholder names to their (string) replacement values.
    Placeholders without a matching key are left untouched.
    """
    result = template
    for (key, value) in arguments.items():
        result = result.replace("{{%s}}" % key, value)
    return result


def file_read(path: str):
    """Return the complete text content of the file at `path`."""
    # the context manager closes the handle even if reading fails
    with open(path, "r") as handle:
        return handle.read()


def log(messsage: str):
    """Write a diagnostic line to stderr (prefixed with ``-- ``)."""
    # NOTE: the misspelled parameter name is kept so that keyword callers
    # do not break
    _sys.stderr.write("-- %s\n" % messsage)


def path_read(thing, steps: List[str]):
    """Walk into the nested structure `thing` along the keys in `steps`.

    Raises:
        ValueError: if a step cannot be resolved, i.e. the key is missing or
            the value reached so far cannot be indexed with it (e.g. it is
            ``None`` or a string).
    """
    position = thing
    for step in steps:
        try:
            position = position[step]
        except (KeyError, IndexError, TypeError):
            raise ValueError("missing key '%s'" % ".".join(steps)) from None
    return position


def http_call(request: dict) -> dict:
    """Execute a single HTTP(S) request.

    `request` needs the keys ``url`` (a dict with ``scheme``, ``host``,
    ``port`` and ``path``), ``method``, ``headers`` and ``data``.

    Returns a dict with ``status``, ``headers`` (a dict) and ``data`` (the raw
    response body as bytes).

    Raises:
        ValueError: if the URL scheme is neither ``http`` nor ``https``.
    """
    url = request["url"]
    connection_classes = {
        "http": _http_client.HTTPConnection,
        "https": _http_client.HTTPSConnection,
    }
    if url["scheme"] not in connection_classes:
        raise ValueError("unsupported url scheme '%s'" % url["scheme"])
    connection = connection_classes[url["scheme"]](url["host"], url["port"])
    try:
        connection.request(
            request["method"],
            ("/" + url["path"]),
            request["data"],
            request["headers"]
        )
        response_ = connection.getresponse()
        # the body has to be read before the connection is closed
        return {
            "status": response_.status,
            "headers": dict(response_.getheaders()),
            "data": response_.read(),
        }
    finally:
        connection.close()


def accesstoken_read(headers: dict):
    """Extract the session token from the response headers.

    Looks for a ``Set-Cookie`` header (header names are compared
    case-insensitively) and returns the value of its first cookie pair. Only
    the first ``=`` separates name and value, so tokens which themselves
    contain ``=`` are returned complete.

    Returns ``None`` if there is no usable ``Set-Cookie`` header.
    """
    for (name, value) in headers.items():
        if name.lower() != "set-cookie":
            continue
        (_, separator, token) = value.split(";")[0].partition("=")
        return (token.strip() if separator else None)
    return None


def domain_split(domain: str):
    """Split a full domain name into ``(domain_base, domain_path)``.

    The base consists of the last two labels; the path holds the remaining
    leading labels or is ``None`` if there are none.

    NOTE(review): multi-label public suffixes (e.g. ``co.uk``) are not
    recognized by this scheme.
    """
    parts = domain.split(".")
    domain_base = ".".join(parts[-2:])
    domain_path = (".".join(parts[:-2]) if (len(parts) > 2) else None)
    return (domain_base, domain_path)


def record_name(domain_base: str, domain_path):
    """Return the full record name for a base domain and an optional path."""
    if domain_path is None:
        return domain_base
    return (domain_path + "." + domain_base)


def challenge_path(challenge_prefix: str, domain_path):
    """Return the subdomain path of the ACME challenge record.

    For an apex domain (`domain_path` is ``None``) this is the bare prefix;
    no trailing dot is appended.
    """
    if domain_path is None:
        return challenge_prefix
    return (challenge_prefix + "." + domain_path)


_conf_data = None


def conf_schema():
    """Return the JSON schema describing the configuration file."""
    return {
        "type": "object",
        "properties": {
            "url": {
                "type": "object",
                "properties": {
                },
                "additionalProperties": {
                    "type": "object",
                    "properties": {
                        "scheme": {
                            "type": "string",
                        },
                        "host": {
                            "type": "string",
                        },
                        "port": {
                            "type": "number",
                        },
                        "path": {
                            "type": "string",
                        },
                    },
                    "additionalProperties": False,
                    "required": [
                        "host",
                    ]
                },
                "required": [
                ]
            },
            "environment": {
                "type": "string",
            },
            "account": {
                "type": "object",
                "properties": {
                    "username": {
                        "type": "string",
                    },
                    "password": {
                        "type": "string",
                    },
                },
                "additionalProperties": False,
                "required": [
                ]
            }
        },
        "additionalProperties": False,
        "required": [
        ],
    }


def conf_load(path: str):
    """Load the configuration from `path` into the module-wide state.

    A missing file is treated like an empty configuration. User-defined url
    entries are merged over the built-in ``test`` and ``production`` entries
    and completed with default values.

    Raises:
        ValueError: if a url entry lacks the mandatory ``host`` value.
    """
    global _conf_data
    conf_data_raw = (
        _json.loads(file_read(path))
        if _os.path.exists(path) else
        {}
    )
    url_raw = conf_data_raw.get("url", {})
    for (name, entry) in url_raw.items():
        if "host" not in entry:
            raise ValueError("flawed conf: missing mandatory value 'host' for url entry '%s'" % name)
    url_defaults = {
        "test": {
            "scheme": "https",
            "host": "api.ote.domrobot.com",
            "port": 443,
            "path": "jsonrpc/"
        },
        "production": {
            "scheme": "https",
            "host": "api.domrobot.com",
            "port": 443,
            "path": "jsonrpc/"
        }
    }
    account_raw = conf_data_raw.get("account", {})
    _conf_data = {
        "url": {
            name: {
                "scheme": entry.get("scheme", "https"),
                "host": entry["host"],
                "port": entry.get("port", 443),
                "path": entry.get("path", "jsonrpc/"),
            }
            for (name, entry) in (url_defaults | url_raw).items()
        },
        "environment": conf_data_raw.get("environment", "production"),
        "account": {
            "username": account_raw.get("username", None),
            "password": account_raw.get("password", None),
        }
    }


def conf_get(path: str):
    """Return the configuration value at the dot-separated `path`.

    Raises:
        ValueError: if the path does not exist (also when no configuration
            has been loaded yet).
    """
    return path_read(_conf_data, path.split("."))


def api_call(environment: str, accesstoken: str, category: str, action: str, data):
    """Call the API method ``<category>.<action>``.

    Args:
        environment: key of the url entry in the configuration to talk to
        accesstoken: session token sent as ``domrobot`` cookie; ``None`` for
            unauthenticated calls
        category: first part of the method name
        action: second part of the method name
        data: JSON-serializable parameters of the call

    Returns:
        The ``resData`` node of the response (an empty dict if absent). If the
        response sets a cookie, its value is added under ``_accesstoken``.

    Raises:
        ValueError: if the HTTP status is not 200 or the API answers with
            code 2002.
    """
    url = conf_get("url." + environment)
    request_headers = {
        "Content-Type": "application/json",
    }
    if accesstoken is not None:
        request_headers["Cookie"] = ("domrobot=%s" % (accesstoken, ))
    request = {
        "url": url,
        "method": "POST",
        "headers": request_headers,
        "data": _json.dumps(
            {
                "method": (category + "." + action),
                "params": data,
            }
        ),
    }
    response = http_call(request)
    if response["status"] != 200:
        raise ValueError("API call failed with status %u: %s" % (response["status"], response["data"], ))
    output_data_decoded = _json.loads(response["data"])
    if output_data_decoded["code"] == 2002:
        raise ValueError("wrong use: %s" % str(output_data_decoded))
    result = output_data_decoded.get("resData", {})
    accesstoken_new = accesstoken_read(response["headers"])
    if accesstoken_new is not None:
        result["_accesstoken"] = accesstoken_new
    return result


def api_macro_login(environment: str, username: str, password: str):
    """Log in and return the session token.

    @see https://www.inwx.de/de/help/apidoc/f/ch02.html#account.login

    Raises:
        ValueError: if credentials are missing or the response carries no
            session token.
    """
    if (username is None) or (password is None):
        raise ValueError("username or password not given")
    response = api_call(
        environment,
        None,
        "account",
        "login",
        {
            "user": username,
            "pass": password,
        }
    )
    if "_accesstoken" not in response:
        # without this check a rejected login would surface as a bare KeyError
        raise ValueError("login failed: no access token received")
    return response["_accesstoken"]


def api_macro_logout(environment: str, accesstoken: str):
    """Terminate the session identified by `accesstoken`.

    @see https://www.inwx.de/de/help/apidoc/f/ch02.html#account.logout
    """
    api_call(
        environment,
        accesstoken,
        "account",
        "logout",
        {
        }
    )
    return None


@_contextlib.contextmanager
def _api_session(environment: str, username: str, password: str):
    """Context manager yielding a session token; always logs out again."""
    accesstoken = api_macro_login(environment, username, password)
    try:
        yield accesstoken
    except Exception:
        # best-effort logout which must not mask the original error
        try:
            api_macro_logout(environment, accesstoken)
        except Exception as error:
            log("logout failed: %s" % str(error))
        raise
    else:
        api_macro_logout(environment, accesstoken)


def api_macro_info(environment: str, username: str, password: str):
    """Return the account information.

    @see https://www.inwx.de/de/help/apidoc/f/ch02.html#account.info
    """
    with _api_session(environment, username, password) as accesstoken:
        return api_call(
            environment,
            accesstoken,
            "account",
            "info",
            {
            }
        )


def api_macro_list(environment: str, username: str, password: str, domain: str):
    """Return the nameserver information of `domain`.

    @see https://www.inwx.de/de/help/apidoc/f/ch02s15.html#nameserver.info
    """
    with _api_session(environment, username, password) as accesstoken:
        return api_call(
            environment,
            accesstoken,
            "nameserver",
            "info",
            {
                "domain": domain,
            }
        )


def api_macro_save(
    environment: str,
    username: str,
    password: str,
    domain_base: str,
    domain_path,
    type_: str,
    content: str
):
    """Create or update the record with the given name and type.

    No matching record leads to a creation, exactly one match to an update of
    its content; with several matches nothing is changed and a message is
    logged.

    @see https://www.inwx.de/de/help/apidoc/f/ch02s15.html#nameserver.info
    @see https://www.inwx.de/de/help/apidoc/f/ch02s15.html#nameserver.createRecord
    @see https://www.inwx.de/de/help/apidoc/f/ch02s15.html#nameserver.updateRecord
    """
    name = record_name(domain_base, domain_path)
    with _api_session(environment, username, password) as accesstoken:
        info = api_call(
            environment,
            accesstoken,
            "nameserver",
            "info",
            {
                "domain": domain_base,
            }
        )
        matching = [
            record
            for record in info.get("record", [])
            if ((record["name"] == name) and (record["type"] == type_))
        ]
        count = len(matching)
        if count == 0:
            result = api_call(
                environment,
                accesstoken,
                "nameserver",
                "createRecord",
                {
                    "domain": domain_base,
                    "name": domain_path,
                    "type": type_,
                    "content": content,
                }
            )
            id_ = result["id"]
            log("created record %u" % id_)
        elif count == 1:
            id_ = matching[0]["id"]
            api_call(
                environment,
                accesstoken,
                "nameserver",
                "updateRecord",
                {
                    "id": id_,
                    "content": content,
                }
            )
            log("updated record %u" % id_)
        else:
            log("found multiple records with this name and type")


def api_macro_delete(
    environment: str,
    username: str,
    password: str,
    domain_base: str,
    domain_path,
    type_
):
    """Delete all records with the given name.

    If `type_` is ``None`` records of any type are deleted, otherwise only
    those of that type.

    @see https://www.inwx.de/de/help/apidoc/f/ch02s15.html#nameserver.info
    @see https://www.inwx.de/de/help/apidoc/f/ch02s15.html#nameserver.deleteRecord
    """
    name = record_name(domain_base, domain_path)
    with _api_session(environment, username, password) as accesstoken:
        info = api_call(
            environment,
            accesstoken,
            "nameserver",
            "info",
            {
                "domain": domain_base,
            }
        )
        matching = [
            record
            for record in info.get("record", [])
            if (
                (record["name"] == name)
                and
                ((type_ is None) or (record["type"] == type_))
            )
        ]
        for entry in matching:
            api_call(
                environment,
                accesstoken,
                "nameserver",
                "deleteRecord",
                {
                    "id": entry["id"],
                }
            )


def _require(value, what: str):
    """Raise ``ValueError("<what> required")`` if `value` is ``None``."""
    if value is None:
        raise ValueError("%s required" % what)
    return value


def _environ_get(name: str):
    """Read a mandatory environment variable; ValueError if it is unset."""
    if name not in _os.environ:
        raise ValueError("environment variable '%s' not set" % name)
    return _os.environ[name]


def _argument_parser_build():
    """Build the command line argument parser."""
    # NOTE(review): all 'metavar' values and the 'requirements' placeholders
    # below are empty strings; presumably angle-bracket placeholders got lost
    # somewhere -- confirm and restore the intended texts
    actions = [
        {"name": "conf-schema", "requirements": []},
        {"name": "info", "requirements": []},
        {"name": "list", "requirements": [""]},
        {"name": "save", "requirements": ["", "", ""]},
        {"name": "delete", "requirements": [""]},
        {"name": "certbot-hook", "requirements": []},
    ]
    argument_parser = _argparse.ArgumentParser(
        description = "INWX CLI Frontend"
    )
    argument_parser.add_argument(
        "-c",
        "--conf",
        type = str,
        dest = "conf",
        default = _os.path.join(str(_pathlib.Path.home()), ".inwx-conf.json"),
        metavar = "",
        help = "path to configuration file",
    )
    argument_parser.add_argument(
        "-e",
        "--environment",
        type = str,
        dest = "environment",
        metavar = "",
        default = None,
        help = "environment to use; one of the keys in the 'url' node of the configuration; overwrites the configuration value",
    )
    argument_parser.add_argument(
        "-u",
        "--username",
        type = str,
        dest = "username",
        metavar = "",
        default = None,
        help = "username; overwrites the configuration value",
    )
    argument_parser.add_argument(
        "-p",
        "--password",
        type = str,
        dest = "password",
        metavar = "",
        default = None,
        help = "password; overwrites the configuration value",
    )
    argument_parser.add_argument(
        "-d",
        "--domain",
        type = str,
        dest = "domain",
        default = None,
        metavar = "",
        help = "the domain to work with"
    )
    argument_parser.add_argument(
        "-t",
        "--type",
        type = str,
        dest = "type",
        default = None,
        metavar = "",
        help = "the record type (A, AAAA, TXT, …)"
    )
    argument_parser.add_argument(
        "-v",
        "--value",
        type = str,
        dest = "value",
        default = None,
        metavar = "",
        help = "value for the record"
    )
    argument_parser.add_argument(
        "-x",
        "--challenge-prefix",
        type = str,
        dest = "challenge_prefix",
        metavar = "",
        default = "_acme-challenge",
        help = "which subdomain to use for ACME challanges",
    )
    argument_parser.add_argument(
        "-w",
        "--delay",
        type = float,
        dest = "delay",
        default = 60.0,
        metavar = "",
        help = "seconds to wait at end of certbot auth hook",
    )
    argument_parser.add_argument(
        type = str,
        dest = "action",
        choices = [entry["name"] for entry in actions],
        metavar = "",
        help = string_coin(
            "action to execute; options:\n{{options}}",
            {
                "options": " | ".join(
                    string_coin(
                        "{{name}}{{macro_requirements}}",
                        {
                            "name": entry["name"],
                            "macro_requirements": (
                                string_coin(
                                    " (requires: {{requirements}})",
                                    {
                                        "requirements": ",".join(entry["requirements"]),
                                    }
                                )
                                if entry["requirements"] else
                                ""
                            ),
                        }
                    )
                    for entry in actions
                ),
            }
        ),
    )
    return argument_parser


def main():
    """Parse the command line, load the configuration and run the action.

    Raises:
        ValueError: if a required argument, configuration value or
            environment variable is missing, or if an API call fails.
    """
    ## args
    args = _argument_parser_build().parse_args()

    ## conf
    conf_load(args.conf)

    ## vars
    environment = (args.environment or conf_get("environment"))
    account_username = (args.username or conf_get("account.username"))
    account_password = (args.password or conf_get("account.password"))
    (domain_base, domain_path) = (
        (None, None)
        if (args.domain is None) else
        domain_split(args.domain)
    )

    ## exec
    if args.action == "conf-schema":
        print(_json.dumps(conf_schema(), indent = "\t"))
        return
    if args.action not in {"info", "list", "save", "delete", "certbot-hook"}:
        log("unhandled action '%s'" % (args.action, ))
        return

    # all remaining actions talk to the API and thus need credentials
    _require(account_username, "account username")
    _require(account_password, "account password")

    if args.action == "info":
        result = api_macro_info(
            environment,
            account_username,
            account_password
        )
        print(_json.dumps(result, indent = "\t"))
    elif args.action == "list":
        _require(args.domain, "domain")
        result = api_macro_list(
            environment,
            account_username,
            account_password,
            domain_base
        )
        print(_json.dumps(result, indent = "\t"))
    elif args.action == "save":
        _require(args.domain, "domain")
        _require(args.type, "type")
        _require(args.value, "value")
        api_macro_save(
            environment,
            account_username,
            account_password,
            domain_base,
            domain_path,
            args.type,
            args.value
        )
    elif args.action == "delete":
        _require(args.domain, "domain")
        api_macro_delete(
            environment,
            account_username,
            account_password,
            domain_base,
            domain_path,
            args.type
        )
    else:
        # certbot-hook: domain and challenge value come from the environment
        (domain_base, domain_path_stripped) = domain_split(_environ_get("CERTBOT_DOMAIN"))
        content = _environ_get("CERTBOT_VALIDATION")
        api_macro_save(
            environment,
            account_username,
            account_password,
            domain_base,
            challenge_path(args.challenge_prefix, domain_path_stripped),
            "TXT",
            content
        )
        # wait after the record has been saved before returning to certbot
        _time.sleep(args.delay)


if __name__ == "__main__":
    try:
        main()
    except ValueError as error:
        log(str(error))
        # a failure must be visible to the caller (e.g. certbot)
        _sys.exit(1)