461 lines
9.2 KiB
Text
461 lines
9.2 KiB
Text
![]() |
#!/usr/bin/env python3
|
||
|
|
||
|
from typing import List
|
||
|
|
||
|
import os as _os
|
||
|
import sys as _sys
|
||
|
import json as _json
|
||
|
import http.client as _http_client
|
||
|
import argparse as _argparse
|
||
|
import pathlib as _pathlib
|
||
|
import time as _time
|
||
|
|
||
|
def log(
    messsage : str
):
    """Write one diagnostic line to standard error.

    The text is prefixed with "-- " and terminated with a newline.

    messsage -- the text to report (the parameter name is kept as spelled
                because callers may pass it by keyword)
    """
    line = "-- %s\n" % messsage
    _sys.stderr.write(line)
|
||
|
|
||
|
|
||
|
def path_read(
    thing,
    steps : List[str]
):
    """Follow a chain of keys through nested containers and return the value found.

    thing -- the outermost container
    steps -- the keys to follow, outermost first; an empty list returns
             `thing` itself

    Raises ValueError naming the complete dotted path as soon as one of the
    keys is not contained in the container reached so far.
    """
    node = thing
    for key in steps:
        if key not in node:
            raise ValueError("missing key '%s'" % ".".join(steps))
        node = node[key]
    return node
|
||
|
|
||
|
|
||
|
def path_write(
    thing,
    steps : List[str],
    value
):
    """Store a value at a nested key path, creating intermediate dicts on demand.

    thing -- the outermost dict; it is modified in place
    steps -- the keys to follow, outermost first; the last one receives `value`
    value -- the value to store

    An empty `steps` list raises IndexError, since there is no final key.
    """
    parents = steps[:-1]
    leaf = steps[-1]
    node = thing
    for key in parents:
        # reuse an existing entry, otherwise insert a fresh dict and descend into it
        node = node.setdefault(key, {})
    node[leaf] = value
|
||
|
|
||
|
|
||
|
def merge(
    core,
    mantle
):
    """Return a new mapping holding the entries of `core` overlaid with those of `mantle`.

    Neither argument is modified. The overlay is shallow: where both sides
    carry the same top-level key, the value from `mantle` replaces the one
    from `core` as a whole, nested dicts included.
    """
    combined = core.copy()
    combined.update(mantle)
    return combined
|
||
|
|
||
|
|
||
|
def http_call(
    request : dict,
) -> dict:
    """Perform a single HTTP(S) request and return the complete response.

    request -- dict with the keys
        "url":     dict with "scheme" ("http" or "https"), "host", "port"
                   and "path" (without leading slash)
        "method":  HTTP method name
        "data":    request body
        "headers": dict of request headers

    Returns a dict with "status" (int), "headers" (dict) and "data" (the
    raw body as returned by `read()`).

    Raises ValueError for a scheme other than "http" or "https".
    """
    url = request["url"]
    connection_classes = {
        "http": _http_client.HTTPConnection,
        "https": _http_client.HTTPSConnection,
    }
    scheme = url["scheme"]
    if scheme not in connection_classes:
        raise ValueError("unsupported URL scheme '%s'" % (scheme, ))
    connection = connection_classes[scheme](url["host"], url["port"])
    try:
        connection.request(
            request["method"],
            ("/" + url["path"]),
            request["data"],
            request["headers"]
        )
        response_ = connection.getresponse()
        # the body is read completely before the connection is closed below
        return {
            "status": response_.status,
            "headers": dict(response_.getheaders()),
            "data": response_.read(),
        }
    finally:
        # release the socket on success and on failure alike
        connection.close()
|
||
|
|
||
|
|
||
|
_conf_data = {
|
||
|
"url": {
|
||
|
"test": {
|
||
|
"scheme": "https",
|
||
|
"host": "api.ote.domrobot.com",
|
||
|
"port": 443,
|
||
|
"path": "jsonrpc/"
|
||
|
},
|
||
|
"production": {
|
||
|
"scheme": "https",
|
||
|
"host": "api.domrobot.com",
|
||
|
"port": 443,
|
||
|
"path": "jsonrpc/"
|
||
|
}
|
||
|
},
|
||
|
"environment": "production",
|
||
|
"account": {
|
||
|
"username": None,
|
||
|
"password": None
|
||
|
}
|
||
|
}
|
||
|
|
||
|
|
||
|
def conf_load(
    path : str
):
    """Overlay the module configuration with the JSON file at `path`.

    A non-existent file is silently ignored, so the built-in defaults stay
    in effect. Otherwise the file is parsed as JSON and combined with the
    current configuration through merge(), i.e. top-level keys from the
    file replace the corresponding defaults as a whole.

    path -- location of the JSON configuration file
    """
    global _conf_data
    if (not _os.path.exists(path)):
        return
    # the context manager closes the file even if reading or parsing fails
    with open(path, "r") as handle:
        data = _json.load(handle)
    _conf_data = merge(_conf_data, data)
|
||
|
|
||
|
|
||
|
def conf_get(
    path : str
):
    """Look up a configuration value by dotted path, e.g. "account.username".

    Raises ValueError (from path_read) if a key along the path is missing.
    """
    steps = path.split(".")
    return path_read(_conf_data, steps)
|
||
|
|
||
|
|
||
|
def conf_set(
    path : str,
    value
):
    """Store a configuration value under a dotted path, e.g. "account.username".

    The module configuration is modified in place; intermediate levels
    that do not exist yet are created by path_write.
    """
    steps = path.split(".")
    path_write(_conf_data, steps, value)
|
||
|
|
||
|
|
||
|
def api_call(
    environment : str,
    accesstoken : str,
    category : str,
    action : str,
    data,
):
    """Send one JSON request "<category>.<action>" to the configured API endpoint.

    environment -- key below "url" in the configuration selecting the endpoint
    accesstoken -- session token sent as the "domrobot" cookie, or None for
                   an unauthenticated call
    category    -- first part of the remote method name
    action      -- second part of the remote method name
    data        -- the parameters of the remote method (JSON-serialisable)

    Returns the "resData" member of the decoded response, or an empty dict
    if the response has none. If the response sets a cookie, its value is
    added to the result under "_accesstoken".

    Raises ValueError if the HTTP status is not 200 or if the response
    carries the code 2002.
    """
    url = conf_get("url." + environment)
    request_headers = {
        "Content-Type": "application/json",
    }
    if (accesstoken is not None):
        request_headers["Cookie"] = ("domrobot=%s" % (accesstoken, ))
    request_data_decoded = {
        "method": (category + "." + action),
        "params": data,
    }
    request = {
        "url": url,
        "method": "POST",
        "headers": request_headers,
        "data": _json.dumps(request_data_decoded),
    }
    response = http_call(request)
    if (not (response["status"] == 200)):
        raise ValueError("API call failed with status %u: %s" % (response["status"], response["data"], ))
    output_data_decoded = _json.loads(response["data"])
    result = (output_data_decoded["resData"] if ("resData" in output_data_decoded) else {})
    if ("Set-Cookie" in response["headers"]):
        # Take the leading "name=value" pair of the cookie. Cut at the first
        # ";" (with or without a following space) and at the first "=" only,
        # so that a value which itself contains "=" is not truncated.
        cookie_pair = response["headers"]["Set-Cookie"].split(";", 1)[0]
        result["_accesstoken"] = cookie_pair.partition("=")[2].strip()
    if (output_data_decoded["code"] == 2002):
        raise ValueError("wrong use: %s" % str(output_data_decoded))
    return result
|
||
|
|
||
|
|
||
|
def api_macro_login(
    environment : str,
    username : str,
    password : str
):
    """Log in via "account.login" and return the session access token.

    environment -- configuration key of the endpoint to use
    username    -- account name
    password    -- account password

    Raises ValueError if the username or the password is None. The token is
    the "_accesstoken" entry of the api_call result.
    """
    if (username is None) or (password is None):
        raise ValueError("username or password not given")
    credentials = {
        "user": username,
        "pass": password,
    }
    session = api_call(environment, None, "account", "login", credentials)
    return session["_accesstoken"]
|
||
|
|
||
|
|
||
|
def api_macro_logout(
    environment : str,
    accesstoken : str
):
    """End the session identified by `accesstoken` via "account.logout".

    The result of the API call is discarded; the function returns None.
    """
    api_call(environment, accesstoken, "account", "logout", {})
    return None
|
||
|
|
||
|
|
||
|
def api_macro_info(
    environment : str,
    username : str,
    password : str
):
    """Log in, fetch the result of "account.info", log out and return that result.

    environment -- configuration key of the endpoint to use
    username    -- account name
    password    -- account password

    Raises ValueError as raised by the login or by the API calls.
    """
    accesstoken = api_macro_login(environment, username, password)
    try:
        return api_call(
            environment,
            accesstoken,
            "account",
            "info",
            {
            }
        )
    finally:
        # always end the session, also when the info call failed
        api_macro_logout(environment, accesstoken)
|
||
|
|
||
|
|
||
|
def api_macro_list(
    environment : str,
    username : str,
    password : str,
    domain : str
):
    """Log in, fetch the result of "nameserver.info" for a domain, log out and return it.

    environment -- configuration key of the endpoint to use
    username    -- account name
    password    -- account password
    domain      -- the domain passed to the API as "domain"

    Raises ValueError as raised by the login or by the API calls.
    """
    accesstoken = api_macro_login(environment, username, password)
    try:
        return api_call(
            environment,
            accesstoken,
            "nameserver",
            "info",
            {
                "domain": domain,
            }
        )
    finally:
        # always end the session, also when the query failed
        api_macro_logout(environment, accesstoken)
|
||
|
|
||
|
|
||
|
def api_macro_save(
    environment : str,
    username : str,
    password : str,
    domain : str,
    name : str,
    type_ : str,
    content : str
):
    """Create or update the record `name`.`domain` of type `type_` with `content`.

    The records of the domain are fetched via "nameserver.info" and those
    whose "name" equals name + "." + domain and whose "type" equals `type_`
    are selected:

    - no match:        a record is created via "nameserver.createRecord"
    - exactly one:     its content is replaced via "nameserver.updateRecord"
    - more than one:   nothing is changed, only a message is logged

    environment -- configuration key of the endpoint to use
    username    -- account name
    password    -- account password
    domain      -- the domain the record belongs to
    name        -- record name relative to `domain`
    type_       -- record type, e.g. "TXT"
    content     -- record content to store

    Returns None. Raises ValueError as raised by the login or the API calls.
    """
    accesstoken = api_macro_login(environment, username, password)
    try:
        info = api_call(
            environment,
            accesstoken,
            "nameserver",
            "info",
            {
                "domain": domain,
            }
        )
        full_name = (name + "." + domain)
        matching = [
            record
            for record in info["record"]
            if ((record["name"] == full_name) and (record["type"] == type_))
        ]
        count = len(matching)
        if (count == 0):
            result = api_call(
                environment,
                accesstoken,
                "nameserver",
                "createRecord",
                {
                    "domain": domain,
                    "name": name,
                    "type": type_,
                    "content": content,
                }
            )
            id_ = result["id"]
            log("created record %u" % id_)
        elif (count == 1):
            id_ = matching[0]["id"]
            api_call(
                environment,
                accesstoken,
                "nameserver",
                "updateRecord",
                {
                    "id": id_,
                    "content": content,
                }
            )
            log("updated record %u" % id_)
        else:
            log("found multiple records with this name and type")
    finally:
        # always end the session, also when one of the calls above failed
        api_macro_logout(environment, accesstoken)
|
||
|
|
||
|
|
||
|
|
||
|
def args(
):
    """Parse the command line (sys.argv) and return the argparse namespace.

    Attributes of the result:
        conf             -- path of the configuration file
                            (default: ".inwx-conf.json" in the home directory)
        environment      -- environment override or None
        username         -- username override or None
        password         -- password override or None
        challenge_prefix -- subdomain for ACME challenges
                            (default: "_acme-challenge")
        delay            -- seconds to wait at the end of the certbot hook
                            (float, default: 60.0)
        action           -- one of "info", "list", "save", "certbot-hook"
        parameter        -- list of the remaining positional arguments
    """
    argumentparser = _argparse.ArgumentParser(
        description = "INWX CLI Frontend"
    )
    argumentparser.add_argument(
        "-c",
        "--conf",
        dest = "conf",
        default = _os.path.join(str(_pathlib.Path.home()), ".inwx-conf.json"),
        metavar = "<conf>",
        help = "path to configuration file",
    )
    argumentparser.add_argument(
        "-e",
        "--environment",
        dest = "environment",
        metavar = "<environment>",
        default = None,
        help = "environment to use; one of the keys in the 'url' field of the configuration; overwrites the configuration value",
    )
    argumentparser.add_argument(
        "-u",
        "--username",
        dest = "username",
        metavar = "<username>",
        default = None,
        help = "username; overwrites the configuration value",
    )
    argumentparser.add_argument(
        "-p",
        "--password",
        dest = "password",
        metavar = "<password>",
        default = None,
        help = "password; overwrites the configuration value",
    )
    # NOTE: there is deliberately no "-d/--domain" option; the domain is
    # taken from the positional <parameters>.
    argumentparser.add_argument(
        "-x",
        "--challenge-prefix",
        dest = "challenge_prefix",
        metavar = "<challenge-prefix>",
        default = "_acme-challenge",
        help = "which subdomain to use for ACME challenges",
    )
    argumentparser.add_argument(
        "-w",
        "--delay",
        dest = "delay",
        type = float,
        default = 60.0,
        metavar = "<delay>",
        help = "seconds to wait at end of certbot auth hook",
    )
    argumentparser.add_argument(
        "action",
        type = str,
        choices = ["info", "list", "save", "certbot-hook"],
        metavar = "<action>",
        help = "action to execute",
    )
    argumentparser.add_argument(
        "parameter",
        nargs = "*",
        type = str,
        metavar = "<parameters>",
        help = "action specific parameters",
    )
    return argumentparser.parse_args()
|
||
|
|
||
|
|
||
|
def _require_parameters(
    parameters,
    names
):
    """Return the leading positional parameters, one per entry of `names`.

    Terminates the program through SystemExit, with a one-line message and
    a non-zero exit status, if fewer parameters than names were given.
    Surplus parameters are ignored.
    """
    if (len(parameters) < len(names)):
        raise SystemExit(
            "missing parameters; expected: %s" % " ".join("<%s>" % name for name in names)
        )
    return parameters[:len(names)]


def main(
):
    """Run the command line frontend.

    Loads the configuration file, applies the command line overrides for
    environment, username and password, and executes the requested action:

    info         -- print the account information as JSON
    list         -- print the nameserver information of <domain> as JSON
    save         -- create or update a record; parameters
                    <domain> <name> <type> <content>
    certbot-hook -- store the value of the environment variable
                    CERTBOT_VALIDATION as TXT record for the domain in
                    CERTBOT_DOMAIN, then sleep for the configured delay
    """
    arguments = args()

    conf_load(arguments.conf)
    overrides = (
        ("environment", arguments.environment),
        ("account.username", arguments.username),
        ("account.password", arguments.password),
    )
    for path, value in overrides:
        if (value is not None):
            conf_set(path, value)

    if (arguments.action == "info"):
        result = api_macro_info(
            conf_get("environment"),
            conf_get("account.username"),
            conf_get("account.password")
        )
        print(_json.dumps(result, indent = "\t"))
    elif (arguments.action == "list"):
        (domain, ) = _require_parameters(arguments.parameter, ["domain"])
        result = api_macro_list(
            conf_get("environment"),
            conf_get("account.username"),
            conf_get("account.password"),
            domain
        )
        print(_json.dumps(result, indent = "\t"))
    elif (arguments.action == "save"):
        (domain, name, type_, content, ) = _require_parameters(
            arguments.parameter,
            ["domain", "name", "type", "content"]
        )
        api_macro_save(
            conf_get("environment"),
            conf_get("account.username"),
            conf_get("account.password"),
            domain,
            name,
            type_,
            content
        )
    elif (arguments.action == "certbot-hook"):
        certbot_domain = _os.environ.get("CERTBOT_DOMAIN")
        certbot_validation = _os.environ.get("CERTBOT_VALIDATION")
        if ((certbot_domain is None) or (certbot_validation is None)):
            raise SystemExit(
                "certbot-hook requires the environment variables CERTBOT_DOMAIN and CERTBOT_VALIDATION"
            )
        domain_full_parts = certbot_domain.split(".")
        # The last two labels are taken as the zone, everything before them
        # as the host part. NOTE: zones below a multi-label suffix (e.g.
        # "example.co.uk") are not split correctly by this.
        domain = ".".join(domain_full_parts[-2:])
        concern = ".".join(domain_full_parts[:-2])
        # For a certificate on the zone itself the host part is empty; the
        # record name is then just the prefix, without a trailing dot.
        name = (
            arguments.challenge_prefix
            if (concern == "") else
            (arguments.challenge_prefix + "." + concern)
        )
        api_macro_save(
            conf_get("environment"),
            conf_get("account.username"),
            conf_get("account.password"),
            domain,
            name,
            "TXT",
            certbot_validation
        )
        # wait before returning to certbot (duration set by --delay)
        _time.sleep(arguments.delay)
    else:
        log("unhandled action '%s'" % (arguments.action, ))
|
||
|
|
||
|
|
||
|
# Script entry point: run the frontend only when executed directly, report
# errors as a single line on standard error and signal failure to the caller
# (e.g. certbot) through a non-zero exit status.
if (__name__ == "__main__"):
    try:
        main()
    except ValueError as error:
        _sys.stderr.write(str(error) + "\n")
        _sys.exit(1)
|
||
|
|