Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
# node-rabbitmq
[![npm version](https://badge.fury.io/js/%40coinify%2Frabbitmq.svg)](https://badge.fury.io/js/%40coinify%2Frabbitmq)

## Publising new version
After finisihing the changes to the library the following steps should be taken to publish the package

1. Update the version number in package.json
2. Run npm install
3. Run npm run build
4. Push changes and make a pr, get it approved and merge
5. When approved run npm publish from local machine

## Major version changes
**Version 2**:
* Publishing messages now defaults to using _publisher confirms_
Expand Down
2 changes: 1 addition & 1 deletion dist/src/ChannelPool.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ export default class ChannelPool {
private onChannelClosed;
private channels;
private getChannelPromise;
constructor(logger: Logger, getConnection: () => Promise<amqplib.Connection>, onChannelOpened: (channel: amqplib.Channel, type: ChannelType) => Promise<void>, onChannelClosed: (type: ChannelType, err?: Error) => Promise<void>);
constructor(logger: Logger, getConnection: () => Promise<amqplib.ChannelModel>, onChannelOpened: (channel: amqplib.Channel, type: ChannelType) => Promise<void>, onChannelClosed: (type: ChannelType, err?: Error) => Promise<void>);
getConsumerChannel(): Promise<amqplib.Channel>;
getPublisherChannel(usePublisherConfirm: boolean): Promise<amqplib.Channel>;
close(): Promise<void>;
Expand Down
3 changes: 2 additions & 1 deletion dist/src/CoinifyRabbit.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ export default class CoinifyRabbit extends EventEmitter {
assertConnection(): Promise<void>;
private _conn?;
private _getConnectionPromise?;
_getConnection(): Promise<amqplib.Connection>;
_getConnection(): Promise<amqplib.ChannelModel>;
private onConnectionOpened;
static _generateConnectionUrl(connectionConfig: CoinifyRabbitConnectionConfiguration): string;
private _onChannelOpened;
private _recreateRegisteredConsumers;
Expand Down
53 changes: 48 additions & 5 deletions dist/src/CoinifyRabbit.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ class CoinifyRabbit extends events_1.EventEmitter {
if (uniqueQueue) {
queueOptions.autoDelete = true;
}
if (this.config.queues.useQuorumQueues && queueOptions.durable !== false && queueOptions.autoDelete !== true && queueOptions.exclusive !== true) {
queueOptions.arguments = {
...queueOptions === null || queueOptions === void 0 ? void 0 : queueOptions.arguments,
'x-queue-type': 'quorum'
};
}
const q = await channel.assertQueue(eventQueueName, queueOptions);
await channel.bindQueue(q.queue, exchangeName, eventKey);
const prefetch = (_d = (_c = options === null || options === void 0 ? void 0 : options.consumer) === null || _c === void 0 ? void 0 : _c.prefetch) !== null && _d !== void 0 ? _d : this.config.consumer.prefetch;
Expand Down Expand Up @@ -141,6 +147,12 @@ class CoinifyRabbit extends events_1.EventEmitter {
if (uniqueQueue) {
queueOptions.autoDelete = true;
}
if (this.config.queues.useQuorumQueues && queueOptions.durable !== false && queueOptions.autoDelete !== true && queueOptions.exclusive !== true) {
queueOptions.arguments = {
...queueOptions === null || queueOptions === void 0 ? void 0 : queueOptions.arguments,
'x-queue-type': 'quorum'
};
}
const q = await channel.assertQueue(taskQueueName, queueOptions);
await channel.bindQueue(q.queue, exchangeName, fullTaskName);
const prefetch = (_d = (_c = options === null || options === void 0 ? void 0 : options.consumer) === null || _c === void 0 ? void 0 : _c.prefetch) !== null && _d !== void 0 ? _d : this.config.consumer.prefetch;
Expand All @@ -154,7 +166,14 @@ class CoinifyRabbit extends events_1.EventEmitter {
const channel = await this.channels.getConsumerChannel();
const queueName = this.config.queues.failed;
this.logger.trace({ queueName }, 'registerFailedMessageConsumer()');
const q = await channel.assertQueue(queueName, options === null || options === void 0 ? void 0 : options.queue);
const queueOptions = { ...options === null || options === void 0 ? void 0 : options.queue };
if (this.config.queues.useQuorumQueues && queueOptions.durable !== false && queueOptions.autoDelete !== true && queueOptions.exclusive !== true) {
queueOptions.arguments = {
...queueOptions === null || queueOptions === void 0 ? void 0 : queueOptions.arguments,
'x-queue-type': 'quorum'
};
}
const q = await channel.assertQueue(queueName, queueOptions);
const prefetch = (_b = (_a = options === null || options === void 0 ? void 0 : options.consumer) === null || _a === void 0 ? void 0 : _a.prefetch) !== null && _b !== void 0 ? _b : this.config.consumer.prefetch;
await channel.prefetch(prefetch, false);
const { consumerTag } = await channel.consume(q.queue, (message) => message && this._handleFailedMessage(message, { ...options, queueName }, consumeFn), { consumerTag: options === null || options === void 0 ? void 0 : options.consumerTag });
Expand All @@ -181,9 +200,9 @@ class CoinifyRabbit extends events_1.EventEmitter {
});
this._conn.on('close', err => {
delete this._conn;
this.logger.info({ err }, 'Connection closed');
this.logger.info({ err }, 'RabbitMQ Connection closed');
});
this.logger.info({}, 'Connection opened');
await this.onConnectionOpened();
}
catch (err) {
this.logger.error({ err }, 'Error connecting to RabbitMQ');
Expand All @@ -200,6 +219,12 @@ class CoinifyRabbit extends events_1.EventEmitter {
}
return this._conn;
}
async onConnectionOpened() {
this.logger.info({}, 'RabbitMQ Connection opened');
if (this.consumers.length) {
await this._recreateRegisteredConsumers();
}
}
static _generateConnectionUrl(connectionConfig) {
if (!['amqp', 'amqps'].includes(connectionConfig.protocol)) {
throw new Error(`Invalid protocol '${connectionConfig.protocol}'. Must be 'amqp' or 'amqps'`);
Expand All @@ -226,7 +251,6 @@ class CoinifyRabbit extends events_1.EventEmitter {
if (type === 'consumer') {
const prefetch = this.config.channel.prefetch;
await channel.prefetch(prefetch, true);
await this._recreateRegisteredConsumers();
}
}
async _recreateRegisteredConsumers() {
Expand Down Expand Up @@ -527,6 +551,12 @@ class CoinifyRabbit extends events_1.EventEmitter {
deadLetterExchange: '',
messageTtl: delayMs
});
if (this.config.queues.useQuorumQueues && queueOptions.durable !== false && queueOptions.autoDelete !== true && queueOptions.exclusive !== true) {
queueOptions.arguments = {
...queueOptions === null || queueOptions === void 0 ? void 0 : queueOptions.arguments,
'x-queue-type': 'quorum'
};
}
const q = await channel.assertQueue(retryQueueName, queueOptions);
await channel.bindQueue(q.queue, retryExchangeName, q.queue);
return { retryExchangeName, retryQueueName };
Expand All @@ -544,6 +574,12 @@ class CoinifyRabbit extends events_1.EventEmitter {
deadLetterExchange: tasksExchangeName,
messageTtl: delayMillis
};
if (this.config.queues.useQuorumQueues && queueOptions.durable !== false && queueOptions.autoDelete !== true && queueOptions.exclusive !== true) {
queueOptions.arguments = {
...queueOptions === null || queueOptions === void 0 ? void 0 : queueOptions.arguments,
'x-queue-type': 'quorum'
};
}
const q = await channel.assertQueue(delayedQueueName, queueOptions);
await channel.bindQueue(q.queue, delayedExchangeName, q.queue);
return { delayedExchangeName, delayedQueueName };
Expand All @@ -552,7 +588,14 @@ class CoinifyRabbit extends events_1.EventEmitter {
const deadLetterExchangeName = this.config.exchanges.failed;
const deadLetterQueueName = this.config.queues.failed;
await channel.assertExchange(deadLetterExchangeName, 'fanout', options === null || options === void 0 ? void 0 : options.exchange);
const q = await channel.assertQueue(deadLetterQueueName, options === null || options === void 0 ? void 0 : options.queue);
const queueOptions = { ...options === null || options === void 0 ? void 0 : options.queue };
if (this.config.queues.useQuorumQueues && queueOptions.durable !== false && queueOptions.autoDelete !== true && queueOptions.exclusive !== true) {
queueOptions.arguments = {
...queueOptions === null || queueOptions === void 0 ? void 0 : queueOptions.arguments,
'x-queue-type': 'quorum'
};
}
const q = await channel.assertQueue(deadLetterQueueName, queueOptions);
await channel.bindQueue(q.queue, deadLetterExchangeName, '');
return deadLetterExchangeName;
}
Expand Down
1 change: 1 addition & 0 deletions dist/src/CoinifyRabbitConfiguration.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ export default interface CoinifyRabbitConfiguration {
retryPrefix: string;
delayedTaskPrefix: string;
failed: string;
useQuorumQueues?: boolean;
};
publish: {
persistentMessages: boolean;
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@coinify/rabbitmq",
"version": "3.0.0",
"version": "3.0.1",
"description": "Coinify RabbitMQ client with support for events and tasks",
"main": "dist/index.js",
"types": "dist/index",
Expand Down Expand Up @@ -62,4 +62,4 @@
"url": "git@github.com:CoinifySoftware/node-rabbitmq.git"
},
"license": "MIT"
}
}