Skip to content

Commit e8a2144

Browse files
author
Nikhil
committed
Go redis queue worker
0 parents  commit e8a2144

File tree

10 files changed

+249
-0
lines changed

10 files changed

+249
-0
lines changed

.idea/.gitignore

Lines changed: 8 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.idea/go-redis-queue.iml

Lines changed: 9 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.idea/modules.xml

Lines changed: 8 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

README.md

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
# Redis Queue Implementation in Go
2+
3+
This repository contains an implementation of a Redis queue in Go. It includes separate files for the Redis client, worker, and an API to push data into the queue.
4+
5+
## Prerequisites
6+
7+
- Go 1.16 or higher installed
8+
- Redis installed and running on your local machine
9+
10+
## Getting Started
11+
12+
1. Clone the repository:
13+
14+
```
15+
git clone <repository_url>
16+
```
17+
18+
2. Install the required dependencies:
19+
20+
```
21+
go get github.com/go-redis/redis/v8
22+
```
23+
24+
3. Start the Redis server on your local machine. If Redis is not installed, you can install it using Homebrew on macOS:
25+
26+
```
27+
brew install redis
28+
```
29+
30+
4. Start the Redis server:
31+
32+
```
33+
brew services start redis
34+
```
35+
36+
5. Build and run the Go program:
37+
38+
```
39+
go build
40+
./<binary_file_name>
41+
```
42+
43+
The server and worker will start running. The worker will listen for tasks in the Redis queue, and the API server will be available on port 8080.
44+
45+
## Pushing Tasks to the Queue
46+
47+
To push tasks to the Redis queue, you can make HTTP requests to the `/push` endpoint of the API server. The tasks will be added to the Redis queue and processed by the worker.
48+
49+
Send a POST request to `http://localhost:8080/push?task=<your_task_here>` to push a task to the queue. Replace `<your_task_here>` with the task you want to add.
50+
51+
Example using cURL:
52+
53+
```
54+
curl -X POST "http://localhost:8080/push?task=example_task"
55+
```
56+
57+
## Stopping the Program
58+
59+
To stop the program gracefully, send a termination signal to the process. You can do this by pressing Ctrl+C in the terminal where the program is running.
60+
61+
## Customization
62+
63+
Feel free to customize the code according to your requirements. You can add error handling, authentication, or enhance the worker's processing logic.
64+
65+
## License
66+
67+
This project is licensed under the MIT License. See the [LICENSE](LICENSE) file for details.

go-redis-queue

6.58 MB
Binary file not shown.

go.mod

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
module go-redis-queue
2+
3+
go 1.20
4+
5+
require (
6+
github.com/cespare/xxhash/v2 v2.1.2 // indirect
7+
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
8+
github.com/go-redis/redis/v8 v8.11.5 // indirect
9+
)

go.sum

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
2+
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
3+
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
4+
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
5+
github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
6+
github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=

main.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log"
7+
"net/http"
8+
"os"
9+
"os/signal"
10+
"sync"
11+
"syscall"
12+
13+
"github.com/go-redis/redis/v8"
14+
)
15+
16+
var (
17+
ctx context.Context
18+
cancel context.CancelFunc
19+
)
20+
21+
func main() {
22+
// Create a new Redis client
23+
redisClient := NewRedisClient()
24+
25+
// Create a context to handle cancellation
26+
ctx, cancel = context.WithCancel(context.Background())
27+
28+
// Create a wait group to wait for all workers to finish
29+
var wg sync.WaitGroup
30+
31+
// Create a worker
32+
worker := NewWorker(redisClient)
33+
34+
// Start the worker goroutine
35+
wg.Add(1)
36+
go worker.Start(&wg)
37+
38+
// Start the API server
39+
go startAPIServer(redisClient)
40+
41+
fmt.Println("Server started.")
42+
43+
// Wait for termination signal
44+
waitForTerminationSignal()
45+
46+
// Stop the worker
47+
cancel()
48+
wg.Wait()
49+
50+
fmt.Println("Server stopped.")
51+
}
52+
53+
func startAPIServer(redisClient *redis.Client) {
54+
http.HandleFunc("/push", func(w http.ResponseWriter, r *http.Request) {
55+
task := r.URL.Query().Get("task")
56+
if task == "" {
57+
http.Error(w, "Missing task parameter", http.StatusBadRequest)
58+
return
59+
}
60+
61+
// Push the task to the Redis queue
62+
err := redisClient.RPush(ctx, "myqueue", task).Err()
63+
if err != nil {
64+
log.Println("Failed to push task to Redis queue:", err)
65+
http.Error(w, "Failed to push task to Redis queue", http.StatusInternalServerError)
66+
return
67+
}
68+
69+
fmt.Fprintf(w, "Task %s pushed to the Redis queue", task)
70+
})
71+
72+
err := http.ListenAndServe(":8080", nil)
73+
if err != nil {
74+
log.Fatal("Failed to start API server:", err)
75+
}
76+
}
77+
78+
func waitForTerminationSignal() {
79+
signals := make(chan os.Signal, 1)
80+
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
81+
<-signals
82+
}

redis_client.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package main
2+
3+
import "github.com/go-redis/redis/v8"
4+
5+
// NewRedisClient creates a new Redis client instance
6+
func NewRedisClient() *redis.Client {
7+
client := redis.NewClient(&redis.Options{
8+
Addr: "localhost:6379", // Redis server address
9+
Password: "", // Redis password
10+
DB: 0, // Redis database index
11+
})
12+
return client
13+
}

worker.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
"log"
6+
"sync"
7+
"time"
8+
9+
"github.com/go-redis/redis/v8"
10+
)
11+
12+
// Worker represents a worker that processes tasks from the Redis queue
13+
type Worker struct {
14+
client *redis.Client
15+
}
16+
17+
// NewWorker creates a new Worker instance
18+
func NewWorker(client *redis.Client) *Worker {
19+
return &Worker{
20+
client: client,
21+
}
22+
}
23+
24+
// Start starts the worker and listens for tasks in the Redis queue
25+
func (w *Worker) Start(wg *sync.WaitGroup) {
26+
defer wg.Done()
27+
28+
for {
29+
select {
30+
case <-ctx.Done():
31+
return
32+
default:
33+
// Retrieve a task from the Redis queue
34+
task, err := w.client.BLPop(ctx, 0, "myqueue").Result()
35+
if err != nil {
36+
log.Println("Failed to retrieve task:", err)
37+
continue
38+
}
39+
40+
// Process the task
41+
fmt.Println("Processing task:", task[1])
42+
43+
// Simulate some work
44+
time.Sleep(1 * time.Second)
45+
}
46+
}
47+
}

0 commit comments

Comments
 (0)