Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions packages/api/src/client/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ import type {
modifyWatchlistAssetsResponse,
getTeamAllowanceResponse,
getPermissionsResponse,
createChatCompletionOpenAIResponse,
} from "../types";
import { LogLevel, type Logger, makeConsoleLogger, createFilteredLogger, noOpLogger } from "../logging";
import type { PaginatedResult, RequestOptions, ClientEventMap, ClientEventType, ClientEventHandler } from "./types";
Expand All @@ -106,12 +107,23 @@ import type { PaginatedResult, RequestOptions, ClientEventMap, ClientEventType,
*/
export interface AIInterface {
/**
* Creates a chat completion using Messari's AI
* Creates a chat completion using OpenAI's API
* @param params Parameters for the chat completion request
* @param options Optional request configuration
* @returns A promise resolving to the chat completion response
*/
createChatCompletion(params: createChatCompletionParameters, options?: RequestOptions): Promise<createChatCompletionResponse>;
createChatCompletion(params: Omit<createChatCompletionParameters, "stream">, options?: RequestOptions): Promise<createChatCompletionOpenAIResponse>;

/**
* Creates a streaming chat completion using OpenAI's API
* @param params Parameters for the chat completion request
* @param options Optional request configuration
* @returns A promise resolving to a readable stream of chat completion chunks
*/
createChatCompletionStream(
params: Omit<createChatCompletionParameters, "stream">,
options?: RequestOptions,
): Promise<ReadableStream<createChatCompletionOpenAIResponse>>;

/**
* Extracts entities from text content
Expand Down
228 changes: 207 additions & 21 deletions packages/api/src/client/client.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {
createChatCompletion,
extractEntities,
createChatCompletionOpenAI,
getNewsFeed,
getNewsSources,
getNewsFeedAssets,
Expand Down Expand Up @@ -146,6 +147,8 @@ import type {
getWatchlistResponse,
updateWatchlistParameters,
updateWatchlistResponse,
createChatCompletionOpenAIResponse,
createChatCompletionOpenAIParameters,
} from "../types";
import type { Agent } from "node:http";
import { pick } from "../utils";
Expand Down Expand Up @@ -324,27 +327,197 @@ export class MessariClient extends MessariClientBase {

// Check if the response is JSON or text based on Content-Type header
const contentType = response.headers.get("Content-Type");
let responseData: { data: T };

if (contentType?.toLowerCase().includes("application/json")) {
responseData = await response.json();
} else {
responseData = { data: await response.text() } as { data: T };
const jsonResponse = await response.json();
// If response has data field and no error, unwrap it, otherwise use the whole response
const data = jsonResponse.data && !jsonResponse.error ? jsonResponse.data : jsonResponse;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of parsing it into a data object and then unwrapping it later we just check if a data object already exists and there's no error and if so we return that raw object or if there is no data but also no error then we just parse the JSON response as is.

return data as T;
}

this.logger(LogLevel.DEBUG, "request success", { responseData });
const text = await response.text();
return text as T;
} catch (error) {
this.logger(LogLevel.ERROR, "request failed", { error });

// Emit response event
this.emit("response", {
method,
path,
status: response.status,
data: responseData,
// Emit error event
this.emit("error", {
error: error as Error,
request: {
method,
path,
queryParams,
},
});

throw error;
}
}

private async requestStream<T>({ method, path, body, queryParams = {}, options = {} }: RequestParameters): Promise<ReadableStream<T>> {
this.logger(LogLevel.DEBUG, "stream request start", {
method,
url: `${this.baseUrl}${path}`,
queryParams,
});

this.emit("request", {
method,
path,
queryParams,
});

const queryString = Object.entries(queryParams)
.filter(([_, value]) => value !== undefined)
.map(([key, value]) => {
if (Array.isArray(value)) {
return value.map((item) => `${encodeURIComponent(key)}=${encodeURIComponent(String(item))}`).join("&");
}
return `${encodeURIComponent(key)}=${encodeURIComponent(String(value))}`;
})
.join("&");

const url = `${this.baseUrl}${path}${queryString ? `?${queryString}` : ""}`;

const headers = {
...this.defaultHeaders,
...options.headers,
"Accept": "text/event-stream",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
};

const timeoutMs = options.timeoutMs || this.timeoutMs;

try {
const response = await RequestTimeoutError.rejectAfterTimeout(
this.fetchFn(url, {
method,
headers,
body: body ? JSON.stringify(body) : undefined,
signal: options.signal,
cache: options.cache,
credentials: options.credentials,
integrity: options.integrity,
keepalive: options.keepalive,
mode: options.mode,
redirect: options.redirect,
referrer: options.referrer,
referrerPolicy: options.referrerPolicy,
// @ts-ignore - Next.js specific options
next: options.next,
// Node.js specific option
agent: this.agent,
}),
timeoutMs,
);

if (!response.ok) {
const errorData = await response.json();
this.logger(LogLevel.ERROR, "request error", {
status: response.status,
statusText: response.statusText,
error: errorData,
});

const error = new Error(errorData.error || "An error occurred");

this.emit("error", {
error,
request: {
method,
path,
queryParams,
},
});

throw error;
}

// For streaming responses, return a transformed stream that parses the chunks
if (!response.body) {
throw new Error("No reader available for streaming response");
}

let buffer = "";
const decoder = new TextDecoder();

// Create a TransformStream that will parse the raw bytes into the expected type T
const transformer = new TransformStream<Uint8Array, T>({
transform: async (chunk, controller) => {
try {
// Decode the chunk and add to buffer
const text = decoder.decode(chunk, { stream: true });
buffer += text;

// Process any complete lines in the buffer
const lines = buffer.split("\n");
// Keep the last potentially incomplete line in the buffer
buffer = lines.pop() || "";

for (const line of lines) {
if (line.startsWith("data: ")) {
const jsonData = line.slice(6).trim(); // Remove 'data: ' prefix

// Skip [DONE] marker
if (jsonData === "[DONE]") {
continue;
}

if (jsonData) {
try {
const parsed = JSON.parse(jsonData);
controller.enqueue(parsed as T);
} catch (e) {
this.logger(LogLevel.ERROR, "Error parsing JSON from stream", {
error: e,
data: jsonData,
});
}
}
} else if (line.trim() && !line.startsWith(":")) {
// Try to parse non-empty lines that aren't comments
try {
const parsed = JSON.parse(line);
controller.enqueue(parsed as T);
} catch (e) {
// Not JSON, might be part of a multi-line chunk
if (line.trim()) {
this.logger(LogLevel.DEBUG, "Non-JSON line in stream", { line });
}
}
}
}
} catch (error) {
this.logger(LogLevel.ERROR, "Error processing stream chunk", { error });
controller.error(error);
}
},
flush: (controller) => {
// Process any remaining data in the buffer
if (buffer.trim()) {
if (buffer.startsWith("data: ")) {
const jsonData = buffer.slice(6).trim();
if (jsonData && jsonData !== "[DONE]") {
try {
const parsed = JSON.parse(jsonData);
controller.enqueue(parsed as T);
} catch (e) {
this.logger(LogLevel.ERROR, "Error parsing final JSON from stream", {
error: e,
data: jsonData,
});
}
}
}
}
},
});

return responseData.data;
// Pipe the response body through our transformer
return response.body.pipeThrough(transformer);
} catch (error) {
this.logger(LogLevel.ERROR, "request failed", { error });
this.logger(LogLevel.ERROR, "stream request failed", { error });

// Emit error event
this.emit("error", {
Expand Down Expand Up @@ -453,10 +626,16 @@ export class MessariClient extends MessariClientBase {
data: responseData,
});

return {
data: responseData.data,
metadata: responseData.metadata,
};
// If response has data field, return wrapped format, otherwise treat whole response as data
return responseData.data !== undefined
? {
data: responseData.data,
metadata: responseData.metadata,
}
: {
data: responseData,
metadata: {} as M,
};
} catch (error) {
this.logger(LogLevel.ERROR, "request with metadata failed", { error });

Expand Down Expand Up @@ -677,10 +856,17 @@ export class MessariClient extends MessariClientBase {

public readonly ai: AIInterface = {
createChatCompletion: (params: createChatCompletionParameters, options?: RequestOptions) =>
this.request<createChatCompletionResponse>({
method: createChatCompletion.method,
path: createChatCompletion.path(),
body: pick(params, createChatCompletion.bodyParams),
this.request<createChatCompletionOpenAIResponse>({
method: createChatCompletionOpenAI.method,
path: createChatCompletionOpenAI.path(),
body: pick(params, createChatCompletionOpenAI.bodyParams) as createChatCompletionOpenAIParameters & { stream: false },
options,
}),
createChatCompletionStream: (params: createChatCompletionParameters, options?: RequestOptions) =>
this.requestStream<createChatCompletionOpenAIResponse>({
method: createChatCompletionOpenAI.method,
path: createChatCompletionOpenAI.path(),
body: { ...pick(params, createChatCompletionOpenAI.bodyParams), stream: true },
options,
}),
extractEntities: (params: extractEntitiesParameters, options?: RequestOptions) =>
Expand Down
15 changes: 15 additions & 0 deletions packages/api/src/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,21 @@ export const extractEntities = {
} as const;


export type createChatCompletionOpenAIResponse = components['schemas']['ChatCompletionResponseOpenAI'];
export type createChatCompletionOpenAIError = components['schemas']['APIError'];

export type createChatCompletionOpenAIParameters = components['schemas']['ChatCompletionRequest'];


export const createChatCompletionOpenAI = {
method: 'POST' as const,
pathParams: [] as const,
queryParams: [] as const,
bodyParams: ['messages', 'verbosity', 'response_format', 'inline_citations', 'stream'] as const,
path: () => '/ai/openai/chat/completions'
} as const;


export type getAssetsV2Response = components['schemas']['V2AssetListItem'][];
export type getAssetsV2Error = components['schemas']['APIError'];

Expand Down
Loading