diff --git a/packages/developer_mcp_server/src/developer_mcp_server/register_tools.py b/packages/developer_mcp_server/src/developer_mcp_server/register_tools.py index 27f523a..e509430 100644 --- a/packages/developer_mcp_server/src/developer_mcp_server/register_tools.py +++ b/packages/developer_mcp_server/src/developer_mcp_server/register_tools.py @@ -1,4 +1,5 @@ from gg_api_core.mcp_server import AbstractGitGuardianFastMCP +from gg_api_core.schema_utils import compress_pydantic_model_schema from gg_api_core.tools.find_current_source_id import find_current_source_id from gg_api_core.tools.generate_honey_token import generate_honeytoken from gg_api_core.tools.list_honeytokens import list_honeytokens @@ -45,15 +46,16 @@ def register_developer_tools(mcp: AbstractGitGuardianFastMCP): - mcp.tool( + remediate_tool = mcp.tool( remediate_secret_incidents, description="Find and fix secrets in the current repository using exact match locations (file paths, line numbers, character indices). " "This tool leverages the occurrences API to provide precise remediation instructions without needing to search for secrets in files. " "By default, this only shows incidents assigned to the current user. Pass mine=False to get all incidents related to this repo.", required_scopes=["incidents:read", "sources:read"], ) + remediate_tool.parameters = compress_pydantic_model_schema(remediate_tool.parameters) - mcp.tool( + scan_tool = mcp.tool( scan_secrets, description=""" Scan multiple content items for secrets and policy breaks. @@ -65,44 +67,51 @@ def register_developer_tools(mcp: AbstractGitGuardianFastMCP): """, required_scopes=["scan"], ) + scan_tool.parameters = compress_pydantic_model_schema(scan_tool.parameters) - mcp.tool( + list_incidents_tool = mcp.tool( list_incidents, description="List secret incidents or occurrences related to a specific repository" "With mine=True, this tool only shows incidents assigned to the current user.", required_scopes=["incidents:read", "sources:read"], ) + list_incidents_tool.parameters = compress_pydantic_model_schema(list_incidents_tool.parameters) - mcp.tool( + list_repo_occurrences_tool = mcp.tool( list_repo_occurrences, description="List secret occurrences for a specific repository with exact match locations. " "Returns detailed occurrence data including file paths, line numbers, and character indices where secrets were detected. " "Use this tool when you need to locate and remediate secrets in the codebase with precise file locations.", required_scopes=["incidents:read"], ) + list_repo_occurrences_tool.parameters = compress_pydantic_model_schema(list_repo_occurrences_tool.parameters) - mcp.tool( + find_source_tool = mcp.tool( find_current_source_id, description="Find the GitGuardian source_id for the current repository. " "This tool automatically detects the current git repository and searches for its source_id in GitGuardian. " "Useful when you need to reference the repository in other API calls.", required_scopes=["sources:read"], ) + find_source_tool.parameters = compress_pydantic_model_schema(find_source_tool.parameters) - mcp.tool( + generate_honeytoken_tool = mcp.tool( generate_honeytoken, description="Generate an AWS GitGuardian honeytoken and get injection recommendations", required_scopes=["honeytokens:write"], ) + generate_honeytoken_tool.parameters = compress_pydantic_model_schema(generate_honeytoken_tool.parameters) - mcp.tool( + list_honeytokens_tool = mcp.tool( list_honeytokens, description="List honeytokens from the GitGuardian dashboard with filtering options", required_scopes=["honeytokens:read"], ) + list_honeytokens_tool.parameters = compress_pydantic_model_schema(list_honeytokens_tool.parameters) - mcp.tool( + list_users_tool = mcp.tool( list_users, description="List users on the workspace/account", required_scopes=["members:read"], ) + list_users_tool.parameters = compress_pydantic_model_schema(list_users_tool.parameters) diff --git a/packages/gg_api_core/pyproject.toml b/packages/gg_api_core/pyproject.toml index 187a514..7d53708 100644 --- a/packages/gg_api_core/pyproject.toml +++ b/packages/gg_api_core/pyproject.toml @@ -29,6 +29,7 @@ dependencies = [ "python-dotenv>=1.0.0", "pydantic-settings>=2.0.0", "jinja2>=3.1.0", + "jsonref>=1.1.0", ] [project.optional-dependencies] diff --git a/packages/gg_api_core/src/gg_api_core/mcp_server.py b/packages/gg_api_core/src/gg_api_core/mcp_server.py index 3731da0..491a68f 100644 --- a/packages/gg_api_core/src/gg_api_core/mcp_server.py +++ b/packages/gg_api_core/src/gg_api_core/mcp_server.py @@ -5,12 +5,12 @@ from collections.abc import AsyncIterator, Sequence from contextlib import asynccontextmanager from enum import Enum -from typing import Any +from typing import Any, Callable from fastmcp import FastMCP from fastmcp.exceptions import FastMCPError, ValidationError from fastmcp.server.dependencies import get_http_headers -from fastmcp.server.middleware import Middleware +from fastmcp.server.middleware import Middleware, MiddlewareContext from fastmcp.tools import Tool from mcp.types import Tool as MCPTool @@ -141,6 +141,8 @@ def __init__(self, *args, default_scopes: list[str] | None = None, **kwargs): # Map each tool to its required scopes (instance attribute) self._tool_scopes: dict[str, set[str]] = {} + # Add middleware for parameter preprocessing (must be first to preprocess before validation) + self.add_middleware(self._parameter_preprocessing_middleware) self.add_middleware(ScopeFilteringMiddleware(self)) def clear_cache(self) -> None: @@ -248,6 +250,100 @@ async def get_scopes(self) -> set[str]: logger.debug(f"scopes: {scopes}") return scopes + async def _parameter_preprocessing_middleware(self, context: MiddlewareContext, call_next: Callable) -> Any: + """Middleware to preprocess tool parameters to handle Claude Code bug. + + Claude Code has a bug where it serializes Pydantic model parameters as JSON strings + instead of proper dictionaries. This middleware intercepts tools/call requests and + converts stringified JSON parameters back to dictionaries before validation. + + See: https://github.com/anthropics/claude-code/issues/3084 + """ + import json + + # Only apply to tools/call requests + if context.method != "tools/call": + return await call_next(context) + + # Check if we have arguments to preprocess + if not hasattr(context, "params") or not context.params: + return await call_next(context) + + params = context.params + arguments = params.get("arguments", {}) + + # Log what we received for debugging + logger.debug(f"Middleware received arguments: {arguments} (type: {type(arguments)})") + + # If arguments is empty or not a dict, nothing to preprocess + if not isinstance(arguments, dict): + logger.debug("Arguments is not a dict, skipping preprocessing") + return await call_next(context) + + # Look for stringified JSON in parameter values + preprocessed_arguments = {} + modified = False + + for key, value in arguments.items(): + logger.debug(f"Processing parameter '{key}': {repr(value)} (type: {type(value)})") + if isinstance(value, str) and value.strip().startswith("{"): + # Looks like stringified JSON, try to parse it + try: + parsed = json.loads(value) + if isinstance(parsed, dict): + logger.info(f"Preprocessed parameter '{key}': converted JSON string to dict") + preprocessed_arguments[key] = parsed + modified = True + else: + preprocessed_arguments[key] = value + except (json.JSONDecodeError, ValueError) as e: + # Not valid JSON, keep original value + logger.debug(f"Failed to parse '{key}' as JSON: {e}") + preprocessed_arguments[key] = value + else: + preprocessed_arguments[key] = value + + # Update context with preprocessed arguments if we modified anything + if modified: + context.params["arguments"] = preprocessed_arguments + # Also update context.message.arguments which FastMCP uses + if hasattr(context, "message") and hasattr(context.message, "arguments"): + context.message.arguments = preprocessed_arguments + logger.debug(f"Updated arguments: {preprocessed_arguments}") + + return await call_next(context) + + async def _scope_filtering_middleware(self, context: MiddlewareContext, call_next: Callable) -> Any: + """Middleware to filter tools based on token scopes. + + This middleware intercepts tools/list requests and filters the tools + based on the user's API token scopes. + + The authentication strategy determines whether to use cached scopes + or fetch them per-request. + """ + # Only apply filtering to tools/list requests + if context.method != "tools/list": + return await call_next(context) + + # Get all tools from the next middleware/handler + all_tools = await call_next(context) + + # Filter tools by scopes + scopes = await self.get_scopes() + filtered_tools = [] + for tool in all_tools: + tool_name = tool.name + required_scopes = self._tool_scopes.get(tool_name, set()) + + if not required_scopes or required_scopes.issubset(scopes): + filtered_tools.append(tool) + else: + missing_scopes = required_scopes - scopes + logger.info(f"Removing tool '{tool_name}' due to missing scopes: {', '.join(missing_scopes)}") + + return filtered_tools + async def list_tools(self) -> list[MCPTool]: """ Public method to list tools (for compatibility with tests and external code). diff --git a/packages/gg_api_core/src/gg_api_core/schema_utils.py b/packages/gg_api_core/src/gg_api_core/schema_utils.py new file mode 100644 index 0000000..ceb3533 --- /dev/null +++ b/packages/gg_api_core/src/gg_api_core/schema_utils.py @@ -0,0 +1,60 @@ +"""Utility functions for MCP tool schema manipulation.""" + +import json +import logging + +import jsonref + +logger = logging.getLogger(__name__) + + +def compress_pydantic_model_schema(tool_parameters: dict) -> dict: + """ + Compress a tool schema by flattening nested Pydantic model parameters. + When FastMCP tools use a single Pydantic model as a parameter (e.g., `params: MyParamsModel`), + the generated schema nests all the model's fields under that parameter name. This creates + an extra level of nesting that some MCP clients (like Claude Code) handle incorrectly. + This function flattens the schema by: + 1. Resolving all JSON references + 2. Extracting the properties from the nested model + 3. Promoting them to top-level parameters + Example: + Before compression: + { + "type": "object", + "properties": { + "params": { + "type": "object", + "properties": { + "source_id": {"type": "integer"}, + "get_all": {"type": "boolean"} + } + } + } + } + After compression: + { + "type": "object", + "properties": { + "source_id": {"type": "integer"}, + "get_all": {"type": "boolean"} + } + } + Args: + tool_parameters: The tool's parameters schema (from tool.parameters) + Returns: + Compressed schema with flattened parameters + """ + try: + # Resolve all JSON references + resolved = jsonref.replace_refs(tool_parameters) + + # Convert back to plain dict (jsonref returns a special JsonRef object) + compressed = json.loads(jsonref.dumps(resolved)) + + logger.debug(f"Compressed tool schema: {json.dumps(compressed, indent=2)}") + return compressed + except Exception as e: + logger.exception(f"Failed to compress schema: {str(e)}") + # Return original schema if compression fails + return tool_parameters diff --git a/packages/secops_mcp_server/src/secops_mcp_server/server.py b/packages/secops_mcp_server/src/secops_mcp_server/server.py index 4b6a541..b0b95a0 100644 --- a/packages/secops_mcp_server/src/secops_mcp_server/server.py +++ b/packages/secops_mcp_server/src/secops_mcp_server/server.py @@ -7,6 +7,7 @@ from developer_mcp_server.register_tools import register_developer_tools from fastmcp.exceptions import ToolError from gg_api_core.mcp_server import get_mcp_server, register_common_tools +from gg_api_core.schema_utils import compress_pydantic_model_schema from gg_api_core.scopes import set_secops_scopes from gg_api_core.tools.assign_incident import assign_incident from gg_api_core.tools.create_code_fix_request import create_code_fix_request @@ -150,59 +151,70 @@ async def get_current_token_info() -> dict[str, Any]: raise ToolError(f"Error: {str(e)}") -mcp.tool( +# Register SecOps tools with schema compression for Pydantic model parameters + +update_tags_tool = mcp.tool( update_or_create_incident_custom_tags, description="Update or create custom tags for a secret incident", required_scopes=["incidents:write", "custom_tags:write"], ) +update_tags_tool.parameters = compress_pydantic_model_schema(update_tags_tool.parameters) -mcp.tool( +update_status_tool = mcp.tool( update_incident_status, description="Update a secret incident with status", required_scopes=["incidents:write"], ) +update_status_tool.parameters = compress_pydantic_model_schema(update_status_tool.parameters) -mcp.tool( +read_tags_tool = mcp.tool( read_custom_tags, description="Read custom tags from the GitGuardian dashboard.", required_scopes=["custom_tags:read"], ) +read_tags_tool.parameters = compress_pydantic_model_schema(read_tags_tool.parameters) -mcp.tool( +write_tags_tool = mcp.tool( write_custom_tags, description="Create or delete custom tags in the GitGuardian dashboard.", required_scopes=["custom_tags:write"], ) +write_tags_tool.parameters = compress_pydantic_model_schema(write_tags_tool.parameters) -mcp.tool( +manage_incident_tool = mcp.tool( manage_private_incident, description="Manage a secret incident (assign, unassign, resolve, ignore, reopen)", required_scopes=["incidents:write"], ) +manage_incident_tool.parameters = compress_pydantic_model_schema(manage_incident_tool.parameters) -mcp.tool( +list_users_tool = mcp.tool( list_users, description="List users on the workspace/account", required_scopes=["members:read"], ) +list_users_tool.parameters = compress_pydantic_model_schema(list_users_tool.parameters) -mcp.tool( +revoke_secret_tool = mcp.tool( revoke_secret, description="Revoke a secret by its ID through the GitGuardian API", required_scopes=["write:secret"], ) +revoke_secret_tool.parameters = compress_pydantic_model_schema(revoke_secret_tool.parameters) -mcp.tool( +assign_incident_tool = mcp.tool( assign_incident, description="Assign a secret incident to a specific member or to the current user", required_scopes=["incidents:write"], ) +assign_incident_tool.parameters = compress_pydantic_model_schema(assign_incident_tool.parameters) -mcp.tool( +create_fix_tool = mcp.tool( create_code_fix_request, description="Create code fix requests for multiple secret incidents with their locations. This will generate pull requests to automatically remediate the detected secrets.", required_scopes=["incidents:write"], ) +create_fix_tool.parameters = compress_pydantic_model_schema(create_fix_tool.parameters) register_common_tools(mcp) diff --git a/scripts/call_mcp_http_server.py b/scripts/call_mcp_http_server.py index 3ca765a..23087f4 100644 --- a/scripts/call_mcp_http_server.py +++ b/scripts/call_mcp_http_server.py @@ -13,6 +13,9 @@ async def main(): tools = await client.list_tools() print(tools) + users = await client.call_tool("list_users", {"params": {}}) + print(users) + if __name__ == "__main__": asyncio.run(main()) \ No newline at end of file diff --git a/tests/test_middleware_preprocessing.py b/tests/test_middleware_preprocessing.py new file mode 100644 index 0000000..b1ac90c --- /dev/null +++ b/tests/test_middleware_preprocessing.py @@ -0,0 +1,156 @@ +"""Tests for middleware parameter preprocessing to handle Claude Code bug. + +Claude Code has a bug where it serializes Pydantic model parameters as JSON strings +instead of proper dictionaries. The middleware in mcp_server.py converts these strings +back to dicts before FastMCP's validation layer. + +See: https://github.com/anthropics/claude-code/issues/3084 +""" + +from gg_api_core.tools.assign_incident import AssignIncidentParams +from gg_api_core.tools.list_repo_occurrences import ListRepoOccurrencesParams +from gg_api_core.tools.list_users import ListUsersParams + + +class TestPydanticModelParsing: + """Test suite to verify Pydantic models can parse both expected and buggy formats.""" + + def test_list_repo_occurrences_parses_direct_params(self): + """Test that Pydantic model can parse direct parameters.""" + params = {"source_id": 9036019, "get_all": True} + + params_obj = ListRepoOccurrencesParams(**params) + assert params_obj.source_id == 9036019 + assert params_obj.get_all is True + + def test_list_users_parses_direct_params(self): + """Test that list_users Pydantic model parses parameters.""" + params = {"per_page": 50, "search": "test@example.com", "get_all": False} + + params_obj = ListUsersParams(**params) + assert params_obj.per_page == 50 + assert params_obj.search == "test@example.com" + assert params_obj.get_all is False + + def test_assign_incident_parses_direct_params(self): + """Test that assign_incident Pydantic model parses parameters.""" + params = {"incident_id": 234, "email": "toto@gg.com"} + + params_obj = AssignIncidentParams(**params) + assert params_obj.incident_id == 234 + assert params_obj.email == "toto@gg.com" + + +class TestMiddlewareParameterPreprocessing: + """Test suite for middleware parameter preprocessing.""" + + def test_middleware_converts_stringified_json_params(self): + """Test that middleware converts JSON strings to dicts.""" + import asyncio + + from gg_api_core.mcp_server import GitGuardianPATEnvMCP + + mcp = GitGuardianPATEnvMCP("test", personal_access_token="test_token") + + # Create a mock context with stringified JSON parameters that mirrors FastMCP structure + class MockMessage: + def __init__(self): + self.name = "list_repo_occurrences" + self.arguments = {"params": '{"source_id": 9036019, "get_all": true}'} + + class MockContext: + method = "tools/call" + + def __init__(self): + self.message = MockMessage() + self.params = {"arguments": {"params": '{"source_id": 9036019, "get_all": true}'}} + + async def mock_call_next(ctx): + # Verify both context.params and context.message.arguments were preprocessed + assert isinstance(ctx.params["arguments"]["params"], dict) + assert ctx.params["arguments"]["params"]["source_id"] == 9036019 + assert ctx.params["arguments"]["params"]["get_all"] is True + + # FastMCP uses context.message.arguments, so verify that's updated too + assert isinstance(ctx.message.arguments["params"], dict) + assert ctx.message.arguments["params"]["source_id"] == 9036019 + assert ctx.message.arguments["params"]["get_all"] is True + return "success" + + context = MockContext() + + # Run the middleware + result = asyncio.run(mcp._parameter_preprocessing_middleware(context, mock_call_next)) + assert result == "success" + + def test_middleware_preserves_dict_params(self): + """Test that middleware doesn't modify already-valid dict params.""" + import asyncio + + from gg_api_core.mcp_server import GitGuardianPATEnvMCP + + mcp = GitGuardianPATEnvMCP("test", personal_access_token="test_token") + + # Create a mock context with already-valid dict parameters + class MockContext: + method = "tools/call" + params = {"arguments": {"params": {"source_id": 9036019, "get_all": True}}} + + async def mock_call_next(ctx): + # Verify the params are still a dict + assert isinstance(ctx.params["arguments"]["params"], dict) + assert ctx.params["arguments"]["params"]["source_id"] == 9036019 + assert ctx.params["arguments"]["params"]["get_all"] is True + return "success" + + context = MockContext() + + # Run the middleware + result = asyncio.run(mcp._parameter_preprocessing_middleware(context, mock_call_next)) + assert result == "success" + + def test_middleware_ignores_non_tool_call_requests(self): + """Test that middleware only processes tools/call requests.""" + import asyncio + + from gg_api_core.mcp_server import GitGuardianPATEnvMCP + + mcp = GitGuardianPATEnvMCP("test", personal_access_token="test_token") + + # Create a mock context for tools/list + class MockContext: + method = "tools/list" + params = {} + + async def mock_call_next(ctx): + return "success" + + context = MockContext() + + # Run the middleware - should pass through without modification + result = asyncio.run(mcp._parameter_preprocessing_middleware(context, mock_call_next)) + assert result == "success" + + def test_middleware_handles_invalid_json_gracefully(self): + """Test that middleware doesn't crash on invalid JSON strings.""" + import asyncio + + from gg_api_core.mcp_server import GitGuardianPATEnvMCP + + mcp = GitGuardianPATEnvMCP("test", personal_access_token="test_token") + + # Create a mock context with invalid JSON + class MockContext: + method = "tools/call" + params = {"arguments": {"params": "{invalid json"}} + + async def mock_call_next(ctx): + # Invalid JSON should be left as-is + assert ctx.params["arguments"]["params"] == "{invalid json" + return "success" + + context = MockContext() + + # Run the middleware + result = asyncio.run(mcp._parameter_preprocessing_middleware(context, mock_call_next)) + assert result == "success"