Skip to content

Commit 3f43571

Browse files
author
Nikhil
committed
redisqueue package
1 parent 70acb3b commit 3f43571

File tree

4 files changed

+140
-115
lines changed

4 files changed

+140
-115
lines changed

main.go

Lines changed: 49 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -2,81 +2,75 @@ package main
22

33
import (
44
"context"
5+
"encoding/json"
56
"fmt"
67
"log"
78
"net/http"
8-
"os"
9-
"os/signal"
10-
"sync"
11-
"syscall"
129

1310
"github.com/go-redis/redis/v8"
14-
)
15-
16-
var (
17-
ctx context.Context
18-
cancel context.CancelFunc
11+
"go-redis-queue/redisqueue"
1912
)
2013

2114
func main() {
22-
// Create a new Redis client
23-
redisClient := NewRedisClient()
24-
25-
// Create a context to handle cancellation
26-
ctx, cancel = context.WithCancel(context.Background())
15+
// Create a new Redis queue
16+
rq, err := redisqueue.NewRedisQueue("localhost:6379", "", 0)
17+
if err != nil {
18+
log.Fatal("Failed to create Redis queue:", err)
19+
}
2720

28-
// Create a wait group to wait for all workers to finish
29-
var wg sync.WaitGroup
21+
// Register worker functions for different queues
22+
rq.RegisterWorker("queue1", workerFunction1)
23+
rq.RegisterWorker("queue2", workerFunction2)
3024

31-
// Create a worker
32-
worker := NewWorker(redisClient)
25+
// Start the Redis queue and workers
26+
go rq.Start(context.Background())
3327

34-
// Start the worker goroutine
35-
wg.Add(1)
36-
go worker.Start(&wg)
28+
// Set up API routes
29+
http.HandleFunc("/push", handlePushRequest)
3730

3831
// Start the API server
39-
go startAPIServer(redisClient)
40-
41-
fmt.Println("Server started.")
42-
43-
// Wait for termination signal
44-
waitForTerminationSignal()
45-
46-
// Stop the worker
47-
cancel()
48-
wg.Wait()
49-
50-
fmt.Println("Server stopped.")
32+
log.Fatal(http.ListenAndServe(":8080", nil))
5133
}
5234

53-
func startAPIServer(redisClient *redis.Client) {
54-
http.HandleFunc("/push", func(w http.ResponseWriter, r *http.Request) {
55-
task := r.URL.Query().Get("task")
56-
if task == "" {
57-
http.Error(w, "Missing task parameter", http.StatusBadRequest)
58-
return
59-
}
60-
61-
// Push the task to the Redis queue
62-
err := redisClient.RPush(ctx, "myqueue", task).Err()
63-
if err != nil {
64-
log.Println("Failed to push task to Redis queue:", err)
65-
http.Error(w, "Failed to push task to Redis queue", http.StatusInternalServerError)
66-
return
67-
}
35+
func handlePushRequest(w http.ResponseWriter, r *http.Request) {
36+
// Parse the query parameter "task" from the URL
37+
task := r.URL.Query().Get("task")
38+
if task == "" {
39+
http.Error(w, "Missing task parameter", http.StatusBadRequest)
40+
return
41+
}
6842

69-
fmt.Fprintf(w, "Task %s pushed to the Redis queue", task)
43+
// Create a Redis client to push the task into the queue
44+
client := redis.NewClient(&redis.Options{
45+
Addr: "localhost:6379",
46+
Password: "",
47+
DB: 0,
7048
})
7149

72-
err := http.ListenAndServe(":8080", nil)
50+
// Push the task into the queue
51+
err := client.RPush(context.Background(), "queue1", task).Err()
7352
if err != nil {
74-
log.Fatal("Failed to start API server:", err)
53+
http.Error(w, "Failed to push task into the queue", http.StatusInternalServerError)
54+
return
7555
}
56+
57+
// Return a success response
58+
response := map[string]interface{}{
59+
"status": "success",
60+
"message": "Task added to the queue",
61+
}
62+
w.Header().Set("Content-Type", "application/json")
63+
json.NewEncoder(w).Encode(response)
64+
}
65+
66+
func workerFunction1(ctx context.Context, task string) {
67+
// Implement the worker function for queue1
68+
fmt.Printf("Worker1 processing task from queue1: %s\n", task)
69+
// Perform the required task processing logic for queue1
7670
}
7771

78-
func waitForTerminationSignal() {
79-
signals := make(chan os.Signal, 1)
80-
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
81-
<-signals
72+
func workerFunction2(ctx context.Context, task string) {
73+
// Implement the worker function for queue2
74+
fmt.Printf("Worker2 processing task from queue2: %s\n", task)
75+
// Perform the required task processing logic for queue2
8276
}

redis_client.go

Lines changed: 0 additions & 13 deletions
This file was deleted.

redisqueue/redisqueue.go

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package redisqueue
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log"
7+
"sync"
8+
"time"
9+
10+
"github.com/go-redis/redis/v8"
11+
)
12+
13+
// WorkerFunc is a function type for worker functions that process tasks
14+
type WorkerFunc func(ctx context.Context, task string)
15+
16+
// RedisQueue represents a Redis queue
17+
type RedisQueue struct {
18+
client *redis.Client
19+
workers map[string]*Worker
20+
}
21+
22+
// Worker represents a worker that processes tasks from a Redis queue
23+
type Worker struct {
24+
queueName string
25+
function WorkerFunc
26+
}
27+
28+
// NewRedisQueue creates a new RedisQueue instance
29+
func NewRedisQueue(addr, password string, db int) (*RedisQueue, error) {
30+
client := redis.NewClient(&redis.Options{
31+
Addr: addr,
32+
Password: password,
33+
DB: db,
34+
})
35+
36+
_, err := client.Ping(context.Background()).Result()
37+
if err != nil {
38+
return nil, fmt.Errorf("failed to connect to Redis: %v", err)
39+
}
40+
41+
return &RedisQueue{
42+
client: client,
43+
workers: make(map[string]*Worker),
44+
}, nil
45+
}
46+
47+
// RegisterWorker registers a worker with a specific queue and worker function
48+
func (rq *RedisQueue) RegisterWorker(queueName string, function WorkerFunc) {
49+
worker := &Worker{
50+
queueName: queueName,
51+
function: function,
52+
}
53+
54+
rq.workers[queueName] = worker
55+
}
56+
57+
// Start starts all the workers
58+
func (rq *RedisQueue) Start(ctx context.Context) {
59+
var wg sync.WaitGroup
60+
61+
for _, worker := range rq.workers {
62+
wg.Add(1)
63+
go rq.startWorker(ctx, worker, &wg)
64+
}
65+
66+
wg.Wait()
67+
}
68+
69+
func (rq *RedisQueue) startWorker(ctx context.Context, worker *Worker, wg *sync.WaitGroup) {
70+
defer wg.Done()
71+
72+
for {
73+
select {
74+
case <-ctx.Done():
75+
return
76+
default:
77+
// Retrieve a task from the Redis queue
78+
task, err := rq.client.BLPop(ctx, 0, worker.queueName).Result()
79+
if err != nil {
80+
log.Printf("Failed to retrieve task from queue %s: %v\n", worker.queueName, err)
81+
continue
82+
}
83+
84+
// Process the task
85+
worker.function(ctx, task[1])
86+
87+
// Simulate some work
88+
time.Sleep(1 * time.Second)
89+
}
90+
}
91+
}

worker.go

Lines changed: 0 additions & 47 deletions
This file was deleted.

0 commit comments

Comments
 (0)