frontend-mondvogel/source/backend.py

192 lines
3.5 KiB
Python
Raw Permalink Normal View History

2024-06-21 14:04:58 +02:00
import typing as _typing
import os as _os
2024-06-21 14:04:58 +02:00
import json as _json
import time as _time
2024-06-21 14:04:58 +02:00
import requests as _requests
from helpers import *
from conf import *
from log import *
2024-06-21 14:26:57 +02:00
2024-06-21 14:04:58 +02:00
def backend_api_call_generic(
session_key : _typing.Optional[str],
http_method : str,
action_path : str,
data,
options : _typing.Optional[dict] = None
2024-06-21 14:04:58 +02:00
):
options = (
{
}
|
(options or {})
)
2024-06-21 14:04:58 +02:00
log_info(
"backend_api_call",
{
"with_session_key": (not (session_key is None)),
2024-06-21 14:04:58 +02:00
"http_method": http_method,
"path": action_path,
}
)
target = string_coin(
"{{scheme}}://{{host}}:{{port}}{{path_base}}{{path_action}}",
{
"scheme": conf_get()["api"]["scheme"],
"host": conf_get()["api"]["host"],
"port": ("%u" % conf_get()["api"]["port"]),
"path_base": conf_get()["api"]["path"],
"path_action": action_path,
}
)
2024-06-21 14:26:57 +02:00
headers_common = (
{}
if (session_key is None) else
{"X-Session-Key": session_key}
2024-06-21 14:26:57 +02:00
)
2024-06-21 14:04:58 +02:00
if (http_method == "GET"):
response_raw = _requests.get(
target,
2024-06-21 14:26:57 +02:00
headers = (
headers_common
)
2024-06-21 14:04:58 +02:00
)
elif (http_method == "POST"):
response_raw = _requests.post(
target,
2024-06-21 14:26:57 +02:00
headers = (
headers_common
|
{
"Content-Type": "application/json",
}
),
2024-06-21 14:04:58 +02:00
json = data
)
elif (http_method == "DELETE"):
response_raw = _requests.delete(
target,
2024-06-21 14:26:57 +02:00
headers = (
headers_common
),
2024-06-21 14:04:58 +02:00
json = data
)
else:
raise NotImplementedError("unhandled HTTP method: %s" % http_method)
2024-06-21 14:26:57 +02:00
if ((response_raw.status_code < 200) or (response_raw.status_code >= 300)):
raise ValueError("irregular HTTP response status code: %u" % response_raw.status_code)
else:
return _json.loads(response_raw.text)
2024-06-21 14:04:58 +02:00
def backend_api_call_session_begin(
2024-06-21 14:26:57 +02:00
):
return backend_api_call_generic(
None,
2024-06-21 14:04:58 +02:00
"POST",
"/session/begin",
{
"name": conf_get()["account"]["name"],
"password": conf_get()["account"]["password"],
}
)
def get_session_key(
):
2024-06-23 09:31:17 +02:00
path = conf_get()["session"]["key_path"]
if (
_os.path.exists(path)
and
(
(_time.time() - _os.path.getmtime(path))
<
conf_get()["session"]["lifetime"]
)
):
session_key = file_text_read(path)
else:
session_key = backend_api_call_session_begin()
file_text_write(path, session_key)
return session_key
def backend_api_call_wrapped(
needs_session : bool,
http_method : str,
action_path : str,
data
):
return backend_api_call_generic(
(get_session_key() if needs_session else None),
http_method,
action_path,
data
)
2024-06-21 14:04:58 +02:00
def backend_api_call_session_end(
):
return backend_api_call_generic(
True,
2024-06-21 14:04:58 +02:00
"DELETE",
"/session/end",
None
)
def backend_api_call_member_list(
):
return backend_api_call_wrapped(
True,
2024-06-21 14:04:58 +02:00
"GET",
"/member/list",
None
)
2024-06-23 11:44:05 +02:00
def backend_api_call_member_read(
member_id : int
):
return backend_api_call_wrapped(
True,
"GET",
string_coin("/member/read/{{id}}", {"id": "%u" % member_id}),
None
)
2024-06-21 14:04:58 +02:00
def backend_api_call_member_project(
membership_number : _typing.Optional[str],
name_real_value : str,
email_address_private : _typing.Optional[str],
2025-01-17 10:17:00 +01:00
groups : list[str],
2024-06-21 14:04:58 +02:00
notification_target_url_template : _typing.Optional[str]
):
return backend_api_call_wrapped(
True,
2024-06-21 14:04:58 +02:00
"POST",
"/member/project",
{
"membership_number": membership_number,
"name_real_value": name_real_value,
"email_address_private": email_address_private,
2025-01-17 10:17:00 +01:00
"groups": groups,
2024-06-21 14:04:58 +02:00
"notification_target_url_template": notification_target_url_template,
}
)
def backend_api_call_member_delete(
2024-06-23 11:44:05 +02:00
member_id : int
):
return backend_api_call_wrapped(
True,
"DELETE",
2024-06-23 11:44:05 +02:00
string_coin("/member/delete/{{id}}", {"id": ("%u" % member_id)}),
None
)