From 1405d67adf3c90958cde669a7a9cdacd92281d94 Mon Sep 17 00:00:00 2001
From: nishika26
Date: Thu, 12 Jun 2025 09:39:49 +0530
Subject: [PATCH 1/2] adding responses api to script

---
 bin/run-response.sh    |  58 +++++++++
 src/prompt/response.py | 278 +++++++++++++++++++++++++++++++++++++++++
 2 files changed, 336 insertions(+)
 create mode 100755 bin/run-response.sh
 create mode 100644 src/prompt/response.py

diff --git a/bin/run-response.sh b/bin/run-response.sh
new file mode 100755
index 0000000..865446a
--- /dev/null
+++ b/bin/run-response.sh
@@ -0,0 +1,58 @@
+#!/bin/bash
+
+ROOT=$(git rev-parse --show-toplevel)
+
+export PYTHONPATH=$ROOT
+
+_repetition=5
+_default_model=gpt-4o-mini
+
+while getopts 'n:p:d:g:m:e:ch' option; do
+    case $option in
+        n) _repetition=$OPTARG ;;
+        p) _prompts=$OPTARG ;;
+        d) _documents=$OPTARG ;;
+        g) _gt=$OPTARG ;;
+        m) _models=( "${_models[@]}" --model "$OPTARG" ) ;;
+        e) _extra=( "${_extra[@]}" --extra-info "$OPTARG" ) ;;
+        h)
+            cat <<EOF

[... PATCH 1/2 truncated here: the usage text, the remainder of
bin/run-response.sh, and the initial src/prompt/response.py are not shown ...]

Date: Thu, 3 Jul 2025 23:05:00 +0530
Subject: [PATCH 2/2] logging and function docs

---
 src/prompt/response.py | 263 ++++++++++++++++++++++++++++++-----------
 1 file changed, 191 insertions(+), 72 deletions(-)

diff --git a/src/prompt/response.py b/src/prompt/response.py
index 0c6b77f..3e3cdca 100644
--- a/src/prompt/response.py
+++ b/src/prompt/response.py
@@ -15,18 +15,48 @@
 from mylib import Logger, ExperimentResponse, FileIterator
 
+from dataclasses import dataclass, astuple
+from pathlib import Path
+from multiprocessing import Queue, Pool
+from argparse import ArgumentParser, Namespace
+from uuid import uuid4
+import json
+import time
+from typing import Generator, Iterable, Any
+
+# Logger comes from mylib (imported above); OpenAI and NotFoundError come
+# from the openai package, imported at the top of the file (not in this hunk):
+# from openai import NotFoundError, OpenAI
+
 @dataclass(frozen=True)
 class Resource:
+    """
+    Container for the IDs of created OpenAI resources.
+    """
     response: str
     vector_store: str
 
 @dataclass(frozen=True)
 class Job:
+    """
+    Job to process: a resource, a model name, and the configuration dict.
+    """
     resource: Resource
     model: str
     config: dict
 
-def vs_ls(vector_store_id, client):
+def vs_ls(vector_store_id: str, client: Any) -> Generator:
+    """
+    Iterates over all the files in a vector store, yielding each entry.
+
+    Args:
+        vector_store_id: ID of the vector store.
+        client: OpenAI client.
+
+    Yields:
+        File entries from the vector store.
+    """
     kwargs = {}
     while True:
         page = client.vector_stores.files.list(
@@ -38,16 +68,39 @@
             break
         kwargs['after'] = page.last_id
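vs_ls hides the vector store's cursor-based pagination (the after/last_id bookkeeping) behind a generator, so callers can treat the listing as one flat iterable. A minimal usage sketch reusing the patch's own function; the store ID below is a made-up placeholder:

    # Hypothetical caller of vs_ls; 'vs_abc123' is not a real store ID.
    from openai import OpenAI

    client = OpenAI()
    for entry in vs_ls('vs_abc123', client):
        if entry.last_error is not None:
            print(entry.id, entry.last_error)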
+ """ + def __init__(self, resource: str) -> None: self.resource = resource - def __call__(self, client, retries=1): + def __call__(self, client: Any, retries: int = 1) -> None: for _ in range(retries): try: self.clean(client) @@ -57,87 +110,110 @@ def __call__(self, client, retries=1): else: Logger.error('Cannot clean %s', type(self).__name__) - def clean(self, client): + def clean(self, client: Any) -> None: + """ + Implement cleanup logic in subclasses. + """ raise NotImplementedError() class VectorStoreCleaner(ResourceCleaner): - def clean(self, client): + """ + Deletes all files and the vector store itself. + """ + def clean(self, client: Any) -> None: for i in vs_ls(self.resource, client): client.files.delete(i.id) client.vector_stores.delete(self.resource) class ResponseCleaner(ResourceCleaner): - def clean(self, client): + """ + Deletes a response resource. + """ + def clean(self, client: Any) -> None: client.responses.delete(self.resource) class ResourceCreator: - def __init__(self, client, args): + """ + Base class to create a resource. + """ + def __init__(self, client: Any, args: Namespace) -> None: self.client = client self.args = args - def __call__(self, config, **kwargs): + def __call__(self, config: dict, **kwargs) -> str: handle = self.create(config, **kwargs) return handle.id - def create(self, config, **kwargs): + def create(self, config: dict, **kwargs) -> Any: raise NotImplementedError() class VectorStoreCreator(ResourceCreator): - def __init__(self, client, args): + """ + Creates a vector store and uploads files in batches. + """ + def __init__(self, client: Any, args: Namespace) -> None: super().__init__(client, args) self.ls = FileIterator(self.args.upload_batch_size) - def create(self, config, **kwargs): + def create(self, config: dict, **kwargs) -> Any: documents = self.args.document_root.joinpath(config['docs']) vector_store = self.client.vector_stores.create() for paths in self.ls(documents): Logger.info('Uploading %d', len(paths)) - files = [ x.open('rb') for x in paths ] - file_batch = (self - .client - .vector_stores - .file_batches.upload_and_poll( - vector_store_id=vector_store.id, - files=files, - )) + files = [x.open('rb') for x in paths] + file_batch = ( + self.client.vector_stores.file_batches.upload_and_poll( + vector_store_id=vector_store.id, + files=files, + ) + ) for i in files: i.close() self.raise_for_status(file_batch, vector_store, paths) return vector_store - def raise_for_status(self, response, vector_store, paths): + def raise_for_status(self, response: Any, vector_store: Any, paths: list[Path]) -> None: + """ + Verifies upload results, cleans up on failure. 
+ """ assert response.file_counts.total == len(paths) if response.file_counts.completed != response.file_counts.total: - paths = { str(x.name): x for x in paths } + paths_map = {str(x.name): x for x in paths} for i in vs_ls(vector_store.id, self.client): if i.last_error is None: document = self.client.files.retrieve(i.id) - paths.pop(document.filename) - for i in paths.values(): + paths_map.pop(document.filename) + + for i in paths_map.values(): Logger.error('Upload error: %s', i) vector_store_cleaner = VectorStoreCleaner(vector_store.id) vector_store_cleaner(self.client, self.args.cleanup_attempts) - raise IndexError('Upload failure ({} of {}): {}'.format( - response.file_counts.failed, - response.file_counts.total, - ', '.join(map(str, paths.values())), - )) + raise IndexError( + 'Upload failure ({} of {}): {}'.format( + response.file_counts.failed, + response.file_counts.total, + ', '.join(map(str, paths_map.values())), + ) + ) class ResponseCreator(ResourceCreator): + """ + Creates a response using OpenAI client. + """ _kwargs = ( 'model', 'vector_store', 'question', ) - def create(self, config, **kwargs): + def create(self, config: dict, **kwargs) -> Any: model, vector_store_id, question = map(kwargs.get, self._kwargs) instructions = scanp(config, self.args.prompt_root, 'system') @@ -160,36 +236,46 @@ def create(self, config, **kwargs): @dataclass(frozen=True) class ResourceKey: + """ + Key to uniquely identify resource combinations. + """ docs: str model: str class OpenAIResources: + """ + Context manager to create and clean up resources. + """ _resources = ( (ResponseCreator, ResponseCleaner), (VectorStoreCreator, VectorStoreCleaner), ) - def __init__(self, args): + def __init__(self, args: Namespace) -> None: self.args = args - self.client = OpenAI() - self.resources = {} + self.resources: dict[ResourceKey, Resource] = {} + (self.r_creator, self.v_creator) = ( x(self.client, self.args) for (x, _) in self._resources ) - def __enter__(self): + def __enter__(self) -> OpenAIResources: self.resources.clear() return self - def __exit__(self, exc_type, exc_value, traceback): + def __exit__(self, exc_type, exc_value, traceback) -> None: + import operator as op cleaners = list(map(op.itemgetter(1), self._resources)) for resource in self.resources.values(): for (MyCleaner, r) in zip(cleaners, astuple(resource)): cleaner = MyCleaner(r) cleaner(self.client, self.args.cleanup_attempts) - def __call__(self, fp): + def __call__(self, fp: Iterable[str]) -> Generator[Job, None, None]: + """ + Reads configs line by line and yields jobs. + """ for line in fp: config = json.loads(line) docs = config['docs'] @@ -210,42 +296,75 @@ def __call__(self, fp): yield Job(resource, model, config) -def func(incoming, outgoing, session_id, args): +def func( + incoming: Queue, + outgoing: Queue, + session_id: str, + args: Namespace, +) -> None: + """ + Worker function to process jobs and send results. 
+ """ import datetime client = OpenAI() creator = ResponseCreator(client, args) while True: - job = incoming.get() - Logger.info(job) - - question = scanp(job.config, args.prompt_root, 'user') - start = time.time() - - response = creator.create( - job.config, - model=job.model, - vector_store=job.resource.vector_store, - question=question, - ) - - latency = time.time() - start - - outgoing.put({ - "system": job.config["system"], - "user": job.config["user"], - "docs": job.config["docs"], - "sequence": job.config.get("sequence", 0), - "response": [ - { - "message": response.output_text, - "model": job.model, - "latency": latency, - "response_id": response.id, - "date": datetime.datetime.now().ctime() - } - ] - }) + try: + job: Job = incoming.get() + Logger.info("Processing job: %s", job) + + question = scanp(job.config, args.prompt_root, 'user') + start = time.time() + + response = creator.create( + job.config, + model=job.model, + vector_store=job.resource.vector_store, + question=question, + ) + + latency = time.time() - start + + outgoing.put({ + "system": job.config["system"], + "user": job.config["user"], + "docs": job.config["docs"], + "sequence": job.config.get("sequence", 0), + "response": [ + { + "message": response.output_text, + "model": job.model, + "latency": latency, + "response_id": response.id, + "date": datetime.datetime.now().ctime(), + } + ], + }) + + except Exception as e: + Logger.error( + "Error while creating responses using Responses API: %s | Exception: %s", + job if 'job' in locals() else 'Unknown job', + str(e), + exc_info=True + ) + outgoing.put({ + "system": job.config["system"] if 'job' in locals() else "unknown", + "user": job.config["user"] if 'job' in locals() else "unknown", + "docs": job.config["docs"] if 'job' in locals() else "unknown", + "sequence": job.config.get("sequence", 0) if 'job' in locals() else 0, + "response": [ + { + "message": f"Error: {str(e)}", + "model": job.model if 'job' in locals() else "unknown", + "latency": None, + "response_id": None, + "date": datetime.datetime.now().ctime(), + } + ], + }) + if __name__ == '__main__': arguments = ArgumentParser() @@ -257,8 +376,8 @@ def func(incoming, outgoing, session_id, args): arguments.add_argument('--workers', type=int) args = arguments.parse_args() - incoming = Queue() - outgoing = Queue() + incoming: Queue = Queue() + outgoing: Queue = Queue() initargs = ( incoming, outgoing,