From 6ec7290d6614c598800249bfa44d6a21b85c24ed Mon Sep 17 00:00:00 2001 From: "Brian,Kun Liu" Date: Wed, 13 Nov 2024 02:33:41 +0800 Subject: [PATCH 1/9] =?UTF-8?q?=E2=9C=A8=20feat:=20avoid=20run=20many=20ti?= =?UTF-8?q?mes=20for=20the=20same=20job=20when=20it's=20deployed=20in=20k8?= =?UTF-8?q?s=20clusters?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- agent.js | 7 +++++++ config/config.default.js | 13 +++++++++++++ lib/strategy/all.js | 30 ++++++++++++++++++++++++++++-- lib/strategy/worker.js | 30 ++++++++++++++++++++++++++++-- package.json | 1 + 5 files changed, 77 insertions(+), 4 deletions(-) diff --git a/agent.js b/agent.js index 730edfd..c430c11 100644 --- a/agent.js +++ b/agent.js @@ -2,6 +2,7 @@ const WorkerStrategy = require('./lib/strategy/worker'); const AllStrategy = require('./lib/strategy/all'); +const Redis = require('ioredis'); module.exports = agent => { // register built-in strategy @@ -11,6 +12,12 @@ module.exports = agent => { // wait for other plugin to register custom strategy agent.beforeStart(() => { agent.schedule.init(); + if ( + agent.config.schedule?.cluster?.enable && + agent.config.schedule?.cluster?.redis + ) { + agent.redisClient = new Redis(agent.config.schedule.cluster.redis); + } }); // dispatch job finish event to strategy diff --git a/config/config.default.js b/config/config.default.js index 0fba4d8..8f9b6dc 100644 --- a/config/config.default.js +++ b/config/config.default.js @@ -13,6 +13,19 @@ module.exports = () => { config.schedule = { // custom additional directory, full path directory: [], + cluster: { + enable: false, + redis: { + client: { + port: 6379, // Redis port + host: '127.0.0.1', // Redis host + password: 'auth', + db: 0, + }, + }, + }, + default: 'default', + prefix: 'schedule', }; return config; diff --git a/lib/strategy/all.js b/lib/strategy/all.js index 1b7b7df..c3ee377 100644 --- a/lib/strategy/all.js +++ b/lib/strategy/all.js @@ -3,7 +3,33 @@ const Strategy = require('./timer'); module.exports = class AllStrategy extends Strategy { - handler() { - this.sendAll(); + async handler() { + let isLocked = false; + const curConfig = this.agent.config.schedule; + let lockedKey = ''; + if (curConfig?.cluster?.enable) { + this.logger.info('cluster mode'); + const projectName = + curConfig.default === 'default' + ? this.agent.baseDir.split('/').pop() + : curConfig.default; + const prefix = curConfig.prefix; + lockedKey = `${projectName}-${prefix}-${this.key.replace( + this.agent.baseDir, + '' + )}`; + if (await this.agent.redisClient.get(lockedKey)) { + isLocked = true; + } + await this.agent.redisClient.set(lockedKey, true); + } + + if (!isLocked) { + this.sendAll(); + } + + if (curConfig.cluster.enable) { + await this.agent.redisClient.del(lockedKey); + } } }; diff --git a/lib/strategy/worker.js b/lib/strategy/worker.js index 5b83024..f0a4ea2 100644 --- a/lib/strategy/worker.js +++ b/lib/strategy/worker.js @@ -3,7 +3,33 @@ const Strategy = require('./timer'); module.exports = class WorkerStrategy extends Strategy { - handler() { - this.sendOne(); + async handler() { + let isLocked = false; + const curConfig = this.agent?.config?.schedule; + let lockedKey = ''; + if (curConfig?.cluster?.enable) { + this.logger.info('cluster mode'); + const projectName = + curConfig.default === 'default' + ? this.agent.baseDir.split('/').pop() + : curConfig.default; + const prefix = curConfig.prefix; + lockedKey = `${projectName}-${prefix}-${this.key.replace( + this.agent.baseDir, + '' + )}`; + if (await this.agent.redisClient.get(lockedKey)) { + isLocked = true; + } + await this.agent.redisClient.set(lockedKey, true); + } + + if (!isLocked) { + this.sendOne(); + } + + if (curConfig.cluster.enable) { + await this.agent.redisClient.del(lockedKey); + } } }; diff --git a/package.json b/package.json index b5de675..e69f83d 100644 --- a/package.json +++ b/package.json @@ -26,6 +26,7 @@ "dependencies": { "cron-parser": "^2.16.3", "humanize-ms": "^1.2.1", + "ioredis": "^5.4.1", "is-type-of": "^1.2.1", "safe-timers": "^1.1.0", "utility": "^1.16.3" From 4fb1881d8ff07f05f19c3cd93255b6976e6d93fc Mon Sep 17 00:00:00 2001 From: "Brian,Kun Liu" Date: Wed, 13 Nov 2024 02:44:26 +0800 Subject: [PATCH 2/9] docs: add readme for the new feature --- README.md | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/README.md b/README.md index 76069bf..2a2909a 100644 --- a/README.md +++ b/README.md @@ -261,6 +261,22 @@ config.schedule = { directory: [ path.join(__dirname, '../app/otherSchedule'), ], + // add it when making sure it only running in one cluster. + cluster: { + enable: true, + // add redis for redis lock + redis: { + client: { + port: 6379, // Redis port + host: '127.0.0.1', // Redis host + password: 'auth', + db: 0, + }, + }, + }, + //the prefix for lockedKey for redis lock + default: 'default', // default schedule name,like project-name. + prefix: 'schedule', // default schedule prefix }; ``` From cf4792b0b0bf408247500d84fa68a40be8a4c5d6 Mon Sep 17 00:00:00 2001 From: "Brian,Kun Liu" Date: Wed, 13 Nov 2024 02:56:29 +0800 Subject: [PATCH 3/9] ci: npm test problem in node 14 --- package.json | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/package.json b/package.json index e69f83d..6e03212 100644 --- a/package.json +++ b/package.json @@ -37,7 +37,8 @@ "egg-mock": "^5.3.0", "egg-tracer": "^1.1.0", "eslint": "^8.29.0", - "eslint-config-egg": "^12.1.0" + "eslint-config-egg": "^12.1.0", + "mocha": "^10.8.2" }, "engines": { "node": ">=14.17.0" From 190a189a693557ff81def4fc43f630ad00adc85a Mon Sep 17 00:00:00 2001 From: "Brian,Kun Liu" Date: Wed, 13 Nov 2024 03:22:01 +0800 Subject: [PATCH 4/9] fix: the potential error --- lib/strategy/all.js | 2 +- lib/strategy/worker.js | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/strategy/all.js b/lib/strategy/all.js index c3ee377..f77a4de 100644 --- a/lib/strategy/all.js +++ b/lib/strategy/all.js @@ -28,7 +28,7 @@ module.exports = class AllStrategy extends Strategy { this.sendAll(); } - if (curConfig.cluster.enable) { + if (curConfig.cluster?.enable) { await this.agent.redisClient.del(lockedKey); } } diff --git a/lib/strategy/worker.js b/lib/strategy/worker.js index f0a4ea2..937eb44 100644 --- a/lib/strategy/worker.js +++ b/lib/strategy/worker.js @@ -28,7 +28,7 @@ module.exports = class WorkerStrategy extends Strategy { this.sendOne(); } - if (curConfig.cluster.enable) { + if (curConfig.cluster?.enable) { await this.agent.redisClient.del(lockedKey); } } From d73517d06c15374060ea076e4641b0637277c855 Mon Sep 17 00:00:00 2001 From: "Brian,Kun Liu" Date: Wed, 13 Nov 2024 03:24:43 +0800 Subject: [PATCH 5/9] fix: occupied system --- lib/strategy/worker.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/strategy/worker.js b/lib/strategy/worker.js index 937eb44..62a647e 100644 --- a/lib/strategy/worker.js +++ b/lib/strategy/worker.js @@ -20,8 +20,9 @@ module.exports = class WorkerStrategy extends Strategy { )}`; if (await this.agent.redisClient.get(lockedKey)) { isLocked = true; + } else { + await this.agent.redisClient.set(lockedKey, true); } - await this.agent.redisClient.set(lockedKey, true); } if (!isLocked) { From 201b1d0f099f4e09b881d75228bc4e2aec93d4b2 Mon Sep 17 00:00:00 2001 From: "Brian,Kun Liu" Date: Wed, 13 Nov 2024 03:28:35 +0800 Subject: [PATCH 6/9] perf: use path.basename to get projectname --- lib/strategy/all.js | 5 +++-- lib/strategy/worker.js | 3 ++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/lib/strategy/all.js b/lib/strategy/all.js index f77a4de..4d65883 100644 --- a/lib/strategy/all.js +++ b/lib/strategy/all.js @@ -1,6 +1,7 @@ 'use strict'; const Strategy = require('./timer'); +const path = require('path'); module.exports = class AllStrategy extends Strategy { async handler() { @@ -10,8 +11,8 @@ module.exports = class AllStrategy extends Strategy { if (curConfig?.cluster?.enable) { this.logger.info('cluster mode'); const projectName = - curConfig.default === 'default' - ? this.agent.baseDir.split('/').pop() + curConfig.default === "default" + ? path.basename(this.agent.baseDir) : curConfig.default; const prefix = curConfig.prefix; lockedKey = `${projectName}-${prefix}-${this.key.replace( diff --git a/lib/strategy/worker.js b/lib/strategy/worker.js index 62a647e..e8d950a 100644 --- a/lib/strategy/worker.js +++ b/lib/strategy/worker.js @@ -1,6 +1,7 @@ 'use strict'; const Strategy = require('./timer'); +const path = require('path'); module.exports = class WorkerStrategy extends Strategy { async handler() { @@ -11,7 +12,7 @@ module.exports = class WorkerStrategy extends Strategy { this.logger.info('cluster mode'); const projectName = curConfig.default === 'default' - ? this.agent.baseDir.split('/').pop() + ? path.basename(this.agent.baseDir) : curConfig.default; const prefix = curConfig.prefix; lockedKey = `${projectName}-${prefix}-${this.key.replace( From aecee0a9f6c75fae404d627421fd4285d05d65ce Mon Sep 17 00:00:00 2001 From: "Brian,Kun Liu" Date: Sun, 17 Nov 2024 03:50:29 +0800 Subject: [PATCH 7/9] feat: -1.abstract base lock for other kinds of locks.2. implement redisLock of Lock with some security methods --- agent.js | 14 ++++---- config/config.default.js | 2 ++ lib/lock/base.js | 38 ++++++++++++++++++++++ lib/lock/redis_lock.js | 70 ++++++++++++++++++++++++++++++++++++++++ lib/strategy/all.js | 27 +++++----------- lib/strategy/worker.js | 28 +++++----------- 6 files changed, 134 insertions(+), 45 deletions(-) create mode 100644 lib/lock/base.js create mode 100644 lib/lock/redis_lock.js diff --git a/agent.js b/agent.js index c430c11..d8cf2b3 100644 --- a/agent.js +++ b/agent.js @@ -2,7 +2,7 @@ const WorkerStrategy = require('./lib/strategy/worker'); const AllStrategy = require('./lib/strategy/all'); -const Redis = require('ioredis'); +const RedisLock = require('./lib/lock/redis_lock'); module.exports = agent => { // register built-in strategy @@ -12,11 +12,13 @@ module.exports = agent => { // wait for other plugin to register custom strategy agent.beforeStart(() => { agent.schedule.init(); - if ( - agent.config.schedule?.cluster?.enable && - agent.config.schedule?.cluster?.redis - ) { - agent.redisClient = new Redis(agent.config.schedule.cluster.redis); + if (agent?.config?.schedule?.cluster?.enable) { + if ( + agent.config.schedule.cluster.lockType === 'redis' && + agent.config.schedule.cluster.redis + ) { + agent.lockManager = new RedisLock(agent); + } } }); diff --git a/config/config.default.js b/config/config.default.js index 8f9b6dc..f598c6c 100644 --- a/config/config.default.js +++ b/config/config.default.js @@ -15,6 +15,8 @@ module.exports = () => { directory: [], cluster: { enable: false, + lockType: 'redis', + lockedTtl: 60000, redis: { client: { port: 6379, // Redis port diff --git a/lib/lock/base.js b/lib/lock/base.js new file mode 100644 index 0000000..503012a --- /dev/null +++ b/lib/lock/base.js @@ -0,0 +1,38 @@ +'use strict'; + +module.exports = class LockManager { + constructor(agent) { + this.agent = agent; + this.client = null; + this.options = null; + this.prefixKey = `${this.agent?.config?.schedule?.default === 'default' ? this.agent.name : this.agent?.config?.schedule?.default}-${this.agent?.config?.schedule?.prefix}`; + } + + /** + * Require a lock from lock manager + * + * @param {string} _lockedKey - The key to lock + */ + async acquire() { + // Implementation here + } + + /** + * Release a lock from lock manager + * + * @param {string} _lockedKey - The key to unlock + */ + async release() { + // Implementation here + } + + /** + * Try to acquire without waiting + * + * @param {string} _lockedKey - The key to try to lock + */ + async tryAcquire() { + // Use _lockedKey in the implementation + // Implementation here + } +}; diff --git a/lib/lock/redis_lock.js b/lib/lock/redis_lock.js new file mode 100644 index 0000000..cc14d60 --- /dev/null +++ b/lib/lock/redis_lock.js @@ -0,0 +1,70 @@ +'use strict'; +const LockManager = require('./base'); +const Redis = require('ioredis'); + +module.exports = class RedisLock extends LockManager { + constructor(agent) { + super(agent); + this.client = new Redis(this.options); + } + + /** + * Acquire a lock with waiting time + * @param {string} lockedKey - The key to be locked + * @param {number} expiredTime - The duration in milliseconds for which the lock should be expired automatically + */ + async acquire( + lockedKey, + expiredTime = this.agent.config.schedule.cluster.lockedTtl + ) { + // Try again during 5s when it's locked + const start = Date.now(); + while (Date.now() - start < expiredTime) { + if (await this.tryAcquire(lockedKey, expiredTime)) { + return true; + } + // Set random sleep time to avoid lock conflicts, random between 0.1s and 1s + const randomSleepTime = Math.random() * 900 + 100; + await new Promise(resolve => setTimeout(resolve, randomSleepTime)); + } + return false; + } + + /** + * Release a lock from lock manager + * @param {string} lockedKey - The key to be unlocked */ + async release(lockedKey) { + try { + lockedKey = `${this.prefixKey}-${lockedKey}`; + await this.client.del(lockedKey); + } catch (err) { + this.logger.error( + `[egg-schedule] ${this.key} release lock error: ${err.message}` + ); + } + } + + /** + * Try to acquire immediately without waiting + * @param {string} lockedKey + * @param {number} expiredTime + */ + async tryAcquire( + lockedKey, + expiredTime = this.agent.config.schedule.cluster.lockedTtl + ) { + try { + lockedKey = `${this.prefixKey}-${lockedKey}`; + if (await this.client.get(lockedKey)) { + return false; + } + await this.client.set(lockedKey, true, 'PX', expiredTime); + return true; + } catch (err) { + this.logger.error( + `[egg-schedule] ${this.key} try acquire lock error: ${err.message}` + ); + return false; + } + } +}; diff --git a/lib/strategy/all.js b/lib/strategy/all.js index 4d65883..0446872 100644 --- a/lib/strategy/all.js +++ b/lib/strategy/all.js @@ -1,36 +1,25 @@ 'use strict'; const Strategy = require('./timer'); -const path = require('path'); module.exports = class AllStrategy extends Strategy { async handler() { - let isLocked = false; - const curConfig = this.agent.config.schedule; + let canBeLocked = true; + const curConfig = this.agent?.config?.schedule; let lockedKey = ''; if (curConfig?.cluster?.enable) { - this.logger.info('cluster mode'); - const projectName = - curConfig.default === "default" - ? path.basename(this.agent.baseDir) - : curConfig.default; - const prefix = curConfig.prefix; - lockedKey = `${projectName}-${prefix}-${this.key.replace( - this.agent.baseDir, - '' - )}`; - if (await this.agent.redisClient.get(lockedKey)) { - isLocked = true; + lockedKey = this.key.replace(this.agent.baseDir, ''); + if (!(await this.agent.lockManager.tryAcquire(lockedKey))) { + canBeLocked = false; } - await this.agent.redisClient.set(lockedKey, true); } - if (!isLocked) { + if (canBeLocked) { this.sendAll(); } - if (curConfig.cluster?.enable) { - await this.agent.redisClient.del(lockedKey); + if (curConfig?.cluster?.enable) { + await this.agent.lockManager.release(lockedKey); } } }; diff --git a/lib/strategy/worker.js b/lib/strategy/worker.js index e8d950a..b59e9f6 100644 --- a/lib/strategy/worker.js +++ b/lib/strategy/worker.js @@ -1,37 +1,25 @@ 'use strict'; const Strategy = require('./timer'); -const path = require('path'); -module.exports = class WorkerStrategy extends Strategy { +module.exports = class AllStrategy extends Strategy { async handler() { - let isLocked = false; + let canBeLocked = true; const curConfig = this.agent?.config?.schedule; let lockedKey = ''; if (curConfig?.cluster?.enable) { - this.logger.info('cluster mode'); - const projectName = - curConfig.default === 'default' - ? path.basename(this.agent.baseDir) - : curConfig.default; - const prefix = curConfig.prefix; - lockedKey = `${projectName}-${prefix}-${this.key.replace( - this.agent.baseDir, - '' - )}`; - if (await this.agent.redisClient.get(lockedKey)) { - isLocked = true; - } else { - await this.agent.redisClient.set(lockedKey, true); + lockedKey = this.key.replace(this.agent.baseDir, ''); + if (!(await this.agent.lockManager.tryAcquire(lockedKey))) { + canBeLocked = false; } } - if (!isLocked) { + if (canBeLocked) { this.sendOne(); } - if (curConfig.cluster?.enable) { - await this.agent.redisClient.del(lockedKey); + if (curConfig?.cluster?.enable) { + await this.agent.lockManager.release(lockedKey); } } }; From 2f46c2c5947fdd74d334b8da9ddfd826d7a2ebc5 Mon Sep 17 00:00:00 2001 From: "Brian,Kun Liu" Date: Sun, 17 Nov 2024 03:50:59 +0800 Subject: [PATCH 8/9] docs: add usage --- README.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 2a2909a..3bdc566 100644 --- a/README.md +++ b/README.md @@ -263,7 +263,9 @@ config.schedule = { ], // add it when making sure it only running in one cluster. cluster: { - enable: true, + enable: false, // default is false, true is for running + lockType: 'redis', //use redis as lock. reserve for other lock type + lockedTtl: 6000, // the automatic expired time for lock, avoid lock leak // add redis for redis lock redis: { client: { From 176ec2c503611ce281e691c6d865c34bd2530005 Mon Sep 17 00:00:00 2001 From: "Brian,Kun Liu" Date: Sun, 17 Nov 2024 04:08:55 +0800 Subject: [PATCH 9/9] perf: enhanced security with AI suggestions --- README.md | 2 +- lib/strategy/all.js | 3 ++- lib/strategy/worker.js | 3 ++- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 3bdc566..291e324 100644 --- a/README.md +++ b/README.md @@ -276,7 +276,7 @@ config.schedule = { }, }, }, - //the prefix for lockedKey for redis lock + // The final Redis key pattern will be: `${prefix}-${default}-${scheduleName}` default: 'default', // default schedule name,like project-name. prefix: 'schedule', // default schedule prefix }; diff --git a/lib/strategy/all.js b/lib/strategy/all.js index 0446872..5280f46 100644 --- a/lib/strategy/all.js +++ b/lib/strategy/all.js @@ -1,6 +1,7 @@ 'use strict'; const Strategy = require('./timer'); +const path = require('path'); module.exports = class AllStrategy extends Strategy { async handler() { @@ -8,7 +9,7 @@ module.exports = class AllStrategy extends Strategy { const curConfig = this.agent?.config?.schedule; let lockedKey = ''; if (curConfig?.cluster?.enable) { - lockedKey = this.key.replace(this.agent.baseDir, ''); + lockedKey = path.relative(this.agent.baseDir, this.key); if (!(await this.agent.lockManager.tryAcquire(lockedKey))) { canBeLocked = false; } diff --git a/lib/strategy/worker.js b/lib/strategy/worker.js index b59e9f6..1c7552d 100644 --- a/lib/strategy/worker.js +++ b/lib/strategy/worker.js @@ -1,6 +1,7 @@ 'use strict'; const Strategy = require('./timer'); +const path = require('path'); module.exports = class AllStrategy extends Strategy { async handler() { @@ -8,7 +9,7 @@ module.exports = class AllStrategy extends Strategy { const curConfig = this.agent?.config?.schedule; let lockedKey = ''; if (curConfig?.cluster?.enable) { - lockedKey = this.key.replace(this.agent.baseDir, ''); + lockedKey = path.relative(this.agent.baseDir, this.key); if (!(await this.agent.lockManager.tryAcquire(lockedKey))) { canBeLocked = false; }