From f9f60b58a3130a0a4340656e381eeec2beb32b03 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Morten=20Due=20Esh=C3=B8j?= Date: Tue, 20 May 2025 10:11:36 +0200 Subject: [PATCH 1/2] CY-2196 have to manually remember to build. so did that --- dist/src/ChannelPool.d.ts | 2 +- dist/src/CoinifyRabbit.d.ts | 3 +- dist/src/CoinifyRabbit.js | 53 +++++++++++++++++++++--- dist/src/CoinifyRabbitConfiguration.d.ts | 1 + package.json | 4 +- 5 files changed, 54 insertions(+), 9 deletions(-) diff --git a/dist/src/ChannelPool.d.ts b/dist/src/ChannelPool.d.ts index d4bffd8..0af5619 100644 --- a/dist/src/ChannelPool.d.ts +++ b/dist/src/ChannelPool.d.ts @@ -8,7 +8,7 @@ export default class ChannelPool { private onChannelClosed; private channels; private getChannelPromise; - constructor(logger: Logger, getConnection: () => Promise, onChannelOpened: (channel: amqplib.Channel, type: ChannelType) => Promise, onChannelClosed: (type: ChannelType, err?: Error) => Promise); + constructor(logger: Logger, getConnection: () => Promise, onChannelOpened: (channel: amqplib.Channel, type: ChannelType) => Promise, onChannelClosed: (type: ChannelType, err?: Error) => Promise); getConsumerChannel(): Promise; getPublisherChannel(usePublisherConfirm: boolean): Promise; close(): Promise; diff --git a/dist/src/CoinifyRabbit.d.ts b/dist/src/CoinifyRabbit.d.ts index 3baa85d..62bc1a7 100644 --- a/dist/src/CoinifyRabbit.d.ts +++ b/dist/src/CoinifyRabbit.d.ts @@ -27,7 +27,8 @@ export default class CoinifyRabbit extends EventEmitter { assertConnection(): Promise; private _conn?; private _getConnectionPromise?; - _getConnection(): Promise; + _getConnection(): Promise; + private onConnectionOpened; static _generateConnectionUrl(connectionConfig: CoinifyRabbitConnectionConfiguration): string; private _onChannelOpened; private _recreateRegisteredConsumers; diff --git a/dist/src/CoinifyRabbit.js b/dist/src/CoinifyRabbit.js index 46e70b3..fa7cabd 100644 --- a/dist/src/CoinifyRabbit.js +++ b/dist/src/CoinifyRabbit.js @@ -84,6 +84,12 @@ class CoinifyRabbit extends events_1.EventEmitter { if (uniqueQueue) { queueOptions.autoDelete = true; } + if (this.config.queues.useQuorumQueues && queueOptions.durable !== false && queueOptions.autoDelete !== true && queueOptions.exclusive !== true) { + queueOptions.arguments = { + ...queueOptions === null || queueOptions === void 0 ? void 0 : queueOptions.arguments, + 'x-queue-type': 'quorum' + }; + } const q = await channel.assertQueue(eventQueueName, queueOptions); await channel.bindQueue(q.queue, exchangeName, eventKey); const prefetch = (_d = (_c = options === null || options === void 0 ? void 0 : options.consumer) === null || _c === void 0 ? void 0 : _c.prefetch) !== null && _d !== void 0 ? _d : this.config.consumer.prefetch; @@ -141,6 +147,12 @@ class CoinifyRabbit extends events_1.EventEmitter { if (uniqueQueue) { queueOptions.autoDelete = true; } + if (this.config.queues.useQuorumQueues && queueOptions.durable !== false && queueOptions.autoDelete !== true && queueOptions.exclusive !== true) { + queueOptions.arguments = { + ...queueOptions === null || queueOptions === void 0 ? void 0 : queueOptions.arguments, + 'x-queue-type': 'quorum' + }; + } const q = await channel.assertQueue(taskQueueName, queueOptions); await channel.bindQueue(q.queue, exchangeName, fullTaskName); const prefetch = (_d = (_c = options === null || options === void 0 ? void 0 : options.consumer) === null || _c === void 0 ? void 0 : _c.prefetch) !== null && _d !== void 0 ? _d : this.config.consumer.prefetch; @@ -154,7 +166,14 @@ class CoinifyRabbit extends events_1.EventEmitter { const channel = await this.channels.getConsumerChannel(); const queueName = this.config.queues.failed; this.logger.trace({ queueName }, 'registerFailedMessageConsumer()'); - const q = await channel.assertQueue(queueName, options === null || options === void 0 ? void 0 : options.queue); + const queueOptions = { ...options === null || options === void 0 ? void 0 : options.queue }; + if (this.config.queues.useQuorumQueues && queueOptions.durable !== false && queueOptions.autoDelete !== true && queueOptions.exclusive !== true) { + queueOptions.arguments = { + ...queueOptions === null || queueOptions === void 0 ? void 0 : queueOptions.arguments, + 'x-queue-type': 'quorum' + }; + } + const q = await channel.assertQueue(queueName, queueOptions); const prefetch = (_b = (_a = options === null || options === void 0 ? void 0 : options.consumer) === null || _a === void 0 ? void 0 : _a.prefetch) !== null && _b !== void 0 ? _b : this.config.consumer.prefetch; await channel.prefetch(prefetch, false); const { consumerTag } = await channel.consume(q.queue, (message) => message && this._handleFailedMessage(message, { ...options, queueName }, consumeFn), { consumerTag: options === null || options === void 0 ? void 0 : options.consumerTag }); @@ -181,9 +200,9 @@ class CoinifyRabbit extends events_1.EventEmitter { }); this._conn.on('close', err => { delete this._conn; - this.logger.info({ err }, 'Connection closed'); + this.logger.info({ err }, 'RabbitMQ Connection closed'); }); - this.logger.info({}, 'Connection opened'); + await this.onConnectionOpened(); } catch (err) { this.logger.error({ err }, 'Error connecting to RabbitMQ'); @@ -200,6 +219,12 @@ class CoinifyRabbit extends events_1.EventEmitter { } return this._conn; } + async onConnectionOpened() { + this.logger.info({}, 'RabbitMQ Connection opened'); + if (this.consumers.length) { + await this._recreateRegisteredConsumers(); + } + } static _generateConnectionUrl(connectionConfig) { if (!['amqp', 'amqps'].includes(connectionConfig.protocol)) { throw new Error(`Invalid protocol '${connectionConfig.protocol}'. Must be 'amqp' or 'amqps'`); @@ -226,7 +251,6 @@ class CoinifyRabbit extends events_1.EventEmitter { if (type === 'consumer') { const prefetch = this.config.channel.prefetch; await channel.prefetch(prefetch, true); - await this._recreateRegisteredConsumers(); } } async _recreateRegisteredConsumers() { @@ -527,6 +551,12 @@ class CoinifyRabbit extends events_1.EventEmitter { deadLetterExchange: '', messageTtl: delayMs }); + if (this.config.queues.useQuorumQueues && queueOptions.durable !== false && queueOptions.autoDelete !== true && queueOptions.exclusive !== true) { + queueOptions.arguments = { + ...queueOptions === null || queueOptions === void 0 ? void 0 : queueOptions.arguments, + 'x-queue-type': 'quorum' + }; + } const q = await channel.assertQueue(retryQueueName, queueOptions); await channel.bindQueue(q.queue, retryExchangeName, q.queue); return { retryExchangeName, retryQueueName }; @@ -544,6 +574,12 @@ class CoinifyRabbit extends events_1.EventEmitter { deadLetterExchange: tasksExchangeName, messageTtl: delayMillis }; + if (this.config.queues.useQuorumQueues && queueOptions.durable !== false && queueOptions.autoDelete !== true && queueOptions.exclusive !== true) { + queueOptions.arguments = { + ...queueOptions === null || queueOptions === void 0 ? void 0 : queueOptions.arguments, + 'x-queue-type': 'quorum' + }; + } const q = await channel.assertQueue(delayedQueueName, queueOptions); await channel.bindQueue(q.queue, delayedExchangeName, q.queue); return { delayedExchangeName, delayedQueueName }; @@ -552,7 +588,14 @@ class CoinifyRabbit extends events_1.EventEmitter { const deadLetterExchangeName = this.config.exchanges.failed; const deadLetterQueueName = this.config.queues.failed; await channel.assertExchange(deadLetterExchangeName, 'fanout', options === null || options === void 0 ? void 0 : options.exchange); - const q = await channel.assertQueue(deadLetterQueueName, options === null || options === void 0 ? void 0 : options.queue); + const queueOptions = { ...options === null || options === void 0 ? void 0 : options.queue }; + if (this.config.queues.useQuorumQueues && queueOptions.durable !== false && queueOptions.autoDelete !== true && queueOptions.exclusive !== true) { + queueOptions.arguments = { + ...queueOptions === null || queueOptions === void 0 ? void 0 : queueOptions.arguments, + 'x-queue-type': 'quorum' + }; + } + const q = await channel.assertQueue(deadLetterQueueName, queueOptions); await channel.bindQueue(q.queue, deadLetterExchangeName, ''); return deadLetterExchangeName; } diff --git a/dist/src/CoinifyRabbitConfiguration.d.ts b/dist/src/CoinifyRabbitConfiguration.d.ts index 9fae849..ad5106a 100644 --- a/dist/src/CoinifyRabbitConfiguration.d.ts +++ b/dist/src/CoinifyRabbitConfiguration.d.ts @@ -26,6 +26,7 @@ export default interface CoinifyRabbitConfiguration { retryPrefix: string; delayedTaskPrefix: string; failed: string; + useQuorumQueues?: boolean; }; publish: { persistentMessages: boolean; diff --git a/package.json b/package.json index 1a95f95..85a2537 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@coinify/rabbitmq", - "version": "3.0.0", + "version": "3.0.1", "description": "Coinify RabbitMQ client with support for events and tasks", "main": "dist/index.js", "types": "dist/index", @@ -62,4 +62,4 @@ "url": "git@github.com:CoinifySoftware/node-rabbitmq.git" }, "license": "MIT" -} +} \ No newline at end of file From 640c78cad1eed58d99e10d0f1e45cfba3eef194f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Morten=20Due=20Esh=C3=B8j?= Date: Tue, 20 May 2025 10:43:45 +0200 Subject: [PATCH 2/2] CY-2196 update documentation --- README.md | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/README.md b/README.md index a773acd..a70096d 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,15 @@ # node-rabbitmq [![npm version](https://badge.fury.io/js/%40coinify%2Frabbitmq.svg)](https://badge.fury.io/js/%40coinify%2Frabbitmq) +## Publising new version +After finisihing the changes to the library the following steps should be taken to publish the package + +1. Update the version number in package.json +2. Run npm install +3. Run npm run build +4. Push changes and make a pr, get it approved and merge +5. When approved run npm publish from local machine + ## Major version changes **Version 2**: * Publishing messages now defaults to using _publisher confirms_