#!/usr/bin/env python3 from typing import List import os as _os import sys as _sys import json as _json import http.client as _http_client import argparse as _argparse import pathlib as _pathlib import time as _time def log( messsage : str ): _sys.stderr.write("-- %s\n" % messsage) def path_read( thing, steps : List[str] ): position = thing for step in steps: if (not (step in position)): raise ValueError("missing key '%s'" % ".".join(steps)) position = position[step] return position def path_write( thing, steps : List[str], value ): steps_first = steps[:-1] step_last = steps[-1] position = thing for step in steps_first: if (not (step in position)): position[step] = {} position = position[step] position[step_last] = value def merge( core, mantle ): result = core.copy() result.update(mantle) return result def http_call( request : dict, ) -> dict: connection = ( { "http": (lambda: _http_client.HTTPConnection(request["url"]["host"], request["url"]["port"])), "https": (lambda: _http_client.HTTPSConnection(request["url"]["host"], request["url"]["port"])), }[request["url"]["scheme"]] )() connection.request( request["method"], ("/" + request["url"]["path"]), request["data"], request["headers"] ) response_ = connection.getresponse() response = { "status": response_.status, "headers": dict(response_.getheaders()), "data": response_.read(), } return response _conf_data = { "url": { "test": { "scheme": "https", "host": "api.ote.domrobot.com", "port": 443, "path": "jsonrpc/" }, "production": { "scheme": "https", "host": "api.domrobot.com", "port": 443, "path": "jsonrpc/" } }, "environment": "production", "account": { "username": None, "password": None } } def conf_load( path : str ): global _conf_data if (not _os.path.exists(path)): pass else: handle = open(path, "r") content = handle.read() handle.close() data = _json.loads(content) _conf_data = merge(_conf_data, data) def conf_get( path : str ): global _conf_data return path_read(_conf_data, path.split(".")) def conf_set( path : str, value ): global _conf_data path_write(_conf_data, path.split("."), value) def api_call( environment : str, accesstoken : str, category : str, action : str, data, ): url = conf_get("url." + environment) # input_["lang"] = "de" request_headers = { "Content-Type": "application/json", } if (accesstoken is not None): request_headers["Cookie"] = ("domrobot=%s" % (accesstoken, )) else: pass request_data_decoded = { "method": (category + "." + action), "params": data, } request = { "url": url, "method": "POST", "headers": request_headers, "data": _json.dumps(request_data_decoded), } # log("[>>] %s" % _json.dumps(request, indent = "\t")) response = http_call(request) # log("[<<] %s" % _json.dumps(response, indent = "\t")) if (not (response["status"] == 200)): raise ValueError("API call failed with status %u: %s" % (response["status"], response["data"], )) else: output_data_decoded = _json.loads(response["data"]) result = (output_data_decoded["resData"] if ("resData" in output_data_decoded) else {}) if ("Set-Cookie" in response["headers"]): result["_accesstoken"] = response["headers"]["Set-Cookie"].split("; ")[0].split("=")[1] else: pass if (output_data_decoded["code"] == 2002): raise ValueError("wrong use: %s" % str(output_data_decoded)) else: return result def api_macro_login( environment : str, username : str, password : str ): if ((username is None) or (password is None)): raise ValueError("username or password not given") else: response = ( api_call( environment, None, "account", "login", { "user": username, "pass": password, } ) ) return response["_accesstoken"] def api_macro_logout( environment : str, accesstoken : str ): response = api_call( environment, accesstoken, "account", "logout", { } ) return None def api_macro_info( environment : str, username : str, password : str ): accesstoken = api_macro_login(environment, username, password) info = api_call( environment, accesstoken, "account", "info", { } ) api_macro_logout(environment, accesstoken) return info def api_macro_list( environment : str, username : str, password : str, domain : str ): accesstoken = api_macro_login(environment, username, password) info = api_call( environment, accesstoken, "nameserver", "info", { "domain": domain, } ) api_macro_logout(environment, accesstoken) return info def api_macro_save( environment : str, username : str, password : str, domain : str, name : str, type_ : str, content : str ): accesstoken = api_macro_login(environment, username, password) info = api_call( environment, accesstoken, "nameserver", "info", { "domain": domain, } ) matching = list( filter( lambda record: ((record["name"] == (name + "." + domain)) and (record["type"] == type_)), info["record"] ) ) count = len(matching) if (count == 0): result = api_call( environment, accesstoken, "nameserver", "createRecord", { "domain": domain, "name": name, "type": type_, "content": content, } ) id_ = result["id"] log("created record %u" % id_) elif (count == 1): id_ = matching[0]["id"] result = api_call( environment, accesstoken, "nameserver", "updateRecord", { "id": id_, "content": content, } ) log("updated record %u" % id_) else: log("found multiple records with this name and type") api_macro_logout(environment, accesstoken) def args( ): argumentparser = _argparse.ArgumentParser( description = "INWX CLI Frontend" ) argumentparser.add_argument( "-c", "--conf", dest = "conf", default = _os.path.join(str(_pathlib.Path.home()), ".inwx-conf.json"), metavar = "", help = "path to configuration file", ) argumentparser.add_argument( "-e", "--environment", dest = "environment", metavar = "", default = None, help = "environment to use; one of the keys in the 'url' filed of the configuration; overwrites the configuration value", ) argumentparser.add_argument( "-u", "--username", dest = "username", metavar = "", default = None, help = "username; overwrites the configuration value", ) argumentparser.add_argument( "-p", "--password", dest = "password", metavar = "", default = None, help = "password; overwrites the configuration value", ) ''' argumentparser.add_argument( "-d", "--domain", dest = "domain", default = None, metavar = "", help = "the domain to work with" ) ''' argumentparser.add_argument( "-x", "--challenge-prefix", dest = "challenge_prefix", metavar = "", default = "_acme-challenge", help = "which subdomain to use for ACME challanges", ) argumentparser.add_argument( "-w", "--delay", dest = "delay", type = float, default = 60.0, metavar = "", help = "seconds to wait at end of certbot auth hook", ) argumentparser.add_argument( "action", type = str, choices = ["info", "list", "save", "certbot-hook"], metavar = "", help = "action to execute", ) argumentparser.add_argument( "parameter", nargs = "*", type = str, metavar = "", help = "action specific parameters", ) arguments = argumentparser.parse_args() return arguments def main( ): arguments = args() conf_load(arguments.conf) if (not (arguments.environment is None)): conf_set("environment", arguments.environment) if (not (arguments.username is None)): conf_set("account.username", arguments.username) if (not (arguments.password is None)): conf_set("account.password", arguments.password) if (arguments.action == "info"): result = api_macro_info( conf_get("environment"), conf_get("account.username"), conf_get("account.password") ) print(_json.dumps(result, indent = "\t")) elif (arguments.action == "list"): domain = arguments.parameter[0] result = api_macro_list( conf_get("environment"), conf_get("account.username"), conf_get("account.password"), domain ) print(_json.dumps(result, indent = "\t")) elif (arguments.action == "save"): domain = arguments.parameter[0] name = arguments.parameter[1] type_ = arguments.parameter[2] content = arguments.parameter[3] api_macro_save( conf_get("environment"), conf_get("account.username"), conf_get("account.password"), domain, name, type_, content ) # print(_json.dumps(result, indent = "\t")) elif (arguments.action == "certbot-hook"): domain_full_parts = _os.environ["CERTBOT_DOMAIN"].split(".") account = ".".join(domain_full_parts[-2:]) concern = ".".join(domain_full_parts[:-2]) domain = account name = (arguments.challenge_prefix + "." + concern) type_ = "TXT" content = _os.environ["CERTBOT_VALIDATION"] api_macro_save( conf_get("environment"), conf_get("account.username"), conf_get("account.password"), domain, name, type_, content ) _time.sleep(arguments.delay) # print(_json.dumps(result, indent = "\t")) else: log("unhandled action '%s'" % (arguments.action, )) try: main() except ValueError as error: _sys.stderr.write(str(error) + "\n")