ansible-base/roles/synapse-with-pav_jsonfile/files/module.py
2024-03-29 17:21:05 +01:00

71 lines
1.7 KiB
Python

#!/usr/bin/env python3
import hmac
import json as _json
'''
@see https://matrix-org.github.io/synapse/latest/modules/password_auth_provider_callbacks.html
@see https://github.com/matrix-org/synapse/blob/develop/synapse/module_api/__init__.py
'''
class class_pav_jsonfile(object):
    '''
    Synapse password auth provider which checks credentials against a JSON file

    The file is expected to hold an object which maps user names (the localpart
    of the user ID) to entries of the shape

        {"password": "<plain text password>", "active": true}

    where "active" is optional and treated as true when absent. The file is
    re-read on every login attempt, so changes apply without a restart.
    '''

    # used when the module configuration does not specify "path"
    _DEFAULT_PATH = "/etc/matrix-synapse/users.json"

    @staticmethod
    def _load_users(path):
        '''
        reads and parses the users file

        @param path location of the JSON file
        @return the decoded JSON document
        @raise OSError if the file can not be read
        @raise ValueError if the file does not contain valid JSON
        '''
        # context manager, so that the file handle is closed even if parsing fails
        with open(path, encoding="utf-8") as handle:
            return _json.load(handle)

    @staticmethod
    def _localpart(user_id):
        '''
        extracts the localpart of a qualified user ID, e.g. "@alice:example.org" -> "alice"
        '''
        return user_id.split(":", 1)[0][1:]

    @staticmethod
    def parse_config(config_raw):
        '''
        validates the module configuration and the users file it points to

        @param config_raw raw module configuration; may be None or lack "path",
            in which case the default path is used
        @return dict with the single key "path"
        @raise OSError if the users file can not be read
        @raise ValueError if the users file is not valid JSON or is malformed
        '''
        path = (config_raw or {}).get("path", class_pav_jsonfile._DEFAULT_PATH)
        data = class_pav_jsonfile._load_users(path)
        if not isinstance(data, dict):
            raise ValueError("users json file malformed: expected an object mapping user names to entries")
        for (name, entry) in data.items():
            # a non-object entry can not carry a password either, so it is reported the same way
            if not (isinstance(entry, dict) and ("password" in entry)):
                raise ValueError("users json file malformed: missing field 'password' for user '%s'" % name)
        return {
            "path": path,
        }

    def __init__(self, config, account_handler):
        '''
        stores the configuration and registers the password login checker

        @param config the dict returned by parse_config
        @param account_handler the module API object handed over by the homeserver
        '''
        self.config = config
        self.module_api = account_handler
        self.module_api.register_password_auth_provider_callbacks(
            auth_checkers={("m.login.password", ("password",)): self.check_auth}
        )

    async def private_check_password(self, user_id, password):
        '''
        checks a password against the users file

        @param user_id qualified user ID, e.g. "@alice:example.org"
        @param password the password supplied by the client
        @return True if the user is listed, active and the password matches; False otherwise
        @raise OSError if the users file can not be read
        @raise ValueError if the users file does not contain valid JSON
        '''
        if not isinstance(password, str):
            return False
        data = self._load_users(self.config["path"])
        entry = data.get(self._localpart(user_id)) if isinstance(data, dict) else None
        if not isinstance(entry, dict):
            return False
        if not entry.get("active", True):
            return False
        stored = entry.get("password")
        # an entry without a (string) password must never match; in particular
        # it must not be treated as having an empty password
        if not isinstance(stored, str):
            return False
        # constant time comparison; compare_digest only accepts non-ASCII input as bytes
        return hmac.compare_digest(
            stored.encode("utf-8", "surrogatepass"),
            password.encode("utf-8", "surrogatepass"),
        )

    async def check_auth(self, username, login_type, login_dict):
        '''
        auth checker callback for the "m.login.password" login type

        @param username user name as supplied by the client; either a bare
            localpart or a qualified user ID
        @param login_type the login type the callback is invoked for (not inspected)
        @param login_dict the login request; its "password" field is checked
        @return None if the credentials are rejected, otherwise a tuple of the
            user ID to log in and None (no post-login callback)
        '''
        user_id = self.module_api.get_qualified_user_id(username)
        passed = await self.private_check_password(user_id, login_dict.get("password"))
        if not passed:
            return None
        canonical_user_id = await self.module_api.check_user_exists(user_id)
        if canonical_user_id is None:
            # first login of a user known only to the users file: create the
            # account. The localpart is derived from the qualified ID, since
            # `username` may already be of the form "@name:server". The call is
            # awaited so that the account exists before the login is reported
            # as successful and so that registration errors are not dropped.
            localpart = self._localpart(user_id)
            canonical_user_id = await self.module_api.register_user(localpart, localpart, None, False)
        return (canonical_user_id or user_id, None, )