#!/usr/bin/env python3
"""
Synapse password auth provider that checks logins against a JSON users file.

@see https://matrix-org.github.io/synapse/latest/modules/password_auth_provider_callbacks.html
@see https://github.com/matrix-org/synapse/blob/develop/synapse/module_api/__init__.py
"""

import hmac as _hmac
import json as _json


# Users file consulted when the module config does not name one.
_DEFAULT_USERS_PATH = "/etc/matrix-synapse/users.json"


class class_pav_jsonfile(object):
    """Password auth provider backed by a JSON file.

    The file is expected to hold one JSON object mapping a user's localpart to
    an entry object.  Each entry needs a "password" field; an optional "active"
    field disables the account when falsy (it counts as true when absent).
    """

    @staticmethod
    def _load_users(path):
        """Read and decode the users file, closing the handle afterwards."""
        with open(path, encoding="utf-8") as handle:
            return _json.load(handle)

    @staticmethod
    def _localpart(user_id):
        """Return the part of "@local:server" between the sigil and the first colon."""
        return user_id.split(":", 1)[0][1:]

    @staticmethod
    def parse_config(config_raw):
        """Validate the module configuration and the users file it points to.

        Args:
            config_raw: mapping that may carry a "path" key; ``None`` is
                treated like an empty mapping.

        Returns:
            A dict with the single key "path" holding the users file path.

        Raises:
            ValueError: if the file's top level is not an object, or an entry
                is not an object containing a "password" field.
            OSError, json.JSONDecodeError: if the file cannot be read/parsed.
        """
        path = (config_raw or {}).get("path", _DEFAULT_USERS_PATH)
        data = class_pav_jsonfile._load_users(path)
        if not isinstance(data, dict):
            raise ValueError("users json file malformed: top level must be an object")
        for name, entry in data.items():
            if not isinstance(entry, dict) or "password" not in entry:
                raise ValueError(
                    "users json file malformed: missing field 'password' for user '%s'" % name
                )
        return {"path": path}

    def __init__(self, config, account_handler):
        """Store the parsed config and register ``check_auth`` for password logins.

        Args:
            config: the dict returned by ``parse_config``.
            account_handler: object offering the module API calls used by this
                class; it is kept as ``self.module_api``.
        """
        self.config = config
        self.module_api = account_handler
        self.module_api.register_password_auth_provider_callbacks(
            auth_checkers={("m.login.password", ("password",)): self.check_auth}
        )

    async def private_check_password(self, user_id, password):
        """Tell whether ``password`` matches the active file entry for ``user_id``.

        The users file is re-read on every call, so edits to it take effect
        without a restart.

        Args:
            user_id: fully qualified ID such as "@alice:example.org".
            password: the password submitted by the client.

        Returns:
            ``True`` only if the localpart has an entry, the entry is active
            and its stored password equals ``password``; otherwise ``False``.
        """
        data = self._load_users(self.config["path"])
        entry = data.get(self._localpart(user_id)) if isinstance(data, dict) else None
        if not isinstance(entry, dict) or not entry.get("active", True):
            return False
        stored = entry.get("password")
        # A missing or non-string password must never match -- in particular
        # not an empty submitted password.
        if not isinstance(stored, str) or not isinstance(password, str):
            return False
        # Compare in constant time; bytes are used because compare_digest
        # rejects non-ASCII str, and surrogatepass keeps odd input from raising.
        return _hmac.compare_digest(
            stored.encode("utf-8", "surrogatepass"),
            password.encode("utf-8", "surrogatepass"),
        )

    async def check_auth(self, username, login_type, login_dict):
        """Auth checker callback for "m.login.password".

        Args:
            username: the user name as given at login (localpart or full ID).
            login_type: the login type; not inspected here.
            login_dict: the login submission; its "password" field is checked.

        Returns:
            ``None`` when the password check fails, otherwise the tuple
            ``(user_id, None)``.  If the module API reports that the user does
            not exist yet, the user is registered before returning.
        """
        user_id = self.module_api.get_qualified_user_id(username)
        passed = await self.private_check_password(user_id, login_dict.get("password"))
        if not passed:
            return None
        if await self.module_api.check_user_exists(user_id) is None:
            # Registration wants the bare localpart, and it must be awaited so
            # that it has completed (and any failure surfaces) before login
            # proceeds.
            localpart = self._localpart(user_id)
            await self.module_api.register_user(localpart, localpart, None, False)
        # NOTE(review): the qualified id is returned, not the canonical id from
        # check_user_exists -- confirm this is intended.
        return (user_id, None)