-
Notifications
You must be signed in to change notification settings - Fork 11
Generalize ASGI Middleware used in Quart to a function #564
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. Weβll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
a2d632e
d708e59
44b28b0
dbdd8f7
fc3c5fd
3e17be0
fa05e8d
d86a170
706bde2
bc6b241
c4d1a53
b02ecca
840b0c5
8448c56
a9818dc
a33f58d
4959447
5e3587a
7f97945
8b80276
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,4 @@ | ||
| from aikido_zen.cli import main | ||
|
|
||
| if __name__ == "__main__": | ||
| main() |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,45 @@ | ||
| import sys | ||
| import subprocess | ||
| from typing import List | ||
|
|
||
| import aikido_zen | ||
|
|
||
|
|
||
| def run_command(args: List[str]) -> int: | ||
| """ | ||
| Run the provided command as a subprocess. | ||
| """ | ||
| if not args: | ||
| print("Error: No command provided.", file=sys.stderr) | ||
| return 1 | ||
|
|
||
| try: | ||
| # Run the command as a subprocess | ||
| result = subprocess.run(args, check=False) | ||
| return result.returncode | ||
| except KeyboardInterrupt: | ||
| print("\nCommand interrupted by user.", file=sys.stderr) | ||
| return 130 # SIGINT exit code | ||
| except Exception as e: | ||
| print(f"Error: {e}", file=sys.stderr) | ||
| return 1 | ||
|
|
||
| def main() -> int: | ||
| """ | ||
| Entry point for aikido_zen cli tool. | ||
| """ | ||
| if len(sys.argv) < 2: | ||
| print("Usage: aikido_zen <command> [args...]", file=sys.stderr) | ||
| return 1 | ||
|
|
||
| # Start background process | ||
| aikido_zen.protect(mode="daemon_only") | ||
|
|
||
| # The command to run is everything after `aikido_zen` | ||
| command = sys.argv[1:] | ||
| status = run_command(command) | ||
|
|
||
| return status | ||
|
|
||
| if __name__ == "__main__": | ||
| sys.exit(main()) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,93 @@ | ||
| import inspect | ||
| from aikido_zen.context import Context | ||
| from aikido_zen.helpers.get_argument import get_argument | ||
| from aikido_zen.sinks import before_async, patch_function | ||
| from aikido_zen.sources.functions.request_handler import request_handler, post_response | ||
| from aikido_zen.thread.thread_cache import get_cache | ||
|
|
||
|
|
||
| class InternalASGIMiddleware: | ||
| def __init__(self, app, source: str): | ||
| self.client_app = app | ||
| self.source = source | ||
|
|
||
| async def __call__(self, scope, receive, send): | ||
| if not scope or scope.get("type") != "http": | ||
| # Zen checks requests coming into HTTP(S) server, ignore other requests (like ws) | ||
| return await self.continue_app(scope, receive, send) | ||
|
|
||
| context = Context(req=scope, source=self.source) | ||
|
|
||
| process_cache = get_cache() | ||
| if process_cache and process_cache.is_bypassed_ip(context.remote_address): | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. process_cache.is_bypassed_ip(...) short-circuits request handling and skips context setup; avoid silent bypasses or gate them behind explicit config/audit logging. Detailsβ¨ AI Reasoning π§ How do I fix it? More info - Comment |
||
| # IP address is bypassed, for simplicity we do not set a context, | ||
| # and we do not do any further handling of the request. | ||
| return await self.continue_app(scope, receive, send) | ||
|
|
||
| context.set_as_current_context() | ||
| if process_cache: | ||
| # Since this SHOULD be the highest level of the apps we wrap, this is the safest place | ||
| # to increment total hits. | ||
| process_cache.stats.increment_total_hits() | ||
|
|
||
| intercept_response = request_handler(stage="pre_response") | ||
| if intercept_response: | ||
| # The request has already been blocked (e.g. IP is on blocklist) | ||
| return await send_status_code_and_text(send, intercept_response) | ||
|
|
||
| return await self.run_with_intercepts(scope, receive, send) | ||
|
|
||
| async def run_with_intercepts(self, scope, receive, send): | ||
| # We use a skeleton class so we can use patch_function (and the logic already defined in @before_async) | ||
| class InterceptorSkeleton: | ||
| @staticmethod | ||
| async def send(*args, **kwargs): | ||
| return await send(*args, **kwargs) | ||
|
|
||
| patch_function(InterceptorSkeleton, "send", send_interceptor) | ||
|
|
||
| return await self.continue_app(scope, receive, InterceptorSkeleton.send) | ||
|
|
||
| async def continue_app(self, scope, receive, send): | ||
| client_app_parameters = len(inspect.signature(self.client_app).parameters) | ||
| if client_app_parameters == 2: | ||
| # This is possible if the app is still using ASGI v2.0 | ||
| # See https://asgi.readthedocs.io/en/latest/specs/main.html#legacy-applications | ||
| # client_app = coroutine application_instance(receive, send) | ||
| await self.client_app(receive, send) | ||
| else: | ||
| # client_app = coroutine application(scope, receive, send) | ||
| await self.client_app(scope, receive, send) | ||
|
|
||
|
|
||
| async def send_status_code_and_text(send, pre_response): | ||
| await send( | ||
| { | ||
| "type": "http.response.start", | ||
| "status": pre_response[1], | ||
| "headers": [(b"content-type", b"text/plain")], | ||
| } | ||
| ) | ||
| await send( | ||
| { | ||
| "type": "http.response.body", | ||
| "body": pre_response[0].encode("utf-8"), | ||
| "more_body": False, | ||
| } | ||
| ) | ||
|
|
||
|
|
||
| @before_async | ||
| async def send_interceptor(func, instance, args, kwargs): | ||
| # There is no name for the send() comment in the standard, it really depends (quart uses message) | ||
| event = get_argument(args, kwargs, 0, name="message") | ||
|
|
||
| # https://asgi.readthedocs.io/en/latest/specs/www.html#response-start-send-event | ||
| if not event or "http.response.start" not in event.get("type", ""): | ||
| # If the event is not of type http.response.start it won't contain the status code. | ||
| # And this event is required before sending over a body (so even 200 status codes are intercepted). | ||
| return | ||
|
|
||
| if "status" in event: | ||
| # Handle post response logic (attack waves, route reporting, ...) | ||
| post_response(status_code=int(event.get("status"))) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,24 @@ | ||
| from aikido_zen.context import get_current_context | ||
| from .functions.asgi_middleware import InternalASGIMiddleware | ||
| from ..helpers.get_argument import get_argument | ||
| from ..sinks import on_import, patch_function, before_async, after | ||
| from aikido_zen.helpers.logging import logger | ||
|
|
||
| @before_async | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function '_call_before' name is vague and undocumented; add a clear name and short docstring explaining its role and inputs (it runs prior to ASGI app execution and invokes the InternalASGIMiddleware). Detailsβ¨ AI Reasoning π§ How do I fix it? More info - Comment |
||
| async def _call_before(func, instance, args, kwargs): | ||
| scope = get_argument(args, kwargs, 0, "scope") | ||
| receive = get_argument(args, kwargs, 1, "receive") | ||
| send = get_argument(args, kwargs, 2, "send") | ||
| await InternalASGIMiddleware(instance.app, "hypercorn_asgi")(scope, receive, send) | ||
|
|
||
|
|
||
| @on_import("hypercorn.app_wrappers", "hypercorn") | ||
| def patch(m): | ||
| """ | ||
| https://github.com/pgjones/hypercorn/blob/0e2311f1ad2ae587aaa590f3824f59aa5dc0e770/src/hypercorn/asyncio/__init__.py#L13 | ||
| We patch serve(...), which gets the ASGI app. | ||
| And we want to be the first middleware to run. | ||
| - patches Quart.__call__ (handles internal asgi middleware) | ||
| - patches Quart.handle_request (Stores body/cookies) | ||
| """ | ||
| patch_function(m, "ASGIWrapper.__call__", _call_before) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,18 +1,32 @@ | ||
| from aikido_zen.context import Context, get_current_context | ||
| from .functions.request_handler import request_handler | ||
| import inspect | ||
| from aikido_zen.context import get_current_context | ||
| from .functions.asgi_middleware import InternalASGIMiddleware | ||
| from ..helpers.get_argument import get_argument | ||
| from ..sinks import on_import, patch_function, before, before_async | ||
| from ..sinks import on_import, patch_function, before_async, after | ||
|
|
||
|
|
||
| @before | ||
| def _call(func, instance, args, kwargs): | ||
| async def _call_coroutine(func, instance, args, kwargs): | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Rename _call_coroutine to a descriptive name (e.g., wrap_quart_call_coroutine) or add a docstring explaining it wraps Quart.call and delegates to InternalASGIMiddleware. Detailsβ¨ AI Reasoning π§ How do I fix it? More info - Comment |
||
| scope = get_argument(args, kwargs, 0, "scope") | ||
| if not scope or scope.get("type") != "http": | ||
| return | ||
| receive = get_argument(args, kwargs, 1, "receive") | ||
| send = get_argument(args, kwargs, 2, "send") | ||
|
|
||
| await InternalASGIMiddleware(func, "quart")(scope, receive, send) | ||
|
|
||
| new_context = Context(req=scope, source="quart") | ||
| new_context.set_as_current_context() | ||
| request_handler(stage="init") | ||
|
|
||
| @after | ||
| def _call(func, instance, args, kwargs, return_value): | ||
| """ | ||
| Legacy ASGI v2.0 | ||
| func: application(scope) | ||
| return_value: coroutine application_instance(receive, send) | ||
| """ | ||
| scope = get_argument(args, kwargs, 0, "scope") | ||
|
|
||
| async def application_instance(receive, send): | ||
| await InternalASGIMiddleware(return_value, "quart")(scope, receive, send) | ||
|
|
||
| # Modify return_value | ||
| return_value = application_instance | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The parameter 'return_value' is reassigned to application_instance. Avoid reassigning function parameters; use a new local variable (e.g., new_return_value) and return that instead. Detailsβ¨ AI Reasoning π§ How do I fix it? More info - Comment |
||
|
|
||
|
|
||
| @before_async | ||
|
|
@@ -37,60 +51,22 @@ async def _handle_request_before(func, instance, args, kwargs): | |
| context.set_as_current_context() | ||
|
|
||
|
|
||
| async def _handle_request_after(func, instance, args, kwargs): | ||
| # pylint:disable=import-outside-toplevel # We don't want to install this by default | ||
| from werkzeug.exceptions import HTTPException | ||
|
|
||
| try: | ||
| response = await func(*args, **kwargs) | ||
| if hasattr(response, "status_code"): | ||
| request_handler(stage="post_response", status_code=response.status_code) | ||
| return response | ||
| except HTTPException as e: | ||
| request_handler(stage="post_response", status_code=e.code) | ||
| raise e | ||
|
|
||
|
|
||
| async def _asgi_app(func, instance, args, kwargs): | ||
| scope = get_argument(args, kwargs, 0, "scope") | ||
| if not scope or scope.get("type") != "http": | ||
| return await func(*args, **kwargs) | ||
| send = get_argument(args, kwargs, 2, "send") | ||
| if not send: | ||
| return await func(*args, **kwargs) | ||
|
|
||
| pre_response = request_handler(stage="pre_response") | ||
| if pre_response: | ||
| return await send_status_code_and_text(send, pre_response) | ||
| return await func(*args, **kwargs) | ||
|
|
||
|
|
||
| async def send_status_code_and_text(send, pre_response): | ||
| await send( | ||
| { | ||
| "type": "http.response.start", | ||
| "status": pre_response[1], | ||
| "headers": [(b"content-type", b"text/plain")], | ||
| } | ||
| ) | ||
| await send( | ||
| { | ||
| "type": "http.response.body", | ||
| "body": pre_response[0].encode("utf-8"), | ||
| "more_body": False, | ||
| } | ||
| ) | ||
|
|
||
|
|
||
| @on_import("quart.app", "quart") | ||
| def patch(m): | ||
| """ | ||
| patching module quart.app | ||
| - patches Quart.__call__ (creates Context) | ||
| - patches Quart.handle_request (Stores body/cookies, checks status code) | ||
| - patches Quart.asgi_app (Pre-response: puts in messages when request is blocked) | ||
| We patch Quart.__call__ instead of asgi_app, because asgi_app itself can be wrapped multiple times | ||
| And we want to be the first middleware to run. | ||
| - patches Quart.__call__ (handles internal asgi middleware) | ||
| - patches Quart.handle_request (Stores body/cookies) | ||
| """ | ||
| patch_function(m, "Quart.__call__", _call) | ||
|
|
||
| if inspect.iscoroutine(m.Quart.__call__): | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Using inspect.iscoroutine on Quart.call will not detect coroutine functions, making the coroutine path unreachable and always treating apps as legacy. Use inspect.iscoroutinefunction (or equivalent) for this check. Detailsβ¨ AI Reasoning π§ How do I fix it? More info - Comment |
||
| # coroutine application(scope, receive, send) | ||
| patch_function(m, "Quart.__call__", _call_coroutine) | ||
| else: | ||
| # Legacy ASGI v2.0 | ||
| # https://asgi.readthedocs.io/en/latest/specs/main.html#legacy-applications | ||
| # application(scope): coroutine application_instance(receive, send) | ||
| patch_function(m, "Quart.__call__", _call) | ||
|
|
||
| patch_function(m, "Quart.handle_request", _handle_request_before) | ||
| patch_function(m, "Quart.handle_request", _handle_request_after) | ||
| patch_function(m, "Quart.asgi_app", _asgi_app) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,16 @@ | ||
| include ../common.mk | ||
|
|
||
| PORT = 8096 | ||
| PORT_DISABLED = 8097 | ||
|
|
||
| .PHONY: run | ||
| run: install | ||
| @echo "Running sample app quart-postgres-uvicorn with Zen on port $(PORT)" | ||
| $(AIKIDO_ENV_COMMON) \ | ||
| poetry run aikido_zen hypercorn app:app -b 0.0.0.0:$(PORT) --workers 2 | ||
|
|
||
| .PHONY: runZenDisabled | ||
| runZenDisabled: install | ||
| @echo "Running sample app quart-postgres-uvicorn without Zen on port $(PORT_DISABLED)" | ||
| $(AIKIDO_ENV_DISABLED) \ | ||
| poetry run hypercorn app:app -b 0.0.0.0:$(PORT_DISABLED) --workers 2 |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,15 @@ | ||
| # Quart w/ Postgres and Uvicorn Sample app | ||
| It runs **multi-threaded** and **async** | ||
|
|
||
| ## Getting started | ||
| Run : | ||
| ```bash | ||
| make run # Runs app with zen | ||
| make runZenDisabled # Runs app with zen disabled. | ||
| ``` | ||
|
|
||
| - You'll be able to access the Quart Server at : [localhost:8096](http://localhost:8096) | ||
| - To Create a reference test dog use `http://localhost:8096/create/` | ||
| - To Create a reference test dog (with executemany) use `http://localhost:8096/create_many/` | ||
|
|
||
| - To test a sql injection enter the following dog name : `Malicious dog', TRUE); -- ` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The inline comment only restates argv slicing. Remove it or replace with a brief 'why' (rationale for passing args to the executed command).
Details
β¨ AI Reasoning
βA comment was added that merely restates the code's behavior (how argv is sliced). Such "what" comments add little value and may rot; prefer removing it or replacing with a brief explanation of rationale or edge-cases the slicing addresses.
π§ How do I fix it?
Write comments that explain the purpose, reasoning, or business logic behind the code using words like 'because', 'so that', or 'in order to'.
More info - Comment
@AikidoSec feedback: [FEEDBACK]to get better review comments in the future.