From 9f6b78baff3793885b8316bedd735ae1c97947ac Mon Sep 17 00:00:00 2001 From: Harendra Chhekur <820121223505e@gmail.com> Date: Tue, 12 Jul 2022 18:01:31 +0530 Subject: [PATCH 1/3] :zap: [PERIODIC ENQUEUER] Add: unique check --- periodic_enqueuer.go | 34 ++++++++++++++++++++++------------ redis.go | 13 +++++++++++++ 2 files changed, 35 insertions(+), 12 deletions(-) diff --git a/periodic_enqueuer.go b/periodic_enqueuer.go index ae957e7f..e17a46d2 100644 --- a/periodic_enqueuer.go +++ b/periodic_enqueuer.go @@ -15,12 +15,13 @@ const ( ) type periodicEnqueuer struct { - namespace string - pool *redis.Pool - periodicJobs []*periodicJob - scheduledPeriodicJobs []*scheduledPeriodicJob - stopChan chan struct{} - doneStoppingChan chan struct{} + namespace string + pool *redis.Pool + periodicJobs []*periodicJob + scheduledPeriodicJobs []*scheduledPeriodicJob + stopChan chan struct{} + doneStoppingChan chan struct{} + periodicEnqueueUniqueInScript *redis.Script } type periodicJob struct { @@ -37,11 +38,12 @@ type scheduledPeriodicJob struct { func newPeriodicEnqueuer(namespace string, pool *redis.Pool, periodicJobs []*periodicJob) *periodicEnqueuer { return &periodicEnqueuer{ - namespace: namespace, - pool: pool, - periodicJobs: periodicJobs, - stopChan: make(chan struct{}), - doneStoppingChan: make(chan struct{}), + namespace: namespace, + pool: pool, + periodicJobs: periodicJobs, + stopChan: make(chan struct{}), + doneStoppingChan: make(chan struct{}), + periodicEnqueueUniqueInScript: redis.NewScript(2, redisLuaPeriodicEnqueueUniqueIn), } } @@ -110,7 +112,15 @@ func (pe *periodicEnqueuer) enqueue() error { return err } - _, err = conn.Do("ZADD", redisKeyScheduled(pe.namespace), epoch, rawJSON) + script := pe.periodicEnqueueUniqueInScript + _, err = redis.String(script.Do(conn, []interface{}{ + redisKeyScheduled(pe.namespace), + id, + rawJSON, + epoch, + epoch - now, + }...)) + if err != nil { return err } diff --git a/redis.go b/redis.go index 417eb481..80dfec4f 100644 --- a/redis.go +++ b/redis.go @@ -373,3 +373,16 @@ else end return 'dup' ` + +// KEYS[1] = scheduled job queue +// KEYS[2] = Unique job's key. Test for existence and set if we push. +// ARGV[1] = job +// ARGV[2] = epoch seconds for job to be run at +// ARGV[3] = seconds key should be alive for +var redisLuaPeriodicEnqueueUniqueIn = ` +if redis.call('set', KEYS[2], 1, 'NX', 'EX', ARGV[3]) then + redis.call('zadd', KEYS[1], ARGV[2], ARGV[1]) + return 'ok' +end +return 'dup' +` From e9f40b3f5dc07b2a5fa420362f8a212c31905af0 Mon Sep 17 00:00:00 2001 From: Harendra Chhekur <820121223505e@gmail.com> Date: Tue, 12 Jul 2022 18:29:20 +0530 Subject: [PATCH 2/3] :pencil2: [PERIODIC ENQUEUE] Update: Key name --- periodic_enqueuer.go | 2 +- redis.go | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/periodic_enqueuer.go b/periodic_enqueuer.go index e17a46d2..c63ee281 100644 --- a/periodic_enqueuer.go +++ b/periodic_enqueuer.go @@ -115,7 +115,7 @@ func (pe *periodicEnqueuer) enqueue() error { script := pe.periodicEnqueueUniqueInScript _, err = redis.String(script.Do(conn, []interface{}{ redisKeyScheduled(pe.namespace), - id, + redisKeyPeriodicEnqueue(pe.namespace, id), rawJSON, epoch, epoch - now, diff --git a/redis.go b/redis.go index 80dfec4f..f3f43094 100644 --- a/redis.go +++ b/redis.go @@ -72,6 +72,10 @@ func redisKeyJobsConcurrency(namespace, jobName string) string { return redisKeyJobs(namespace, jobName) + ":max_concurrency" } +func redisKeyPeriodicEnqueue(namespace, id string) string { + return redisNamespacePrefix(namespace) + id +} + func redisKeyUniqueJob(namespace, jobName string, args map[string]interface{}) (string, error) { var buf bytes.Buffer From 67032231bd1b19b9fc8619b3e6ad66fe238b22b8 Mon Sep 17 00:00:00 2001 From: Harendra Chhekur <820121223505e@gmail.com> Date: Tue, 12 Jul 2022 18:47:56 +0530 Subject: [PATCH 3/3] :goal_net: [PERIODIC ENQUEUE] Raise: Exception in case of duplicate enqueue --- periodic_enqueuer.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/periodic_enqueuer.go b/periodic_enqueuer.go index c63ee281..5c92c94d 100644 --- a/periodic_enqueuer.go +++ b/periodic_enqueuer.go @@ -5,6 +5,7 @@ import ( "math/rand" "time" + "errors" "github.com/gomodule/redigo/redis" "github.com/robfig/cron/v3" ) @@ -113,7 +114,7 @@ func (pe *periodicEnqueuer) enqueue() error { } script := pe.periodicEnqueueUniqueInScript - _, err = redis.String(script.Do(conn, []interface{}{ + res, err := redis.String(script.Do(conn, []interface{}{ redisKeyScheduled(pe.namespace), redisKeyPeriodicEnqueue(pe.namespace, id), rawJSON, @@ -123,6 +124,8 @@ func (pe *periodicEnqueuer) enqueue() error { if err != nil { return err + } else if res == "dup" { + return errors.New("duplicate enqueue") } } }