Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 25 additions & 12 deletions periodic_enqueuer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"math/rand"
"time"

"errors"
"github.com/gomodule/redigo/redis"
"github.com/robfig/cron/v3"
)
Expand All @@ -15,12 +16,13 @@ const (
)

type periodicEnqueuer struct {
namespace string
pool *redis.Pool
periodicJobs []*periodicJob
scheduledPeriodicJobs []*scheduledPeriodicJob
stopChan chan struct{}
doneStoppingChan chan struct{}
namespace string
pool *redis.Pool
periodicJobs []*periodicJob
scheduledPeriodicJobs []*scheduledPeriodicJob
stopChan chan struct{}
doneStoppingChan chan struct{}
periodicEnqueueUniqueInScript *redis.Script
}

type periodicJob struct {
Expand All @@ -37,11 +39,12 @@ type scheduledPeriodicJob struct {

func newPeriodicEnqueuer(namespace string, pool *redis.Pool, periodicJobs []*periodicJob) *periodicEnqueuer {
return &periodicEnqueuer{
namespace: namespace,
pool: pool,
periodicJobs: periodicJobs,
stopChan: make(chan struct{}),
doneStoppingChan: make(chan struct{}),
namespace: namespace,
pool: pool,
periodicJobs: periodicJobs,
stopChan: make(chan struct{}),
doneStoppingChan: make(chan struct{}),
periodicEnqueueUniqueInScript: redis.NewScript(2, redisLuaPeriodicEnqueueUniqueIn),
}
}

Expand Down Expand Up @@ -110,9 +113,19 @@ func (pe *periodicEnqueuer) enqueue() error {
return err
}

_, err = conn.Do("ZADD", redisKeyScheduled(pe.namespace), epoch, rawJSON)
script := pe.periodicEnqueueUniqueInScript
res, err := redis.String(script.Do(conn, []interface{}{

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let us have a quick chat for me to understand this. May be a huddle

redisKeyScheduled(pe.namespace),
redisKeyPeriodicEnqueue(pe.namespace, id),
rawJSON,
epoch,
epoch - now,
}...))

if err != nil {
return err
} else if res == "dup" {
return errors.New("duplicate enqueue")
}
}
}
Expand Down
17 changes: 17 additions & 0 deletions redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ func redisKeyJobsConcurrency(namespace, jobName string) string {
return redisKeyJobs(namespace, jobName) + ":max_concurrency"
}

func redisKeyPeriodicEnqueue(namespace, id string) string {
return redisNamespacePrefix(namespace) + id
}

func redisKeyUniqueJob(namespace, jobName string, args map[string]interface{}) (string, error) {
var buf bytes.Buffer

Expand Down Expand Up @@ -373,3 +377,16 @@ else
end
return 'dup'
`

// KEYS[1] = scheduled job queue
// KEYS[2] = Unique job's key. Test for existence and set if we push.
// ARGV[1] = job
// ARGV[2] = epoch seconds for job to be run at
// ARGV[3] = seconds key should be alive for
var redisLuaPeriodicEnqueueUniqueIn = `
if redis.call('set', KEYS[2], 1, 'NX', 'EX', ARGV[3]) then
redis.call('zadd', KEYS[1], ARGV[2], ARGV[1])
return 'ok'
end
return 'dup'
`