From f4ef700312d230f52069ec5342efdb38b7b4162e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20Fra=C3=9F?= Date: Wed, 21 Aug 2024 15:06:49 +0200 Subject: [PATCH] [upd] plankton --- lib/plankton/plankton.d.ts | 33 ++++-- lib/plankton/plankton.js | 216 ++++++++++++++++++++++++++++--------- 2 files changed, 190 insertions(+), 59 deletions(-) diff --git a/lib/plankton/plankton.d.ts b/lib/plankton/plankton.d.ts index dea5b33..c343138 100644 --- a/lib/plankton/plankton.d.ts +++ b/lib/plankton/plankton.d.ts @@ -634,13 +634,13 @@ declare namespace lib_plankton.http { * @author fenris */ enum enum_method { - get = "get", - post = "post", - patch = "patch", - put = "put", - delete = "delete", options = "options", - head = "head" + head = "head", + get = "get", + delete = "delete", + post = "post", + put = "put", + patch = "patch" } /** * @author fenris @@ -748,16 +748,28 @@ declare namespace lib_plankton.server { * @author fenris */ type type_subject = { + host: string; port: int; - handle: ((input: string, metadata?: type_metadata) => Promise); + threshold: (null | float); + handle: ((input: Buffer, metadata?: type_metadata) => Promise); serverobj: any; }; /** * @author fenris */ - function make(port: int, handle: ((input: string, metadata?: type_metadata) => Promise)): type_subject; + function make(handle: ((input: string, metadata?: type_metadata) => Promise), options?: { + host?: string; + port?: int; + threshold?: (null | float); + }): type_subject; /** * @author fenris + * @deprecated + */ + function make_old(port: int, handle: ((input: string, metadata?: type_metadata) => Promise)): type_subject; + /** + * @author fenris + * @see https://nodejs.org/api/net.html#serverlistenport-host-backlog-callback */ function start(subject: type_subject): Promise; /** @@ -777,7 +789,10 @@ declare namespace lib_plankton.server { /** * @author fenris */ - constructor(port: int, handle: ((input: string) => Promise)); + constructor(handle: ((input: Buffer, metadata?: type_metadata) => Promise), options?: { + host?: string; + port?: int; + }); /** * @author fenris */ diff --git a/lib/plankton/plankton.js b/lib/plankton/plankton.js index 8b9ce06..25ca3e1 100644 --- a/lib/plankton/plankton.js +++ b/lib/plankton/plankton.js @@ -1689,13 +1689,13 @@ var lib_plankton; */ let enum_method; (function (enum_method) { - enum_method["get"] = "get"; - enum_method["post"] = "post"; - enum_method["patch"] = "patch"; - enum_method["put"] = "put"; - enum_method["delete"] = "delete"; enum_method["options"] = "options"; enum_method["head"] = "head"; + enum_method["get"] = "get"; + enum_method["delete"] = "delete"; + enum_method["post"] = "post"; + enum_method["put"] = "put"; + enum_method["patch"] = "patch"; })(enum_method = http.enum_method || (http.enum_method = {})); })(http = lib_plankton.http || (lib_plankton.http = {})); })(lib_plankton || (lib_plankton = {})); @@ -1892,9 +1892,9 @@ var lib_plankton; headers[key.toLowerCase()] = value; } } - const body = ([http.enum_method.get, http.enum_method.head, http.enum_method.options, http.enum_method.delete].includes(method) - ? null - : Buffer["from"](lines.join(linebreak))); + const body = ([http.enum_method.post, http.enum_method.put, http.enum_method.patch].includes(method) + ? Buffer["from"](lines.join(linebreak)) + : null); const request = { // TODO "scheme": "http", @@ -1994,24 +1994,51 @@ var lib_plankton; } case "fetch": { function core(signal) { - return (fetch(target, { + return (fetch(target, Object.assign({ "method": ((method => { switch (method) { - case http.enum_method.get: return "GET"; - case http.enum_method.post: return "POST"; - case http.enum_method.patch: return "PATCH"; - case http.enum_method.put: return "PUT"; - case http.enum_method.delete: return "DELETE"; - case http.enum_method.options: return "OPTIONS"; case http.enum_method.head: return "HEAD"; + case http.enum_method.options: return "OPTIONS"; + case http.enum_method.get: return "GET"; + case http.enum_method.delete: return "DELETE"; + case http.enum_method.post: return "POST"; + case http.enum_method.put: return "PUT"; + case http.enum_method.patch: return "PATCH"; } })(request.method)), "headers": request.headers, - "redirect": (options.follow_redirects ? "follow" : "manual"), - "signal": (signal ?? undefined), - }) + /* + "redirect": ( + options.follow_redirects + ? + "follow" + : + "manual" + ), + */ + "signal": (signal + ?? + undefined), + // "keepalive": false, + }, ((((method => { + switch (method) { + case http.enum_method.head: return false; + case http.enum_method.options: return false; + case http.enum_method.get: return false; + case http.enum_method.delete: return false; + case http.enum_method.post: return true; + case http.enum_method.put: return true; + case http.enum_method.patch: return true; + } + })(request.method)) + && + (request.body !== null)) + ? { + "body": request.body.toString(), + } + : {}))) .catch((reason) => { - console.info(reason); + // console.info(reason); return Promise.reject(reason); }) .then((response_raw) => (response_raw.text() @@ -2057,7 +2084,9 @@ var lib_plankton; break; } case "http_module": { + // @ts-ignore const nm_http = require("http"); + // @ts-ignore const nm_https = require("https"); return (new Promise((resolve, reject) => { const req = ((request.scheme === "https") @@ -2201,9 +2230,16 @@ var lib_plankton; /** * @author fenris */ - function make(port, handle) { + function make(handle, options = {}) { + options = Object.assign({ + "host": "::", + "port": 9999, + "threshold": 0.25, + }, options); return { - "port": port, + "host": options.host, + "port": options.port, + "threshold": options.threshold, "handle": handle, "serverobj": undefined, }; @@ -2211,49 +2247,122 @@ var lib_plankton; server.make = make; /** * @author fenris + * @deprecated + */ + function make_old(port, handle) { + return make(handle, { + "host": "::", + "port": port, + "threshold": 0.25, + }); + } + server.make_old = make_old; + /** + * @author fenris + * @see https://nodejs.org/api/net.html#serverlistenport-host-backlog-callback */ function start(subject) { const net = require("net"); return (new Promise((resolve, reject) => { - subject.serverobj = net.createServer((socket) => { - lib_plankton.log.info("server_client connected", {}); - socket.on("readable", () => { - let chunk; - while (!((chunk = socket.read()) === null)) { - const input = chunk.toString(); - lib_plankton.log.debug("server_reading", { - "input": input, + // @ts-ignore + let input_chunks = []; + subject.serverobj = net.createServer({ + "allowHalfOpen": false, + }, (socket) => { + let timeout_handler = null; + let ended = false; + const process_input = function () { + // @ts-ignore + const input = Buffer.concat(input_chunks); + /* + const metadata : type_metadata = { + "ip_address": socket.remoteAddress, + }; + */ + lib_plankton.log.debug("server_process_input", { + "input": input, + }); + (subject.handle(input /*, metadata*/) + .then((output) => { + lib_plankton.log.debug("server_writing", { + "output": output, }); - const metadata = { - "ip_address": socket.remoteAddress, - }; - subject.handle(input, metadata) - .then((output) => { - lib_plankton.log.debug("server_writing", { - "output": output, - }); - socket.write(output); - socket.end(); - }) - .catch((error) => { - lib_plankton.log.warning("server_handle_failed", { - "error": error.toString(), - }); - // socket.write(""); - socket.end(); + socket.write(output); + socket.end(); + }) + .catch((error) => { + lib_plankton.log.warning("server_handle_failed", { + "error": error.toString(), }); + // socket.write(""); + socket.end(); + }) + .then(() => { + input_chunks = []; + })); + }; + const timeout_stop = function () { + if (timeout_handler === null) { + // do nothing } + else { + lib_plankton.log.debug("server_timeout_cancelling"); + clearTimeout(timeout_handler); + timeout_handler = null; + } + }; + const timeout_start = function () { + if (subject.threshold === null) { + process_input(); + } + else { + if (timeout_handler === null) { + timeout_handler = setTimeout(() => { + lib_plankton.log.debug("server_timeout_reached"); + timeout_handler = null; + process_input(); + }, (subject.threshold * 1000)); + } + else { + lib_plankton.log.warning("server_timeout_already_started"); + // do nothing + } + } + }; + lib_plankton.log.info("server_client connected", {}); + socket.on("data", (input_chunk_raw) => { + lib_plankton.log.debug("server_reading_chunk", { + "chunk_raw": input_chunk_raw, + }); + timeout_stop(); + const input_chunk = ((input_chunk_raw instanceof Buffer) + ? + input_chunk_raw + : + // @ts-ignore + Buffer.from(input_chunk_raw)); + input_chunks.push(input_chunk); + timeout_start(); }); socket.on("end", () => { - lib_plankton.log.info("server_client_disconnected", {}); + if (!ended) { + lib_plankton.log.info("server_client_disconnected", {}); + ended = true; + timeout_stop(); + } + else { + lib_plankton.log.info("server_socket_already_ended"); + // do nothing + } }); }); subject.serverobj.on("error", (error) => { // throw error; process.stderr.write("net_error: " + String(error) + "\n\n"); }); - subject.serverobj.listen(subject.port, () => { + subject.serverobj.listen(subject.port, subject.host, 511, () => { lib_plankton.log.info("server_listenting", { + "host": subject.host, "port": subject.port, }); resolve(undefined); @@ -2301,8 +2410,15 @@ var lib_plankton; /** * @author fenris */ - constructor(port, handle) { - this.subject = server.make(port, handle); + constructor(handle, options = {}) { + options = Object.assign({ + "host": "::", + "port": 9999, + }, options); + this.subject = server.make(handle, { + "host": options.host, + "port": options.port, + }); } /** * @author fenris