3 changes: 2 additions & 1 deletion common/requirements.txt
@@ -110,7 +110,8 @@ packaging==24.2
pandas==2.2.3
#pathtools==0.1.2
pillow==11.2.1
PyMuPDF==1.26.4
PyMuPDF==1.26.6
pymupdf4llm==0.2.0
platformdirs==4.3.8
pluggy==1.6.0
prometheus_client==0.22.1
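The new pymupdf4llm dependency is PyMuPDF's PDF-to-Markdown helper. For context, a minimal sketch of its core entry point, assuming a local sample.pdf; the PR's actual call sites are not shown in this diff.

```python
# Illustrative only; "sample.pdf" is a placeholder path, not taken from this PR.
import pymupdf4llm

md_text = pymupdf4llm.to_markdown("sample.pdf")  # convert the whole PDF into one Markdown string
print(md_text[:500])
```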
151 changes: 151 additions & 0 deletions common/utils/graph_locks.py
@@ -0,0 +1,151 @@
"""
Graph-level locking mechanism to prevent concurrent operations on the same graph.
Uses threading.Lock for sync operations and asyncio.Lock for async rebuild operations.
"""
import asyncio
import threading
import logging
from typing import Dict, Optional
from fastapi import HTTPException

logger = logging.getLogger(__name__)

# Module-level lock management
_graph_locks: Dict[str, threading.Lock] = {}
_locks_dict_lock = threading.Lock()

# Global rebuild lock (only one rebuild at a time across all graphs)
# Use asyncio.Lock for async operations
_rebuild_lock: Optional[asyncio.Lock] = None
_currently_rebuilding_graph: str = None
_rebuild_graph_lock = threading.Lock() # Protects _currently_rebuilding_graph


def _get_rebuild_lock() -> asyncio.Lock:
"""Get or create the async rebuild lock."""
global _rebuild_lock
if _rebuild_lock is None:
_rebuild_lock = asyncio.Lock()
return _rebuild_lock


def get_graph_lock(graphname: str) -> threading.Lock:
"""Get or create a lock for a specific graph."""
with _locks_dict_lock:
if graphname not in _graph_locks:
_graph_locks[graphname] = threading.Lock()
logger.debug(f"Created new lock for graph: {graphname}")
return _graph_locks[graphname]


def acquire_graph_lock(graphname: str, operation: str = "operation") -> bool:
"""
Try to acquire lock for a graph. Returns True if acquired, False if already locked.

Args:
graphname: Name of the graph to lock
operation: Description of the operation (for logging)
"""
lock = get_graph_lock(graphname)
acquired = lock.acquire(blocking=False)

if acquired:
logger.info(f"Lock acquired for graph '{graphname}' - {operation}")
else:
logger.warning(f"Lock already held for graph '{graphname}' - {operation} blocked")

return acquired


def release_graph_lock(graphname: str, operation: str = "operation"):
"""
Release the lock for a graph.

Args:
graphname: Name of the graph to unlock
operation: Description of the operation (for logging)
"""
lock = get_graph_lock(graphname)
if lock.locked():
lock.release()
logger.info(f"Lock released for graph '{graphname}' - {operation} completed")


def raise_if_locked(graphname: str, operation: str = "operation"):
"""
Try to acquire lock or raise HTTPException with 409 Conflict status.
Used for FastAPI endpoints.

Args:
graphname: Name of the graph to lock
operation: Description of the operation

Raises:
HTTPException: 409 Conflict if lock is already held
"""
if not acquire_graph_lock(graphname, operation):
raise HTTPException(
status_code=409,
detail=f"Another operation is already in progress for graph '{graphname}'. Please wait and try again."
)

# =====================================================
# Global Rebuild Lock Functions
# =====================================================

async def acquire_rebuild_lock(graphname: str) -> bool:
"""
Try to acquire the global rebuild lock (only one rebuild at a time across all graphs).
Returns True if acquired immediately, False if another rebuild is in progress.
Non-blocking operation - returns instantly.

Args:
graphname: Name of the graph requesting rebuild
"""
global _currently_rebuilding_graph

lock = _get_rebuild_lock()

# Non-blocking check: return immediately if lock is busy
if lock.locked():
logger.warning(f"Rebuild lock busy - another graph is rebuilding. Request from: {graphname}")
return False

# Try to acquire the lock (should be instant since we checked it's not locked)
try:
await asyncio.wait_for(lock.acquire(), timeout=0.01) # 10ms safety timeout
with _rebuild_graph_lock:
_currently_rebuilding_graph = graphname
logger.info(f"Global rebuild lock acquired for graph: {graphname}")
return True
except asyncio.TimeoutError:
# Race condition: lock was acquired between check and acquire
logger.warning(f"Rebuild lock busy - another graph is rebuilding. Request from: {graphname}")
return False


def release_rebuild_lock(graphname: str):
"""
Release the global rebuild lock.

Args:
graphname: Name of the graph releasing rebuild lock
"""
global _currently_rebuilding_graph

lock = _get_rebuild_lock()
if lock.locked():
with _rebuild_graph_lock:
_currently_rebuilding_graph = None
lock.release()
logger.info(f"Global rebuild lock released for graph: {graphname}")


def get_rebuilding_graph() -> str:
"""
Get the name of the graph currently being rebuilt.
Returns None if no rebuild is in progress.
"""
with _rebuild_graph_lock:
return _currently_rebuilding_graph
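A minimal sketch of how these helpers might be wired into a FastAPI route, assuming the module path common.utils.graph_locks from this PR; the route path and the rebuild body are hypothetical, and only the acquire/release pairing and the 409 behaviour come from graph_locks.py.

```python
from fastapi import FastAPI, HTTPException

from common.utils.graph_locks import (
    acquire_rebuild_lock,
    raise_if_locked,
    release_graph_lock,
    release_rebuild_lock,
)

app = FastAPI()


@app.post("/graphs/{graphname}/rebuild")  # hypothetical route, for illustration only
async def rebuild_graph(graphname: str):
    # Per-graph guard: raises 409 Conflict if another operation holds this graph's lock.
    raise_if_locked(graphname, operation="rebuild")
    try:
        # Global guard: only one rebuild may run at a time across all graphs.
        if not await acquire_rebuild_lock(graphname):
            raise HTTPException(status_code=409, detail="Another rebuild is in progress")
        try:
            ...  # run the actual rebuild work here
        finally:
            release_rebuild_lock(graphname)
    finally:
        release_graph_lock(graphname, operation="rebuild")
    return {"status": "rebuild finished", "graph": graphname}
```

The nested try/finally pairing keeps both locks released even when the rebuild fails, matching the locked()-guarded release helpers above.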

166 changes: 32 additions & 134 deletions common/utils/image_data_extractor.py
@@ -1,165 +1,63 @@
import base64
import io
import logging
import os
import uuid
import hashlib
from pathlib import Path
from langchain_core.messages import HumanMessage, SystemMessage

from common.config import get_multimodal_service

logger = logging.getLogger(__name__)



def describe_image_with_llm(image_input):
def describe_image_with_llm(file_path):
"""
Send image (pixmap or PIL image) to LLM vision model and return description.
Uses multimodal_service from config if available, otherwise falls back to completion_service.
Currently supports: OpenAI, Azure OpenAI, Google GenAI, and Google VertexAI
Read image file and convert to base64 to send to LLM.
"""
try:
from PIL import Image as PILImage

client = get_multimodal_service()
if not client:
return "[Image: Failed to create multimodal LLM client]"


# Read image and convert to base64
pil_image = PILImage.open(file_path)
buffer = io.BytesIO()
# Convert to RGB if needed for better compatibility
if image_input.mode != 'RGB':
image_input = image_input.convert('RGB')
image_input.save(buffer, format="JPEG", quality=95)
b64_img = base64.b64encode(buffer.getvalue()).decode("utf-8")
if pil_image.mode != 'RGB':
pil_image = pil_image.convert('RGB')
pil_image.save(buffer, format="JPEG", quality=95)
image_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8')

# Build messages (system + human)
messages = [
SystemMessage(
content="You are a helpful assistant that describes images concisely for document analysis."
),
HumanMessage(
content=[
{
"type": "text",
"text": (
"Please describe what you see in this image and "
"if the image has scanned text then extract all the text. "
"if the image has any logo, icon, or branding element, try to describe it with text. "
"Focus on any text, diagrams, charts, or other visual elements."
"If the image is purely a logo, icon, or branding element, start your response with 'LOGO:' or 'ICON:'."
),
},
{
"type": "image_url",
"image_url": {"url": f"data:image/jpeg;base64,{b64_img}"},
},
]
),
SystemMessage(
content="You are a helpful assistant that describes images concisely for document analysis."
),
HumanMessage(
content=[
{
"type": "text",
"text": (
"Please describe what you see in this image and "
"if the image has scanned text then extract all the text. "
"If the image has any graph, chart, table, or other diagram, describe it. "
"If the image has any logo, identify and describe the logo."
),
},
{
"type": "image_url",
"image_url": {"url": f"data:image/jpeg;base64,{image_base64}"},
},
],
),
]

# Get response from LangChain LLM client
# Access the underlying LangChain client
langchain_client = client.llm
response = langchain_client.invoke(messages)

return response.content if hasattr(response, 'content') else str(response)
return response.content if hasattr(response, "content") else str(response)

except Exception as e:
logger.error(f"Failed to describe image with LLM: {str(e)}")
return "[Image: Error processing image description]"


def save_image_and_get_markdown(image_input, context_info="", graphname=None):
    """
    Save image locally to static/images/ folder and return markdown reference with description.

    LEGACY/OLD APPROACH: Used for backward compatibility with JSONL-based loading.
    Images are saved as files and served via /ui/images/ endpoint with img:// protocol.

    For NEW direct loading approach, images are stored in Image vertex as base64
    and served via /ui/image_vertex/ endpoint with image:// protocol.

    Args:
        image_input: PIL Image object
        context_info: Optional context (e.g., "page 3 of invoice.pdf")
        graphname: Graph name to organize images by graph (optional)

    Returns:
        dict with:
            - 'markdown': Markdown string with img:// reference
            - 'image_id': Unique identifier for the saved image
            - 'image_path': Path where image was saved to static/images/
    """
    try:
        # FIRST: Get description from LLM to check if it's a logo
        description = describe_image_with_llm(image_input)

        # Check if the image is a logo, icon, or decorative element BEFORE saving
        # These should be filtered out as they're not content-relevant
        description_lower = description.lower()
        logo_indicators = ['logo', 'icon', 'branding', 'watermark', 'trademark', 'company logo', 'brand logo']

        if any(indicator in description_lower for indicator in logo_indicators):
            logger.info(f"Detected logo/icon in image, skipping: {description[:100]}")
            return None

        # If not a logo, proceed with saving the image
        # Generate unique image ID using hash of image content
        buffer = io.BytesIO()
        if image_input.mode != 'RGB':
            image_input = image_input.convert('RGB')
        image_input.save(buffer, format="JPEG", quality=95)
        image_bytes = buffer.getvalue()

        # Create hash-based ID (deterministic for same image)
        image_hash = hashlib.sha256(image_bytes).hexdigest()[:16]
        image_id = f"{image_hash}.jpg"

        # Save image to local storage directory organized by graphname
        project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

        # If graphname is provided, organize images by graph
        if graphname:
            images_dir = os.path.join(project_root, "static", "images", graphname)
            # Include graphname in the image reference for URL construction
            image_reference = f"{graphname}/{image_id}"
        else:
            images_dir = os.path.join(project_root, "static", "images")
            image_reference = image_id

        os.makedirs(images_dir, exist_ok=True)

        image_path = os.path.join(images_dir, image_id)

        # Save image file (skip if already exists with same hash)
        if not os.path.exists(image_path):
            with open(image_path, 'wb') as f:
                f.write(image_bytes)
            logger.info(f"Saved content image to: {image_path}")
        else:
            logger.debug(f"Image already exists: {image_path}")

        # Generate markdown with custom img:// protocol (will be replaced later)
        # Format: ![description](img://graphname/image_id) or ![description](img://image_id)
        markdown = f"![{description}](img://{image_reference})"

        logger.info(f"Created image reference: {image_reference} with description")

        return {
            'markdown': markdown,
            'image_id': image_reference,
            'image_path': image_path,
            'description': description
        }

    except Exception as e:
        logger.error(f"Failed to save image and generate markdown: {str(e)}")
        # Fallback to text description only
        fallback_desc = f"[Image: {context_info} - processing failed]"
        return {
            'markdown': fallback_desc,
            'image_id': None,
            'image_path': None,
            'description': fallback_desc
        }
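A short usage sketch of the reworked describe_image_with_llm, which now takes a file path instead of a pixmap/PIL image; the path below is a hypothetical placeholder, and the error-string check mirrors the fallback values returned by the function above.

```python
# Illustrative only; "/tmp/page_3.png" is a placeholder path, not from this PR.
from common.utils.image_data_extractor import describe_image_with_llm

description = describe_image_with_llm("/tmp/page_3.png")
if description.startswith("[Image:"):
    print("Vision call failed:", description)
else:
    print("LLM description:", description)
```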

