Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from gg_api_core.mcp_server import AbstractGitGuardianFastMCP
from gg_api_core.schema_utils import compress_pydantic_model_schema
from gg_api_core.tools.find_current_source_id import find_current_source_id
from gg_api_core.tools.generate_honey_token import generate_honeytoken
from gg_api_core.tools.list_honeytokens import list_honeytokens
Expand Down Expand Up @@ -45,15 +46,16 @@


def register_developer_tools(mcp: AbstractGitGuardianFastMCP):
mcp.tool(
remediate_tool = mcp.tool(
remediate_secret_incidents,
description="Find and fix secrets in the current repository using exact match locations (file paths, line numbers, character indices). "
"This tool leverages the occurrences API to provide precise remediation instructions without needing to search for secrets in files. "
"By default, this only shows incidents assigned to the current user. Pass mine=False to get all incidents related to this repo.",
required_scopes=["incidents:read", "sources:read"],
)
remediate_tool.parameters = compress_pydantic_model_schema(remediate_tool.parameters)

mcp.tool(
scan_tool = mcp.tool(
scan_secrets,
description="""
Scan multiple content items for secrets and policy breaks.
Expand All @@ -65,44 +67,51 @@ def register_developer_tools(mcp: AbstractGitGuardianFastMCP):
""",
required_scopes=["scan"],
)
scan_tool.parameters = compress_pydantic_model_schema(scan_tool.parameters)

mcp.tool(
list_incidents_tool = mcp.tool(
list_incidents,
description="List secret incidents or occurrences related to a specific repository"
"With mine=True, this tool only shows incidents assigned to the current user.",
required_scopes=["incidents:read", "sources:read"],
)
list_incidents_tool.parameters = compress_pydantic_model_schema(list_incidents_tool.parameters)

mcp.tool(
list_repo_occurrences_tool = mcp.tool(
list_repo_occurrences,
description="List secret occurrences for a specific repository with exact match locations. "
"Returns detailed occurrence data including file paths, line numbers, and character indices where secrets were detected. "
"Use this tool when you need to locate and remediate secrets in the codebase with precise file locations.",
required_scopes=["incidents:read"],
)
list_repo_occurrences_tool.parameters = compress_pydantic_model_schema(list_repo_occurrences_tool.parameters)

mcp.tool(
find_source_tool = mcp.tool(
find_current_source_id,
description="Find the GitGuardian source_id for the current repository. "
"This tool automatically detects the current git repository and searches for its source_id in GitGuardian. "
"Useful when you need to reference the repository in other API calls.",
required_scopes=["sources:read"],
)
find_source_tool.parameters = compress_pydantic_model_schema(find_source_tool.parameters)

mcp.tool(
generate_honeytoken_tool = mcp.tool(
generate_honeytoken,
description="Generate an AWS GitGuardian honeytoken and get injection recommendations",
required_scopes=["honeytokens:write"],
)
generate_honeytoken_tool.parameters = compress_pydantic_model_schema(generate_honeytoken_tool.parameters)

mcp.tool(
list_honeytokens_tool = mcp.tool(
list_honeytokens,
description="List honeytokens from the GitGuardian dashboard with filtering options",
required_scopes=["honeytokens:read"],
)
list_honeytokens_tool.parameters = compress_pydantic_model_schema(list_honeytokens_tool.parameters)

mcp.tool(
list_users_tool = mcp.tool(
list_users,
description="List users on the workspace/account",
required_scopes=["members:read"],
)
list_users_tool.parameters = compress_pydantic_model_schema(list_users_tool.parameters)
1 change: 1 addition & 0 deletions packages/gg_api_core/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ dependencies = [
"python-dotenv>=1.0.0",
"pydantic-settings>=2.0.0",
"jinja2>=3.1.0",
"jsonref>=1.1.0",
]

[project.optional-dependencies]
Expand Down
100 changes: 98 additions & 2 deletions packages/gg_api_core/src/gg_api_core/mcp_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
from collections.abc import AsyncIterator, Sequence
from contextlib import asynccontextmanager
from enum import Enum
from typing import Any
from typing import Any, Callable

from fastmcp import FastMCP
from fastmcp.exceptions import FastMCPError, ValidationError
from fastmcp.server.dependencies import get_http_headers
from fastmcp.server.middleware import Middleware
from fastmcp.server.middleware import Middleware, MiddlewareContext
from fastmcp.tools import Tool
from mcp.types import Tool as MCPTool

Expand Down Expand Up @@ -141,6 +141,8 @@ def __init__(self, *args, default_scopes: list[str] | None = None, **kwargs):
# Map each tool to its required scopes (instance attribute)
self._tool_scopes: dict[str, set[str]] = {}

# Add middleware for parameter preprocessing (must be first to preprocess before validation)
self.add_middleware(self._parameter_preprocessing_middleware)
self.add_middleware(ScopeFilteringMiddleware(self))

def clear_cache(self) -> None:
Expand Down Expand Up @@ -248,6 +250,100 @@ async def get_scopes(self) -> set[str]:
logger.debug(f"scopes: {scopes}")
return scopes

async def _parameter_preprocessing_middleware(self, context: MiddlewareContext, call_next: Callable) -> Any:
"""Middleware to preprocess tool parameters to handle Claude Code bug.

Claude Code has a bug where it serializes Pydantic model parameters as JSON strings
instead of proper dictionaries. This middleware intercepts tools/call requests and
converts stringified JSON parameters back to dictionaries before validation.

See: https://github.com/anthropics/claude-code/issues/3084
"""
import json

# Only apply to tools/call requests
if context.method != "tools/call":
return await call_next(context)

# Check if we have arguments to preprocess
if not hasattr(context, "params") or not context.params:
return await call_next(context)

params = context.params
arguments = params.get("arguments", {})

# Log what we received for debugging
logger.debug(f"Middleware received arguments: {arguments} (type: {type(arguments)})")

# If arguments is empty or not a dict, nothing to preprocess
if not isinstance(arguments, dict):
logger.debug("Arguments is not a dict, skipping preprocessing")
return await call_next(context)

# Look for stringified JSON in parameter values
preprocessed_arguments = {}
modified = False

for key, value in arguments.items():
logger.debug(f"Processing parameter '{key}': {repr(value)} (type: {type(value)})")
if isinstance(value, str) and value.strip().startswith("{"):
# Looks like stringified JSON, try to parse it
try:
parsed = json.loads(value)
if isinstance(parsed, dict):
logger.info(f"Preprocessed parameter '{key}': converted JSON string to dict")
preprocessed_arguments[key] = parsed
modified = True
else:
preprocessed_arguments[key] = value
except (json.JSONDecodeError, ValueError) as e:
# Not valid JSON, keep original value
logger.debug(f"Failed to parse '{key}' as JSON: {e}")
preprocessed_arguments[key] = value
else:
preprocessed_arguments[key] = value

# Update context with preprocessed arguments if we modified anything
if modified:
context.params["arguments"] = preprocessed_arguments
# Also update context.message.arguments which FastMCP uses
if hasattr(context, "message") and hasattr(context.message, "arguments"):
context.message.arguments = preprocessed_arguments
logger.debug(f"Updated arguments: {preprocessed_arguments}")

return await call_next(context)

async def _scope_filtering_middleware(self, context: MiddlewareContext, call_next: Callable) -> Any:
"""Middleware to filter tools based on token scopes.

This middleware intercepts tools/list requests and filters the tools
based on the user's API token scopes.

The authentication strategy determines whether to use cached scopes
or fetch them per-request.
"""
# Only apply filtering to tools/list requests
if context.method != "tools/list":
return await call_next(context)

# Get all tools from the next middleware/handler
all_tools = await call_next(context)

# Filter tools by scopes
scopes = await self.get_scopes()
filtered_tools = []
for tool in all_tools:
tool_name = tool.name
required_scopes = self._tool_scopes.get(tool_name, set())

if not required_scopes or required_scopes.issubset(scopes):
filtered_tools.append(tool)
else:
missing_scopes = required_scopes - scopes
logger.info(f"Removing tool '{tool_name}' due to missing scopes: {', '.join(missing_scopes)}")

return filtered_tools

async def list_tools(self) -> list[MCPTool]:
"""
Public method to list tools (for compatibility with tests and external code).
Expand Down
60 changes: 60 additions & 0 deletions packages/gg_api_core/src/gg_api_core/schema_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
"""Utility functions for MCP tool schema manipulation."""

import json
import logging

import jsonref

logger = logging.getLogger(__name__)


def compress_pydantic_model_schema(tool_parameters: dict) -> dict:
"""
Compress a tool schema by flattening nested Pydantic model parameters.
When FastMCP tools use a single Pydantic model as a parameter (e.g., `params: MyParamsModel`),
the generated schema nests all the model's fields under that parameter name. This creates
an extra level of nesting that some MCP clients (like Claude Code) handle incorrectly.
This function flattens the schema by:
1. Resolving all JSON references
2. Extracting the properties from the nested model
3. Promoting them to top-level parameters
Example:
Before compression:
{
"type": "object",
"properties": {
"params": {
"type": "object",
"properties": {
"source_id": {"type": "integer"},
"get_all": {"type": "boolean"}
}
}
}
}
After compression:
{
"type": "object",
"properties": {
"source_id": {"type": "integer"},
"get_all": {"type": "boolean"}
}
}
Args:
tool_parameters: The tool's parameters schema (from tool.parameters)
Returns:
Compressed schema with flattened parameters
"""
try:
# Resolve all JSON references
resolved = jsonref.replace_refs(tool_parameters)

# Convert back to plain dict (jsonref returns a special JsonRef object)
compressed = json.loads(jsonref.dumps(resolved))

logger.debug(f"Compressed tool schema: {json.dumps(compressed, indent=2)}")
return compressed
except Exception as e:
logger.exception(f"Failed to compress schema: {str(e)}")
# Return original schema if compression fails
return tool_parameters
30 changes: 21 additions & 9 deletions packages/secops_mcp_server/src/secops_mcp_server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from developer_mcp_server.register_tools import register_developer_tools
from fastmcp.exceptions import ToolError
from gg_api_core.mcp_server import get_mcp_server, register_common_tools
from gg_api_core.schema_utils import compress_pydantic_model_schema
from gg_api_core.scopes import set_secops_scopes
from gg_api_core.tools.assign_incident import assign_incident
from gg_api_core.tools.create_code_fix_request import create_code_fix_request
Expand Down Expand Up @@ -150,59 +151,70 @@ async def get_current_token_info() -> dict[str, Any]:
raise ToolError(f"Error: {str(e)}")


mcp.tool(
# Register SecOps tools with schema compression for Pydantic model parameters

update_tags_tool = mcp.tool(
update_or_create_incident_custom_tags,
description="Update or create custom tags for a secret incident",
required_scopes=["incidents:write", "custom_tags:write"],
)
update_tags_tool.parameters = compress_pydantic_model_schema(update_tags_tool.parameters)

mcp.tool(
update_status_tool = mcp.tool(
update_incident_status,
description="Update a secret incident with status",
required_scopes=["incidents:write"],
)
update_status_tool.parameters = compress_pydantic_model_schema(update_status_tool.parameters)

mcp.tool(
read_tags_tool = mcp.tool(
read_custom_tags,
description="Read custom tags from the GitGuardian dashboard.",
required_scopes=["custom_tags:read"],
)
read_tags_tool.parameters = compress_pydantic_model_schema(read_tags_tool.parameters)

mcp.tool(
write_tags_tool = mcp.tool(
write_custom_tags,
description="Create or delete custom tags in the GitGuardian dashboard.",
required_scopes=["custom_tags:write"],
)
write_tags_tool.parameters = compress_pydantic_model_schema(write_tags_tool.parameters)

mcp.tool(
manage_incident_tool = mcp.tool(
manage_private_incident,
description="Manage a secret incident (assign, unassign, resolve, ignore, reopen)",
required_scopes=["incidents:write"],
)
manage_incident_tool.parameters = compress_pydantic_model_schema(manage_incident_tool.parameters)

mcp.tool(
list_users_tool = mcp.tool(
list_users,
description="List users on the workspace/account",
required_scopes=["members:read"],
)
list_users_tool.parameters = compress_pydantic_model_schema(list_users_tool.parameters)

mcp.tool(
revoke_secret_tool = mcp.tool(
revoke_secret,
description="Revoke a secret by its ID through the GitGuardian API",
required_scopes=["write:secret"],
)
revoke_secret_tool.parameters = compress_pydantic_model_schema(revoke_secret_tool.parameters)

mcp.tool(
assign_incident_tool = mcp.tool(
assign_incident,
description="Assign a secret incident to a specific member or to the current user",
required_scopes=["incidents:write"],
)
assign_incident_tool.parameters = compress_pydantic_model_schema(assign_incident_tool.parameters)

mcp.tool(
create_fix_tool = mcp.tool(
create_code_fix_request,
description="Create code fix requests for multiple secret incidents with their locations. This will generate pull requests to automatically remediate the detected secrets.",
required_scopes=["incidents:write"],
)
create_fix_tool.parameters = compress_pydantic_model_schema(create_fix_tool.parameters)

register_common_tools(mcp)

Expand Down
3 changes: 3 additions & 0 deletions scripts/call_mcp_http_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ async def main():
tools = await client.list_tools()
print(tools)

users = await client.call_tool("list_users", {"params": {}})
print(users)


if __name__ == "__main__":
asyncio.run(main())
Loading
Loading