From 5957137e522634b3acbe80c0aae40ed3d8b01e6e Mon Sep 17 00:00:00 2001 From: Pablo Maldonado Date: Thu, 4 Dec 2025 16:32:42 +0000 Subject: [PATCH 1/5] feat: use subgraph for events --- .../MonitorProposalsOrderBook.ts | 5 +- .../src/monitor-polymarket/common.ts | 256 +++++++++++++++++- 2 files changed, 259 insertions(+), 2 deletions(-) diff --git a/packages/monitor-v2/src/monitor-polymarket/MonitorProposalsOrderBook.ts b/packages/monitor-v2/src/monitor-polymarket/MonitorProposalsOrderBook.ts index 429bd29984..4b4252ce4a 100644 --- a/packages/monitor-v2/src/monitor-polymarket/MonitorProposalsOrderBook.ts +++ b/packages/monitor-v2/src/monitor-polymarket/MonitorProposalsOrderBook.ts @@ -324,5 +324,8 @@ export async function monitorTransactionsProposedOrderBook( }) ); - console.log("All proposals have been checked!"); + Logger.debug({ + at: "PolymarketMonitor", + message: `All ${allProposals.length} proposals have been checked!`, + }); } diff --git a/packages/monitor-v2/src/monitor-polymarket/common.ts b/packages/monitor-v2/src/monitor-polymarket/common.ts index e9685d5ede..6ab5ce1574 100644 --- a/packages/monitor-v2/src/monitor-polymarket/common.ts +++ b/packages/monitor-v2/src/monitor-polymarket/common.ts @@ -76,9 +76,11 @@ export interface MonitoringParams { fillEventsProposalGapSeconds: number; httpClient: ReturnType; orderBookBatchSize: number; + orderBookSubgraphEndpoint: string; ooV2Addresses: string[]; ooV1Addresses: string[]; aiConfig?: AIConfig; + subgraphSyncTolerance: number; } interface PolymarketMarketGraphql { question: string; @@ -220,6 +222,7 @@ export const getPolymarketProposedPriceRequestsOO = async ( proposeEvents .filter((event) => requesterAddresses.map((r) => r.toLowerCase()).includes(event.args.requester.toLowerCase())) .filter((event) => { + return currentTimeBN.lt(event.args.expirationTimestamp); const expirationTime = event.args.expirationTimestamp; const thresholdTime = expirationTime.sub(threshold); // Only keep if current time is greater than (expiration - threshold) but less than expiration. @@ -374,6 +377,38 @@ export const getPolymarketMarketInformation = async ( }); }; +interface OrderFilledEventSubgraph { + id: string; + transactionHash: string; + makerAssetId: string; + takerAssetId: string; + maker: string; + taker: string; + makerAmountFilled: string; + takerAmountFilled: string; + fee: string; + timestamp: string; + orderHash: string; +} + +interface SubgraphOrderFilledResponse { + data?: { + orderFilledEvents: OrderFilledEventSubgraph[]; + }; + errors?: { message: string }[]; +} + +interface SubgraphMetaResponse { + data?: { + _meta: { + block: { + number: number; + }; + }; + }; + errors?: { message: string }[]; +} + const getTradeInfoFromOrderFilledEvent = async ( provider: Provider, event: any @@ -392,7 +427,186 @@ const getTradeInfoFromOrderFilledEvent = async ( }; }; -export const getOrderFilledEvents = async ( +const getTradeInfoFromSubgraphEvent = (event: OrderFilledEventSubgraph): PolymarketTradeInformation => { + const isBuy = event.makerAssetId === "0"; + const makerAmountFilled = BigNumber.from(event.makerAmountFilled); + const takerAmountFilled = BigNumber.from(event.takerAmountFilled); + + const numerator = (isBuy ? makerAmountFilled : takerAmountFilled).mul(1000); + const denominator = isBuy ? takerAmountFilled : makerAmountFilled; + const price = numerator.div(denominator).toNumber() / 1000; + + return { + price, + type: isBuy ? "buy" : "sell", + timestamp: parseInt(event.timestamp), + // Convert to decimal value with 2 decimals + amount: (isBuy ? takerAmountFilled : makerAmountFilled).div(10_000).toNumber() / 100, + }; +}; + +const querySubgraphOrderFilledEvents = async ( + httpClient: AxiosInstance, + subgraphEndpoint: string, + whereField: "takerAssetId" | "makerAssetId", + assetId: string, + pageSize = 1000, + startTimestamp?: number +): Promise => { + const allEvents: OrderFilledEventSubgraph[] = []; + let skip = 0; + let hasMore = true; + + while (hasMore) { + // Build where clause with optional timestamp filter + const whereClause = startTimestamp + ? `{timestamp_gt: ${startTimestamp}, ${whereField}: "${assetId}"}` + : `{${whereField}: "${assetId}"}`; + + const query = ` + { + orderFilledEvents( + where: ${whereClause}, + first: ${pageSize}, + skip: ${skip}, + orderBy: timestamp, + orderDirection: asc + ) { + id + transactionHash + makerAssetId + takerAssetId + maker + taker + makerAmountFilled + takerAmountFilled + fee + timestamp + orderHash + } + } + `; + + const response = await httpClient.post(subgraphEndpoint, { query }); + + if (response.data.errors?.length) { + throw new Error(response.data.errors.map((e) => e.message).join("; ")); + } + + if (!response.data.data?.orderFilledEvents) { + throw new Error("Invalid response from subgraph"); + } + + const events = response.data.data.orderFilledEvents; + allEvents.push(...events); + + // If we got fewer events than pageSize, we've reached the end + hasMore = events.length === pageSize; + skip += pageSize; + } + + return allEvents; +}; + +const getOrderFilledEventsFromSubgraph = async ( + params: MonitoringParams, + clobTokenIds: [string, string], + startTimestamp?: number +): Promise<[PolymarketTradeInformation[], PolymarketTradeInformation[]]> => { + // Query 4 combinations: takerAssetId for both tokens, makerAssetId for both tokens + const queries = [ + { whereField: "takerAssetId" as const, assetId: clobTokenIds[0], tokenIndex: 0 }, + { whereField: "takerAssetId" as const, assetId: clobTokenIds[1], tokenIndex: 1 }, + { whereField: "makerAssetId" as const, assetId: clobTokenIds[0], tokenIndex: 0 }, + { whereField: "makerAssetId" as const, assetId: clobTokenIds[1], tokenIndex: 1 }, + ]; + + // Execute all queries in parallel + const queryResults = await Promise.all( + queries.map((q) => + querySubgraphOrderFilledEvents( + params.httpClient, + params.orderBookSubgraphEndpoint, + q.whereField, + q.assetId, + 1000, + startTimestamp + ) + ) + ); + + // Group events by token index, deduplicating per token (same event can appear for both tokens) + const tokenOneEventIds = new Set(); + const tokenTwoEventIds = new Set(); + const tokenOneEvents: PolymarketTradeInformation[] = []; + const tokenTwoEvents: PolymarketTradeInformation[] = []; + + // Process takerAssetId queries (index 0 and 1) + queryResults[0].forEach((event) => { + if (!tokenOneEventIds.has(event.id)) { + tokenOneEventIds.add(event.id); + tokenOneEvents.push(getTradeInfoFromSubgraphEvent(event)); + } + }); + queryResults[1].forEach((event) => { + if (!tokenTwoEventIds.has(event.id)) { + tokenTwoEventIds.add(event.id); + tokenTwoEvents.push(getTradeInfoFromSubgraphEvent(event)); + } + }); + + // Process makerAssetId queries (index 2 and 3) + queryResults[2].forEach((event) => { + if (!tokenOneEventIds.has(event.id)) { + tokenOneEventIds.add(event.id); + tokenOneEvents.push(getTradeInfoFromSubgraphEvent(event)); + } + }); + queryResults[3].forEach((event) => { + if (!tokenTwoEventIds.has(event.id)) { + tokenTwoEventIds.add(event.id); + tokenTwoEvents.push(getTradeInfoFromSubgraphEvent(event)); + } + }); + + // Sort by timestamp + const sortByTimestamp = (events: PolymarketTradeInformation[]): PolymarketTradeInformation[] => { + return events.sort((a, b) => a.timestamp - b.timestamp); + }; + + return [sortByTimestamp(tokenOneEvents), sortByTimestamp(tokenTwoEvents)]; +}; + +const checkSubgraphSyncStatus = async (httpClient: AxiosInstance, subgraphEndpoint: string): Promise => { + const query = ` + { + _meta { + block { + number + } + } + } + `; + + try { + const response = await httpClient.post(subgraphEndpoint, { query }); + + if (response.data.errors?.length) { + throw new Error(response.data.errors.map((e) => e.message).join("; ")); + } + + if (!response.data.data?._meta?.block?.number) { + throw new Error("Invalid response from subgraph meta query"); + } + + return response.data.data._meta.block.number; + } catch (error) { + // Return null if we can't check sync status, caller should handle gracefully + return null; + } +}; + +const getOrderFilledEventsSlow = async ( params: MonitoringParams, clobTokenIds: [string, string], startBlockNumber: number @@ -439,6 +653,38 @@ export const getOrderFilledEvents = async ( return [outcomeTokenOne, outcomeTokenTwo]; }; +export const getOrderFilledEvents = async ( + params: MonitoringParams, + clobTokenIds: [string, string], + startBlockNumber: number +): Promise<[PolymarketTradeInformation[], PolymarketTradeInformation[]]> => { + try { + // Check subgraph sync status first + const subgraphBlockNumber = await checkSubgraphSyncStatus(params.httpClient, params.orderBookSubgraphEndpoint); + + if (subgraphBlockNumber !== null) { + // Get current block from provider + const currentBlockNumber = await params.provider.getBlockNumber(); + const blockDifference = currentBlockNumber - subgraphBlockNumber; + + // If subgraph is behind by more than tolerance, use slow method + if (blockDifference >= params.subgraphSyncTolerance) { + return await getOrderFilledEventsSlow(params, clobTokenIds, startBlockNumber); + } + } + + // Get the block timestamp from startBlockNumber + const startBlock = await params.provider.getBlock(startBlockNumber); + const startTimestamp = startBlock.timestamp; + + // Try the fast subgraph version + return await getOrderFilledEventsFromSubgraph(params, clobTokenIds, startTimestamp); + } catch (error) { + // Fallback to the slow version if subgraph fails + return await getOrderFilledEventsSlow(params, clobTokenIds, startBlockNumber); + } +}; + export const calculatePolymarketQuestionID = (ancillaryData: string): string => { return ethers.utils.keccak256(ancillaryData); }; @@ -907,6 +1153,12 @@ export const initMonitoringParams = async ( const orderBookBatchSize = env.ORDER_BOOK_BATCH_SIZE ? Number(env.ORDER_BOOK_BATCH_SIZE) : 499; + const orderBookSubgraphEndpoint = + env.ORDER_BOOK_SUBGRAPH_ENDPOINT || + "https://api.goldsky.com/api/public/project_cl6mb8i9h0003e201j6li0diw/subgraphs/orderbook-subgraph/0.0.1/gn"; + + const subgraphSyncTolerance = env.SUBGRAPH_SYNC_TOLERANCE ? Number(env.SUBGRAPH_SYNC_TOLERANCE) : 5; + // Rate limit and retry with exponential backoff and jitter to handle rate limiting and errors from the APIs. const httpClient = createHttpClient({ axios: { timeout: httpTimeout }, @@ -948,9 +1200,11 @@ export const initMonitoringParams = async ( fillEventsProposalGapSeconds, httpClient, orderBookBatchSize, + orderBookSubgraphEndpoint, ooV2Addresses, ooV1Addresses, aiConfig, + subgraphSyncTolerance, }; }; From 236cf381cf06e5b8e2883b5dedd65856e2359377 Mon Sep 17 00:00:00 2001 From: Pablo Maldonado Date: Thu, 4 Dec 2025 17:44:18 +0000 Subject: [PATCH 2/5] fix: filter --- packages/monitor-v2/src/monitor-polymarket/common.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/monitor-v2/src/monitor-polymarket/common.ts b/packages/monitor-v2/src/monitor-polymarket/common.ts index 6ab5ce1574..37be4d0272 100644 --- a/packages/monitor-v2/src/monitor-polymarket/common.ts +++ b/packages/monitor-v2/src/monitor-polymarket/common.ts @@ -222,7 +222,6 @@ export const getPolymarketProposedPriceRequestsOO = async ( proposeEvents .filter((event) => requesterAddresses.map((r) => r.toLowerCase()).includes(event.args.requester.toLowerCase())) .filter((event) => { - return currentTimeBN.lt(event.args.expirationTimestamp); const expirationTime = event.args.expirationTimestamp; const thresholdTime = expirationTime.sub(threshold); // Only keep if current time is greater than (expiration - threshold) but less than expiration. From 951a370e09083514c9262774fd8d9e0990b9b92e Mon Sep 17 00:00:00 2001 From: Pablo Maldonado Date: Thu, 4 Dec 2025 18:01:24 +0000 Subject: [PATCH 3/5] feat: update subgraph sync tolerance --- packages/monitor-v2/src/monitor-polymarket/common.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/monitor-v2/src/monitor-polymarket/common.ts b/packages/monitor-v2/src/monitor-polymarket/common.ts index 37be4d0272..e2ede9454f 100644 --- a/packages/monitor-v2/src/monitor-polymarket/common.ts +++ b/packages/monitor-v2/src/monitor-polymarket/common.ts @@ -1156,7 +1156,7 @@ export const initMonitoringParams = async ( env.ORDER_BOOK_SUBGRAPH_ENDPOINT || "https://api.goldsky.com/api/public/project_cl6mb8i9h0003e201j6li0diw/subgraphs/orderbook-subgraph/0.0.1/gn"; - const subgraphSyncTolerance = env.SUBGRAPH_SYNC_TOLERANCE ? Number(env.SUBGRAPH_SYNC_TOLERANCE) : 5; + const subgraphSyncTolerance = env.SUBGRAPH_SYNC_TOLERANCE ? Number(env.SUBGRAPH_SYNC_TOLERANCE) : 1; // Rate limit and retry with exponential backoff and jitter to handle rate limiting and errors from the APIs. const httpClient = createHttpClient({ From 9d5133b53819a2f69c26b6e2a518cb232a5d03f3 Mon Sep 17 00:00:00 2001 From: Pablo Maldonado Date: Fri, 5 Dec 2025 14:32:19 +0000 Subject: [PATCH 4/5] fix: lint --- .../MonitorProposalsOrderBook.ts | 2 +- .../src/monitor-polymarket/common.ts | 36 ++++++++++++++----- 2 files changed, 29 insertions(+), 9 deletions(-) diff --git a/packages/monitor-v2/src/monitor-polymarket/MonitorProposalsOrderBook.ts b/packages/monitor-v2/src/monitor-polymarket/MonitorProposalsOrderBook.ts index 4b4252ce4a..ed89e60c53 100644 --- a/packages/monitor-v2/src/monitor-polymarket/MonitorProposalsOrderBook.ts +++ b/packages/monitor-v2/src/monitor-polymarket/MonitorProposalsOrderBook.ts @@ -122,7 +122,7 @@ export async function processProposal( const buyingLoserSide = books[outcome.loser].bids.find((b) => b.price > thresholds.bids); const fromBlock = Math.max(proposalGapStartBlock, currentBlock - lookbackBlocks); - const fills = await getOrderFilledEvents(params, market.clobTokenIds, fromBlock); + const fills = await getOrderFilledEvents(params, market.clobTokenIds, fromBlock, logger); const soldWinner = fills[outcome.winner].filter((f) => f.type === "sell" && f.price < thresholds.asks); const boughtLoser = fills[outcome.loser].filter((f) => f.type === "buy" && f.price > thresholds.bids); diff --git a/packages/monitor-v2/src/monitor-polymarket/common.ts b/packages/monitor-v2/src/monitor-polymarket/common.ts index e2ede9454f..4297e5668c 100644 --- a/packages/monitor-v2/src/monitor-polymarket/common.ts +++ b/packages/monitor-v2/src/monitor-polymarket/common.ts @@ -655,7 +655,8 @@ const getOrderFilledEventsSlow = async ( export const getOrderFilledEvents = async ( params: MonitoringParams, clobTokenIds: [string, string], - startBlockNumber: number + startBlockNumber: number, + logger: typeof Logger ): Promise<[PolymarketTradeInformation[], PolymarketTradeInformation[]]> => { try { // Check subgraph sync status first @@ -667,19 +668,38 @@ export const getOrderFilledEvents = async ( const blockDifference = currentBlockNumber - subgraphBlockNumber; // If subgraph is behind by more than tolerance, use slow method - if (blockDifference >= params.subgraphSyncTolerance) { + if (blockDifference > params.subgraphSyncTolerance) { + logger.debug({ + at: "getOrderFilledEvents", + message: `Falling back to slow method: subgraph is ${blockDifference} blocks behind (tolerance: ${params.subgraphSyncTolerance})`, + }); return await getOrderFilledEventsSlow(params, clobTokenIds, startBlockNumber); } - } - // Get the block timestamp from startBlockNumber - const startBlock = await params.provider.getBlock(startBlockNumber); - const startTimestamp = startBlock.timestamp; + // Get the block timestamp from startBlockNumber + const startBlock = await params.provider.getBlock(startBlockNumber); + const startTimestamp = startBlock.timestamp; - // Try the fast subgraph version - return await getOrderFilledEventsFromSubgraph(params, clobTokenIds, startTimestamp); + // Try the fast subgraph version + logger.debug({ + at: "getOrderFilledEvents", + message: `Using fast subgraph method: subgraph is ${blockDifference} blocks behind (tolerance: ${params.subgraphSyncTolerance})`, + }); + return await getOrderFilledEventsFromSubgraph(params, clobTokenIds, startTimestamp); + } else { + // If subgraphBlockNumber is null, we cannot evaluate sync tolerance, so fallback to slow method + logger.debug({ + at: "getOrderFilledEvents", + message: "Falling back to slow method: subgraph block number is null, cannot evaluate sync tolerance", + }); + return await getOrderFilledEventsSlow(params, clobTokenIds, startBlockNumber); + } } catch (error) { // Fallback to the slow version if subgraph fails + logger.debug({ + at: "getOrderFilledEvents", + message: `Falling back to slow method due to error: ${error instanceof Error ? error.message : String(error)}`, + }); return await getOrderFilledEventsSlow(params, clobTokenIds, startBlockNumber); } }; From 6f6d6f9f845a17d6789188a195abff9260934b07 Mon Sep 17 00:00:00 2001 From: Pablo Maldonado Date: Fri, 5 Dec 2025 14:44:46 +0000 Subject: [PATCH 5/5] fix: test --- packages/monitor-v2/test/PolymarketMonitor.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/monitor-v2/test/PolymarketMonitor.ts b/packages/monitor-v2/test/PolymarketMonitor.ts index 2141f58c37..2d1b4ca3a3 100644 --- a/packages/monitor-v2/test/PolymarketMonitor.ts +++ b/packages/monitor-v2/test/PolymarketMonitor.ts @@ -1166,7 +1166,8 @@ describe("PolymarketNotifier", function () { sandbox.stub(ethers, "Contract").returns({ filters: { OrderFilled: () => ({ topics: [] }) } } as any); - await commonModule.getOrderFilledEvents(params, ["0xdeadbeef", "0xfeedface"], fromBlockParam); + const logger = createNewLogger([new SpyTransport({}, { spy: sinon.spy() })]); + await commonModule.getOrderFilledEvents(params, ["0xdeadbeef", "0xfeedface"], fromBlockParam, logger); assert.equal( capturedFromBlock,