From f2133176b57189a0840e04ec222d774568f5a8f0 Mon Sep 17 00:00:00 2001 From: Gerald Baulig Date: Fri, 28 Nov 2025 13:02:16 +0100 Subject: [PATCH 1/2] fix(kafka-client): ensure bigint at all cost --- packages/kafka-client/src/events/provider/kafka/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/kafka-client/src/events/provider/kafka/index.ts b/packages/kafka-client/src/events/provider/kafka/index.ts index 1a76eb81..4a82fcc9 100644 --- a/packages/kafka-client/src/events/provider/kafka/index.ts +++ b/packages/kafka-client/src/events/provider/kafka/index.ts @@ -323,7 +323,7 @@ export class Topic { offsets: [{ topic: this.name, partition: 0, - offset: offsetValue + offset: BigInt(offsetValue) }], }).then(stream => { this.provider.logger.info(`Consumer for topic '${this.name}' subscribed`); From 2d85d41f6b980f80847fbc3b1d597f1d979e7730 Mon Sep 17 00:00:00 2001 From: Gerald Baulig Date: Fri, 28 Nov 2025 14:40:14 +0100 Subject: [PATCH 2/2] fix(resource-base-interface): experimental WorkerBase init topics sequential --- .../src/experimental/WorkerBase.ts | 30 +++++++++---------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/packages/resource-base-interface/src/experimental/WorkerBase.ts b/packages/resource-base-interface/src/experimental/WorkerBase.ts index 3e34793b..315b87ad 100644 --- a/packages/resource-base-interface/src/experimental/WorkerBase.ts +++ b/packages/resource-base-interface/src/experimental/WorkerBase.ts @@ -342,27 +342,25 @@ export abstract class WorkerBase { await this.events.start(); this.offsetStore = new OffsetStore(this.events, this.cfg, this.logger); - await Promise.all(Object.entries(kafkaCfg.topics).map(async ([key, value]: any[]) => { + for (const [key, value] of Object.entries(kafkaCfg.topics ?? {})) { const topicName = value.topic; const topic = await this.events.topic(topicName); const offsetValue = await this.offsetStore.getOffset(topicName); this.logger?.verbose('subscribing to topic with offset value', topicName, offsetValue); - Object.entries(value.events as { [key: string]: string } ?? {}).forEach( - ([eventName, handler]) => { - const i = handler.lastIndexOf('.'); - const name = handler.slice(0, i); - const serviceName = serviceNames?.[name] ?? name; - const functionName = handler.slice(i + 1); - this.eventHandlers.set(eventName, this.bindHandler(serviceName, functionName)); - topic.on( - eventName as string, - this.eventHandlers.get(eventName), - { startingOffset: offsetValue } - ); - } - ); + for (const [eventName, handler] of Object.entries(value.events ?? {})) { + const i = handler.lastIndexOf('.'); + const name = handler.slice(0, i); + const serviceName = serviceNames?.[name] ?? name; + const functionName = handler.slice(i + 1); + this.eventHandlers.set(eventName, this.bindHandler(serviceName, functionName)); + await topic.on( + eventName as string, + this.eventHandlers.get(eventName), + { startingOffset: BigInt(offsetValue) } + ); + } this.topics.set(key, topic); - })); + } } protected async bindScheduledJobs() { const job_config = this.cfg.get('scs-jobs');