-
Notifications
You must be signed in to change notification settings - Fork 19
feat: support schedule only executes once in random one cluster when it's deployed in k8s clusters mode. #61
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
6ec7290
4fb1881
cf4792b
190a189
d73517d
201b1d0
aecee0a
2f46c2c
176ec2c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,6 +2,7 @@ | |
|
|
||
| const WorkerStrategy = require('./lib/strategy/worker'); | ||
| const AllStrategy = require('./lib/strategy/all'); | ||
| const RedisLock = require('./lib/lock/redis_lock'); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Consider using a lock provider interface for better abstraction. Instead of directly importing Consider this approach: -const RedisLock = require('./lib/lock/redis_lock');
+const { createLockManager } = require('./lib/lock');And create a new file const RedisLock = require('./redis_lock');
const LOCK_PROVIDERS = {
redis: RedisLock,
// Add more providers here
};
exports.createLockManager = (type, agent) => {
const Provider = LOCK_PROVIDERS[type];
if (!Provider) {
throw new Error(`Unsupported lock type: ${type}`);
}
return new Provider(agent);
}; |
||
|
|
||
| module.exports = agent => { | ||
| // register built-in strategy | ||
|
|
@@ -11,6 +12,14 @@ module.exports = agent => { | |
| // wait for other plugin to register custom strategy | ||
| agent.beforeStart(() => { | ||
| agent.schedule.init(); | ||
| if (agent?.config?.schedule?.cluster?.enable) { | ||
| if ( | ||
| agent.config.schedule.cluster.lockType === 'redis' && | ||
| agent.config.schedule.cluster.redis | ||
| ) { | ||
| agent.lockManager = new RedisLock(agent); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 💡 Codebase verification Based on the gathered information, I can now provide the final response: Lock manager implementation needs error handling improvements The lock manager usage in strategy files (worker.js and all.js) shows:
Required improvements:
🔗 Analysis chainVerify lock manager usage in strategy files. Ensure that the lock manager is properly used in strategy files with appropriate error handling and lock release. Let's verify the implementation in strategy files: Let me gather more information about error handling patterns with a different approach. Let me try one more search to check for error handling with a corrected pattern. 🏁 Scripts executedThe following scripts were executed for the analysis: Script: #!/bin/bash
# Description: Check lock manager usage in strategy files
# Check lock acquisition and release patterns in strategy files
echo "Checking lock patterns in strategy files:"
rg -A 10 "lockManager" "lib/strategy/"
# Check error handling patterns
echo "\nChecking error handling patterns:"
ast-grep --pattern 'try {
$$$
this.agent.lockManager
$$$
} catch'
Length of output: 1427 Script: #!/bin/bash
# Check the complete implementation context around lock manager usage
# Get broader context of lock usage in worker.js
echo "Lock usage context in worker.js:"
rg -B 5 -A 15 "lockManager" "lib/strategy/worker.js"
# Get broader context of lock usage in all.js
echo "\nLock usage context in all.js:"
rg -B 5 -A 15 "lockManager" "lib/strategy/all.js"
# Look for try-catch blocks in these files
echo "\nTry-catch blocks in strategy files:"
rg -B 2 -A 5 "try\s*{" "lib/strategy/"
Length of output: 1390 Script: #!/bin/bash
# Check for error handling around the lock manager usage
# Look for try-catch blocks in these files with fixed pattern
echo "Try-catch blocks in strategy files:"
rg -B 2 -A 5 "try {" "lib/strategy/"
# Check if there's any error handling in the parent function
echo "\nFunction context for lock operations:"
ast-grep --pattern 'async function $_($$$) {
$$$
lockManager
$$$
}'
Length of output: 356 |
||
| } | ||
| } | ||
| }); | ||
|
|
||
| // dispatch job finish event to strategy | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,38 @@ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| 'use strict'; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| module.exports = class LockManager { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| constructor(agent) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| this.agent = agent; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| this.client = null; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| this.options = null; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| this.prefixKey = `${this.agent?.config?.schedule?.default === 'default' ? this.agent.name : this.agent?.config?.schedule?.default}-${this.agent?.config?.schedule?.prefix}`; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+4
to
+9
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Improve constructor's type safety and validation The constructor needs several improvements:
Consider this improved implementation: +/**
+ * @typedef {Object} Agent
+ * @property {Object} config
+ * @property {Object} config.schedule
+ * @property {string} config.schedule.default
+ * @property {string} config.schedule.prefix
+ * @property {string} name
+ */
+/**
+ * Base class for lock management
+ */
module.exports = class LockManager {
+ /**
+ * @param {Agent} agent - The agent instance
+ * @throws {Error} If agent or required configuration is missing
+ */
constructor(agent) {
+ if (!agent) {
+ throw new Error('Agent is required');
+ }
this.agent = agent;
this.client = null;
this.options = null;
- this.prefixKey = `${this.agent?.config?.schedule?.default === 'default' ? this.agent.name : this.agent?.config?.schedule?.default}-${this.agent?.config?.schedule?.prefix}`;
+ const scheduleConfig = agent.config?.schedule;
+ if (!scheduleConfig?.prefix) {
+ throw new Error('Schedule prefix configuration is required');
+ }
+ const defaultValue = scheduleConfig.default || 'default';
+ const prefix = defaultValue === 'default' ? agent.name : defaultValue;
+ this.prefixKey = `${prefix}-${scheduleConfig.prefix}`;
}📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * Require a lock from lock manager | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * @param {string} _lockedKey - The key to lock | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Check warning on line 14 in lib/lock/base.js
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| async acquire() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Implementation here | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * Release a lock from lock manager | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * @param {string} _lockedKey - The key to unlock | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Check warning on line 23 in lib/lock/base.js
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| async release() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Implementation here | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * Try to acquire without waiting | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * @param {string} _lockedKey - The key to try to lock | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Check warning on line 32 in lib/lock/base.js
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| async tryAcquire() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Use _lockedKey in the implementation | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Implementation here | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+11
to
+37
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Enhance abstract methods documentation and contract The abstract methods need improvements:
Consider this improved implementation: /**
* Require a lock from lock manager
*
- * @param {string} _lockedKey - The key to lock
+ * @param {string} lockedKey - The key to lock
+ * @returns {Promise<boolean>} True if lock was acquired, false otherwise
+ * @throws {Error} If lock acquisition fails
+ * @abstract
*/
- async acquire() {
- // Implementation here
- }
+ async acquire(lockedKey) {
+ throw new Error('Method not implemented');
+ }
/**
* Release a lock from lock manager
*
- * @param {string} _lockedKey - The key to unlock
+ * @param {string} lockedKey - The key to unlock
+ * @returns {Promise<void>}
+ * @throws {Error} If lock release fails
+ * @abstract
*/
- async release() {
- // Implementation here
- }
+ async release(lockedKey) {
+ throw new Error('Method not implemented');
+ }
/**
* Try to acquire without waiting
*
- * @param {string} _lockedKey - The key to try to lock
+ * @param {string} lockedKey - The key to try to lock
+ * @returns {Promise<boolean>} True if lock was acquired, false if already locked
+ * @throws {Error} If lock attempt fails
+ * @abstract
*/
- async tryAcquire() {
- // Use _lockedKey in the implementation
- // Implementation here
- }
+ async tryAcquire(lockedKey) {
+ throw new Error('Method not implemented');
+ }Additionally, consider adding implementation guidance in the class JSDoc: /**
* Base class for lock management implementations.
*
* Implementations must:
* 1. Handle distributed locking mechanism
* 2. Ensure atomic lock operations
* 3. Handle lock expiration
* 4. Implement proper error handling
*
* @abstract
*/ |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,70 @@ | ||
| 'use strict'; | ||
| const LockManager = require('./base'); | ||
| const Redis = require('ioredis'); | ||
|
|
||
| module.exports = class RedisLock extends LockManager { | ||
| constructor(agent) { | ||
| super(agent); | ||
| this.client = new Redis(this.options); | ||
| } | ||
|
|
||
| /** | ||
| * Acquire a lock with waiting time | ||
| * @param {string} lockedKey - The key to be locked | ||
| * @param {number} expiredTime - The duration in milliseconds for which the lock should be expired automatically | ||
| */ | ||
| async acquire( | ||
| lockedKey, | ||
| expiredTime = this.agent.config.schedule.cluster.lockedTtl | ||
| ) { | ||
| // Try again during 5s when it's locked | ||
| const start = Date.now(); | ||
| while (Date.now() - start < expiredTime) { | ||
| if (await this.tryAcquire(lockedKey, expiredTime)) { | ||
| return true; | ||
| } | ||
| // Set random sleep time to avoid lock conflicts, random between 0.1s and 1s | ||
| const randomSleepTime = Math.random() * 900 + 100; | ||
| await new Promise(resolve => setTimeout(resolve, randomSleepTime)); | ||
| } | ||
| return false; | ||
| } | ||
|
|
||
| /** | ||
| * Release a lock from lock manager | ||
| * @param {string} lockedKey - The key to be unlocked */ | ||
| async release(lockedKey) { | ||
| try { | ||
| lockedKey = `${this.prefixKey}-${lockedKey}`; | ||
| await this.client.del(lockedKey); | ||
| } catch (err) { | ||
| this.logger.error( | ||
| `[egg-schedule] ${this.key} release lock error: ${err.message}` | ||
| ); | ||
| } | ||
| } | ||
|
Comment on lines
+37
to
+45
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ensure locks are released only by the owner Currently, the One approach is to store a unique identifier (e.g., a UUID) when setting the lock and check this identifier when releasing the lock. Example: // In tryAcquire
const lockValue = `${this.key}-${Date.now()}`;
const result = await this.client.set(lockedKey, lockValue, 'PX', expiredTime, 'NX');
// In release
async release(lockedKey) {
try {
lockedKey = `${this.prefixKey}-${lockedKey}`;
const lockValue = await this.client.get(lockedKey);
if (lockValue === this.lockValue) {
await this.client.del(lockedKey);
}
} catch (err) {
this.logger.error(
`[egg-schedule] ${this.key} release lock error: ${err.message}`
);
}
}This ensures that a lock is only released by the process that acquired it. |
||
|
|
||
| /** | ||
| * Try to acquire immediately without waiting | ||
| * @param {string} lockedKey | ||
|
Check warning on line 49 in lib/lock/redis_lock.js
|
||
| * @param {number} expiredTime | ||
|
Check warning on line 50 in lib/lock/redis_lock.js
|
||
| */ | ||
| async tryAcquire( | ||
| lockedKey, | ||
| expiredTime = this.agent.config.schedule.cluster.lockedTtl | ||
| ) { | ||
| try { | ||
| lockedKey = `${this.prefixKey}-${lockedKey}`; | ||
| if (await this.client.get(lockedKey)) { | ||
| return false; | ||
| } | ||
| await this.client.set(lockedKey, true, 'PX', expiredTime); | ||
| return true; | ||
| } catch (err) { | ||
oneWalker marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| this.logger.error( | ||
| `[egg-schedule] ${this.key} try acquire lock error: ${err.message}` | ||
| ); | ||
| return false; | ||
| } | ||
| } | ||
| }; | ||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -1,9 +1,26 @@ | ||||||||||||||||||||||||||
| 'use strict'; | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| const Strategy = require('./timer'); | ||||||||||||||||||||||||||
| const path = require('path'); | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| module.exports = class AllStrategy extends Strategy { | ||||||||||||||||||||||||||
| handler() { | ||||||||||||||||||||||||||
| this.sendAll(); | ||||||||||||||||||||||||||
| async handler() { | ||||||||||||||||||||||||||
| let canBeLocked = true; | ||||||||||||||||||||||||||
| const curConfig = this.agent?.config?.schedule; | ||||||||||||||||||||||||||
| let lockedKey = ''; | ||||||||||||||||||||||||||
| if (curConfig?.cluster?.enable) { | ||||||||||||||||||||||||||
| lockedKey = path.relative(this.agent.baseDir, this.key); | ||||||||||||||||||||||||||
| if (!(await this.agent.lockManager.tryAcquire(lockedKey))) { | ||||||||||||||||||||||||||
| canBeLocked = false; | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
Comment on lines
+13
to
+15
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add error handling and timeout for lock acquisition. The lock acquisition needs proper error handling and a timeout to prevent indefinite waiting. Consider adding try-catch and timeout: - if (!(await this.agent.lockManager.tryAcquire(lockedKey))) {
- canBeLocked = false;
- }
+ try {
+ const timeout = 5000; // 5 seconds timeout
+ if (!(await this.agent.lockManager.tryAcquire(lockedKey, timeout))) {
+ canBeLocked = false;
+ }
+ } catch (error) {
+ this.agent.logger.error('[egg-schedule] Failed to acquire lock:', error);
+ canBeLocked = false;
+ }📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| if (canBeLocked) { | ||||||||||||||||||||||||||
| this.sendAll(); | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
Comment on lines
+18
to
+20
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add error handling and make execution asynchronous. The execution block needs proper error handling and should handle potential async operations in Consider adding try-catch and making it async: if (canBeLocked) {
- this.sendAll();
+ try {
+ await this.sendAll();
+ } catch (error) {
+ this.agent.logger.error('[egg-schedule] Failed to execute scheduled task:', error);
+ throw error; // Re-throw to ensure lock is released in the finally block
+ }
}
|
||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| if (curConfig?.cluster?.enable) { | ||||||||||||||||||||||||||
| await this.agent.lockManager.release(lockedKey); | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
Comment on lines
+22
to
+24
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ensure lock release with proper error handling. The lock release should be in a finally block and include error handling to ensure proper cleanup. Restructure the code: - if (curConfig?.cluster?.enable) {
- await this.agent.lockManager.release(lockedKey);
- }
+ try {
+ if (canBeLocked) {
+ await this.sendAll();
+ }
+ } finally {
+ if (curConfig?.cluster?.enable) {
+ try {
+ await this.agent.lockManager.release(lockedKey);
+ } catch (error) {
+ this.agent.logger.error('[egg-schedule] Failed to release lock:', error);
+ }
+ }
+ }
|
||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -1,9 +1,26 @@ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| 'use strict'; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const Strategy = require('./timer'); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const path = require('path'); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| module.exports = class WorkerStrategy extends Strategy { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| handler() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| this.sendOne(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| module.exports = class AllStrategy extends Strategy { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| async handler() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| let canBeLocked = true; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const curConfig = this.agent?.config?.schedule; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| let lockedKey = ''; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (curConfig?.cluster?.enable) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| lockedKey = path.relative(this.agent.baseDir, this.key); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (!(await this.agent.lockManager.tryAcquire(lockedKey))) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| canBeLocked = false; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+11
to
+16
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Enhance lock acquisition robustness and error handling The lock acquisition logic needs improvements in several areas:
Apply this diff: if (curConfig?.cluster?.enable) {
- lockedKey = path.relative(this.agent.baseDir, this.key);
- if (!(await this.agent.lockManager.tryAcquire(lockedKey))) {
- canBeLocked = false;
+ try {
+ // Ensure safe lock key construction
+ lockedKey = path.relative(this.agent.baseDir, this.key)
+ .replace(/[^a-zA-Z0-9]/g, '_'); // sanitize key
+
+ // Add TTL to prevent stale locks
+ if (!(await this.agent.lockManager.tryAcquire(lockedKey, { ttl: 60000 }))) {
+ canBeLocked = false;
+ }
+ } catch (error) {
+ this.agent.logger.error('[egg-schedule] Failed to acquire lock:', error);
+ canBeLocked = false;
}
}📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (canBeLocked) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| this.sendOne(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+18
to
+20
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add error handling and ensure proper async execution The task execution block lacks error handling and might not properly handle async operations. Apply this diff: if (canBeLocked) {
- this.sendOne();
+ try {
+ await this.sendOne();
+ } catch (error) {
+ this.agent.logger.error('[egg-schedule] Task execution failed:', error);
+ throw error; // Re-throw to trigger lock release in the finally block
+ }
}📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (curConfig?.cluster?.enable) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| await this.agent.lockManager.release(lockedKey); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+22
to
+24
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ensure lock release with proper error handling The lock release should be:
Apply this diff to restructure the entire handler method: async handler() {
let canBeLocked = true;
const curConfig = this.agent?.config?.schedule;
let lockedKey = '';
+ let lockAcquired = false;
if (curConfig?.cluster?.enable) {
try {
lockedKey = path.relative(this.agent.baseDir, this.key);
if (await this.agent.lockManager.tryAcquire(lockedKey)) {
+ lockAcquired = true;
+ } else {
canBeLocked = false;
}
+ } catch (error) {
+ this.agent.logger.error('[egg-schedule] Failed to acquire lock:', error);
+ canBeLocked = false;
+ }
}
- if (canBeLocked) {
- this.sendOne();
- }
-
- if (curConfig?.cluster?.enable) {
- await this.agent.lockManager.release(lockedKey);
+ try {
+ if (canBeLocked) {
+ await this.sendOne();
+ }
+ } finally {
+ if (curConfig?.cluster?.enable && lockAcquired) {
+ try {
+ await this.agent.lockManager.release(lockedKey);
+ } catch (error) {
+ this.agent.logger.error('[egg-schedule] Failed to release lock:', error);
+ }
+ }
}
}📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -26,6 +26,7 @@ | |||||
| "dependencies": { | ||||||
| "cron-parser": "^2.16.3", | ||||||
| "humanize-ms": "^1.2.1", | ||||||
| "ioredis": "^5.4.1", | ||||||
| "is-type-of": "^1.2.1", | ||||||
| "safe-timers": "^1.1.0", | ||||||
| "utility": "^1.16.3" | ||||||
|
|
@@ -36,7 +37,8 @@ | |||||
| "egg-mock": "^5.3.0", | ||||||
| "egg-tracer": "^1.1.0", | ||||||
| "eslint": "^8.29.0", | ||||||
| "eslint-config-egg": "^12.1.0" | ||||||
| "eslint-config-egg": "^12.1.0", | ||||||
| "mocha": "^10.8.2" | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fix incorrect mocha version number The specified mocha version Apply this diff to fix the version: - "mocha": "^10.8.2"
+ "mocha": "^10.2.0"Additionally, since the project already uses 📝 Committable suggestion
Suggested change
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It won't pass the ci in node 14 env without mocha module. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @oneWalker, thank you for the clarification. I've noted that the CI in Node 14 environments requires the ✏️ Learnings added
|
||||||
| }, | ||||||
| "engines": { | ||||||
| "node": ">=14.17.0" | ||||||
|
|
||||||
Uh oh!
There was an error while loading. Please reload this page.