diff --git a/periodic_enqueuer.go b/periodic_enqueuer.go index ae957e7f..5c92c94d 100644 --- a/periodic_enqueuer.go +++ b/periodic_enqueuer.go @@ -5,6 +5,7 @@ import ( "math/rand" "time" + "errors" "github.com/gomodule/redigo/redis" "github.com/robfig/cron/v3" ) @@ -15,12 +16,13 @@ const ( ) type periodicEnqueuer struct { - namespace string - pool *redis.Pool - periodicJobs []*periodicJob - scheduledPeriodicJobs []*scheduledPeriodicJob - stopChan chan struct{} - doneStoppingChan chan struct{} + namespace string + pool *redis.Pool + periodicJobs []*periodicJob + scheduledPeriodicJobs []*scheduledPeriodicJob + stopChan chan struct{} + doneStoppingChan chan struct{} + periodicEnqueueUniqueInScript *redis.Script } type periodicJob struct { @@ -37,11 +39,12 @@ type scheduledPeriodicJob struct { func newPeriodicEnqueuer(namespace string, pool *redis.Pool, periodicJobs []*periodicJob) *periodicEnqueuer { return &periodicEnqueuer{ - namespace: namespace, - pool: pool, - periodicJobs: periodicJobs, - stopChan: make(chan struct{}), - doneStoppingChan: make(chan struct{}), + namespace: namespace, + pool: pool, + periodicJobs: periodicJobs, + stopChan: make(chan struct{}), + doneStoppingChan: make(chan struct{}), + periodicEnqueueUniqueInScript: redis.NewScript(2, redisLuaPeriodicEnqueueUniqueIn), } } @@ -110,9 +113,19 @@ func (pe *periodicEnqueuer) enqueue() error { return err } - _, err = conn.Do("ZADD", redisKeyScheduled(pe.namespace), epoch, rawJSON) + script := pe.periodicEnqueueUniqueInScript + res, err := redis.String(script.Do(conn, []interface{}{ + redisKeyScheduled(pe.namespace), + redisKeyPeriodicEnqueue(pe.namespace, id), + rawJSON, + epoch, + epoch - now, + }...)) + if err != nil { return err + } else if res == "dup" { + return errors.New("duplicate enqueue") } } } diff --git a/redis.go b/redis.go index 417eb481..f3f43094 100644 --- a/redis.go +++ b/redis.go @@ -72,6 +72,10 @@ func redisKeyJobsConcurrency(namespace, jobName string) string { return redisKeyJobs(namespace, jobName) + ":max_concurrency" } +func redisKeyPeriodicEnqueue(namespace, id string) string { + return redisNamespacePrefix(namespace) + id +} + func redisKeyUniqueJob(namespace, jobName string, args map[string]interface{}) (string, error) { var buf bytes.Buffer @@ -373,3 +377,16 @@ else end return 'dup' ` + +// KEYS[1] = scheduled job queue +// KEYS[2] = Unique job's key. Test for existence and set if we push. +// ARGV[1] = job +// ARGV[2] = epoch seconds for job to be run at +// ARGV[3] = seconds key should be alive for +var redisLuaPeriodicEnqueueUniqueIn = ` +if redis.call('set', KEYS[2], 1, 'NX', 'EX', ARGV[3]) then + redis.call('zadd', KEYS[1], ARGV[2], ARGV[1]) + return 'ok' +end +return 'dup' +`