diff --git a/README.md b/README.md index 76069bf..291e324 100644 --- a/README.md +++ b/README.md @@ -261,6 +261,24 @@ config.schedule = { directory: [ path.join(__dirname, '../app/otherSchedule'), ], + // add it when making sure it only running in one cluster. + cluster: { + enable: false, // default is false, true is for running + lockType: 'redis', //use redis as lock. reserve for other lock type + lockedTtl: 6000, // the automatic expired time for lock, avoid lock leak + // add redis for redis lock + redis: { + client: { + port: 6379, // Redis port + host: '127.0.0.1', // Redis host + password: 'auth', + db: 0, + }, + }, + }, + // The final Redis key pattern will be: `${prefix}-${default}-${scheduleName}` + default: 'default', // default schedule name,like project-name. + prefix: 'schedule', // default schedule prefix }; ``` diff --git a/agent.js b/agent.js index 730edfd..d8cf2b3 100644 --- a/agent.js +++ b/agent.js @@ -2,6 +2,7 @@ const WorkerStrategy = require('./lib/strategy/worker'); const AllStrategy = require('./lib/strategy/all'); +const RedisLock = require('./lib/lock/redis_lock'); module.exports = agent => { // register built-in strategy @@ -11,6 +12,14 @@ module.exports = agent => { // wait for other plugin to register custom strategy agent.beforeStart(() => { agent.schedule.init(); + if (agent?.config?.schedule?.cluster?.enable) { + if ( + agent.config.schedule.cluster.lockType === 'redis' && + agent.config.schedule.cluster.redis + ) { + agent.lockManager = new RedisLock(agent); + } + } }); // dispatch job finish event to strategy diff --git a/config/config.default.js b/config/config.default.js index 0fba4d8..f598c6c 100644 --- a/config/config.default.js +++ b/config/config.default.js @@ -13,6 +13,21 @@ module.exports = () => { config.schedule = { // custom additional directory, full path directory: [], + cluster: { + enable: false, + lockType: 'redis', + lockedTtl: 60000, + redis: { + client: { + port: 6379, // Redis port + host: '127.0.0.1', // Redis host + password: 'auth', + db: 0, + }, + }, + }, + default: 'default', + prefix: 'schedule', }; return config; diff --git a/lib/lock/base.js b/lib/lock/base.js new file mode 100644 index 0000000..503012a --- /dev/null +++ b/lib/lock/base.js @@ -0,0 +1,38 @@ +'use strict'; + +module.exports = class LockManager { + constructor(agent) { + this.agent = agent; + this.client = null; + this.options = null; + this.prefixKey = `${this.agent?.config?.schedule?.default === 'default' ? this.agent.name : this.agent?.config?.schedule?.default}-${this.agent?.config?.schedule?.prefix}`; + } + + /** + * Require a lock from lock manager + * + * @param {string} _lockedKey - The key to lock + */ + async acquire() { + // Implementation here + } + + /** + * Release a lock from lock manager + * + * @param {string} _lockedKey - The key to unlock + */ + async release() { + // Implementation here + } + + /** + * Try to acquire without waiting + * + * @param {string} _lockedKey - The key to try to lock + */ + async tryAcquire() { + // Use _lockedKey in the implementation + // Implementation here + } +}; diff --git a/lib/lock/redis_lock.js b/lib/lock/redis_lock.js new file mode 100644 index 0000000..cc14d60 --- /dev/null +++ b/lib/lock/redis_lock.js @@ -0,0 +1,70 @@ +'use strict'; +const LockManager = require('./base'); +const Redis = require('ioredis'); + +module.exports = class RedisLock extends LockManager { + constructor(agent) { + super(agent); + this.client = new Redis(this.options); + } + + /** + * Acquire a lock with waiting time + * @param {string} lockedKey - The key to be locked + * @param {number} expiredTime - The duration in milliseconds for which the lock should be expired automatically + */ + async acquire( + lockedKey, + expiredTime = this.agent.config.schedule.cluster.lockedTtl + ) { + // Try again during 5s when it's locked + const start = Date.now(); + while (Date.now() - start < expiredTime) { + if (await this.tryAcquire(lockedKey, expiredTime)) { + return true; + } + // Set random sleep time to avoid lock conflicts, random between 0.1s and 1s + const randomSleepTime = Math.random() * 900 + 100; + await new Promise(resolve => setTimeout(resolve, randomSleepTime)); + } + return false; + } + + /** + * Release a lock from lock manager + * @param {string} lockedKey - The key to be unlocked */ + async release(lockedKey) { + try { + lockedKey = `${this.prefixKey}-${lockedKey}`; + await this.client.del(lockedKey); + } catch (err) { + this.logger.error( + `[egg-schedule] ${this.key} release lock error: ${err.message}` + ); + } + } + + /** + * Try to acquire immediately without waiting + * @param {string} lockedKey + * @param {number} expiredTime + */ + async tryAcquire( + lockedKey, + expiredTime = this.agent.config.schedule.cluster.lockedTtl + ) { + try { + lockedKey = `${this.prefixKey}-${lockedKey}`; + if (await this.client.get(lockedKey)) { + return false; + } + await this.client.set(lockedKey, true, 'PX', expiredTime); + return true; + } catch (err) { + this.logger.error( + `[egg-schedule] ${this.key} try acquire lock error: ${err.message}` + ); + return false; + } + } +}; diff --git a/lib/strategy/all.js b/lib/strategy/all.js index 1b7b7df..5280f46 100644 --- a/lib/strategy/all.js +++ b/lib/strategy/all.js @@ -1,9 +1,26 @@ 'use strict'; const Strategy = require('./timer'); +const path = require('path'); module.exports = class AllStrategy extends Strategy { - handler() { - this.sendAll(); + async handler() { + let canBeLocked = true; + const curConfig = this.agent?.config?.schedule; + let lockedKey = ''; + if (curConfig?.cluster?.enable) { + lockedKey = path.relative(this.agent.baseDir, this.key); + if (!(await this.agent.lockManager.tryAcquire(lockedKey))) { + canBeLocked = false; + } + } + + if (canBeLocked) { + this.sendAll(); + } + + if (curConfig?.cluster?.enable) { + await this.agent.lockManager.release(lockedKey); + } } }; diff --git a/lib/strategy/worker.js b/lib/strategy/worker.js index 5b83024..1c7552d 100644 --- a/lib/strategy/worker.js +++ b/lib/strategy/worker.js @@ -1,9 +1,26 @@ 'use strict'; const Strategy = require('./timer'); +const path = require('path'); -module.exports = class WorkerStrategy extends Strategy { - handler() { - this.sendOne(); +module.exports = class AllStrategy extends Strategy { + async handler() { + let canBeLocked = true; + const curConfig = this.agent?.config?.schedule; + let lockedKey = ''; + if (curConfig?.cluster?.enable) { + lockedKey = path.relative(this.agent.baseDir, this.key); + if (!(await this.agent.lockManager.tryAcquire(lockedKey))) { + canBeLocked = false; + } + } + + if (canBeLocked) { + this.sendOne(); + } + + if (curConfig?.cluster?.enable) { + await this.agent.lockManager.release(lockedKey); + } } }; diff --git a/package.json b/package.json index b5de675..6e03212 100644 --- a/package.json +++ b/package.json @@ -26,6 +26,7 @@ "dependencies": { "cron-parser": "^2.16.3", "humanize-ms": "^1.2.1", + "ioredis": "^5.4.1", "is-type-of": "^1.2.1", "safe-timers": "^1.1.0", "utility": "^1.16.3" @@ -36,7 +37,8 @@ "egg-mock": "^5.3.0", "egg-tracer": "^1.1.0", "eslint": "^8.29.0", - "eslint-config-egg": "^12.1.0" + "eslint-config-egg": "^12.1.0", + "mocha": "^10.8.2" }, "engines": { "node": ">=14.17.0"