Skip to content

Commit cde2a90

Browse files
author
willzhen
committed
Adjust framework
1 parent 8ecc454 commit cde2a90

File tree

10 files changed

+173
-164
lines changed

10 files changed

+173
-164
lines changed

container/memory_container/queue_container.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,15 @@ package memeorycontainer
33
import (
44
"context"
55
"fmt"
6-
"log"
76
"sync"
87
"sync/atomic"
98
"time"
109

1110
lighttaskscheduler "github.com/memory-overflow/light-task-scheduler"
1211
)
1312

13+
type SaveDataFunc func(context.Context, *lighttaskscheduler.Task, interface{}) error
14+
1415
// queueContainer 队列型容器,任务无状态,无优先级,先进先出,任务数据,多进程数据无法共享数据
1516
type queueContainer struct {
1617
MemeoryContainer
@@ -21,13 +22,16 @@ type queueContainer struct {
2122

2223
waitingTasks chan lighttaskscheduler.Task
2324
timeout time.Duration
25+
savefunc SaveDataFunc
2426
}
2527

2628
// MakeQueueContainer 构造队列型任务容器, size 表示队列的大小, timeout 表示队列读取的超时时间
27-
func MakeQueueContainer(size uint32, timeout time.Duration) *queueContainer {
29+
// 需要提供一个 savefunc 处理任务结束以后,任务数据如何处理,如何存储,如果不传,默认不处理任务结果
30+
func MakeQueueContainer(size uint32, timeout time.Duration, savefunc SaveDataFunc) *queueContainer {
2831
return &queueContainer{
2932
waitingTasks: make(chan lighttaskscheduler.Task, size),
3033
timeout: timeout,
34+
savefunc: savefunc,
3135
}
3236
}
3337

@@ -135,8 +139,8 @@ func (q *queueContainer) ToFailedStatus(ctx context.Context, task *lighttasksche
135139
if _, ok := q.runningTaskMap.LoadAndDelete(task.TaskId); ok {
136140
atomic.AddInt32(&q.runningTaskCount, -1)
137141
}
138-
log.Printf("failed task %s, reason %v", task.TaskId, reason)
139142
task.TaskStatus = lighttaskscheduler.TASK_STATUS_FAILED
143+
task.FailedReason = reason.Error()
140144
return task, nil
141145
}
142146

@@ -163,3 +167,12 @@ func (q *queueContainer) UpdateRunningTaskStatus(ctx context.Context,
163167
task *lighttaskscheduler.Task, status lighttaskscheduler.AsyncTaskStatus) error {
164168
return nil
165169
}
170+
171+
// ExportOutput 导出任务输出,自行处理任务结果
172+
func (q *queueContainer) SaveData(ctx context.Context, ftask *lighttaskscheduler.Task,
173+
data interface{}) error {
174+
if q.savefunc != nil {
175+
return q.savefunc(ctx, ftask, data)
176+
}
177+
return nil
178+
}

container/persist_container/example_sql_container.go

Lines changed: 0 additions & 92 deletions
This file was deleted.

example/add_example/add/add_actuator.go

Lines changed: 8 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -61,21 +61,6 @@ func (a *AddActuator) Start(ctx context.Context, ftask *framework.Task) (
6161
return ftask, false, nil
6262
}
6363

64-
// ExportOutput 导出任务输出,自行处理任务结果
65-
func (a *AddActuator) ExportOutput(ctx context.Context, ftask *framework.Task) error {
66-
task, ok := ftask.TaskItem.(AddTask)
67-
if !ok {
68-
return fmt.Errorf("TaskItem not be set to AddTask")
69-
}
70-
res, ok := a.resultMap.Load(ftask.TaskId)
71-
if !ok {
72-
return fmt.Errorf("not found result for task %s", ftask.TaskId)
73-
}
74-
log.Printf("finished task %s: %d + %d = %d \n", ftask.TaskId, task.A, task.B, res)
75-
a.resultMap.Delete(ftask.TaskId) // delete result after export
76-
return nil
77-
}
78-
7964
// Stop 停止任务
8065
func (a *AddActuator) Stop(ctx context.Context, ftask *framework.Task) error {
8166
// 同步任务无法真正暂停,只能删除状态
@@ -106,12 +91,13 @@ func (a *AddActuator) GetAsyncTaskStatus(ctx context.Context, ftasks []framework
10691
}
10792

10893
// GetOutput ...
109-
func (e *AddActuator) GetOutput(ctx context.Context, task *framework.Task) (
94+
func (a *AddActuator) GetOutput(ctx context.Context, ftask *framework.Task) (
11095
data interface{}, err error) {
111-
return nil, nil
112-
}
113-
114-
// GetOutput ...
115-
func (e *AddActuator) Delete(ctx context.Context, task *framework.Task) (err error) {
116-
return nil
96+
res, ok := a.resultMap.Load(ftask.TaskId)
97+
if !ok {
98+
return nil, fmt.Errorf("not found result for task %s", ftask.TaskId)
99+
}
100+
a.resultMap.Delete(ftask.TaskId) // delete result after get
101+
// reutrn int32
102+
return res, nil
117103
}

example/add_example/main.go

Lines changed: 25 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"log"
66
"math/rand"
7-
"os"
87
"strconv"
98
"time"
109

@@ -15,39 +14,41 @@ import (
1514
)
1615

1716
func main() {
18-
container := memeorycontainer.MakeQueueContainer(10000, 100*time.Millisecond)
17+
save := func(ctx context.Context, ftask *lighttaskscheduler.Task, data interface{}) error {
18+
log.Printf("save task %s: result = %d \n", ftask.TaskId, data.(int32))
19+
return nil
20+
}
21+
container := memeorycontainer.MakeQueueContainer(10000, 100*time.Millisecond, save)
1922
actuator := &addexample.AddActuator{}
2023
sch := lighttaskscheduler.MakeNewScheduler(
2124
context.Background(),
2225
container, actuator,
2326
lighttaskscheduler.Config{
24-
TaskLimit: 5,
25-
ScanInterval: 50 * time.Millisecond,
26-
TaskTimeout: 5 * time.Second, // 5s 超时
27+
TaskLimit: 5,
28+
ScanInterval: 50 * time.Millisecond,
29+
TaskTimeout: 5 * time.Second, // 5s 超时
30+
EnableFinshedTaskList: true, // 开启已完成任务返回功能
2731
},
2832
)
2933

30-
var c chan os.Signal
3134
r := rand.New(rand.NewSource(time.Now().UnixNano()))
3235
for i := 0; i < 10; i++ {
33-
select {
34-
case <-c:
35-
return
36-
default:
37-
sch.AddTask(context.Background(),
38-
lighttaskscheduler.Task{
39-
TaskId: strconv.Itoa(i),
40-
TaskItem: addexample.AddTask{
41-
A: r.Int31() % 1000,
42-
B: r.Int31() % 1000,
43-
},
44-
})
45-
}
36+
sch.AddTask(context.Background(),
37+
lighttaskscheduler.Task{
38+
TaskId: strconv.Itoa(i),
39+
TaskItem: addexample.AddTask{
40+
A: r.Int31() % 1000,
41+
B: r.Int31() % 1000,
42+
},
43+
})
4644
}
47-
48-
for range c {
49-
log.Println("stop Scheduling")
50-
sch.Close()
51-
return
45+
for task := range sch.FinshedTasks() {
46+
if task.TaskStatus == lighttaskscheduler.TASK_STATUS_FAILED {
47+
log.Printf("failed task %s, reason: %s, timecost: %dms\n",
48+
task.TaskId, task.FailedReason, task.TaskEnbTime.Sub(task.TaskStartTime).Milliseconds())
49+
} else if task.TaskStatus == lighttaskscheduler.TASK_STATUS_SUCCESS {
50+
log.Printf("success task %s, timecost: %dms\n", task.TaskId,
51+
task.TaskEnbTime.Sub(task.TaskStartTime).Milliseconds())
52+
}
5253
}
5354
}

go.mod

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,8 @@ module github.com/memory-overflow/light-task-scheduler
22

33
go 1.16
44

5-
require github.com/memory-overflow/go-common-library v0.0.0-20230426144745-a649dc22d6ab
5+
require (
6+
github.com/memory-overflow/go-common-library v0.0.0-20230426144745-a649dc22d6ab
7+
gorm.io/driver/mysql v1.5.0
8+
gorm.io/gorm v1.25.0
9+
)

go.sum

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,14 @@
11
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
22
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
33
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
4+
github.com/go-sql-driver/mysql v1.7.0 h1:ueSltNNllEqE3qcWBTD0iQd3IpL/6U+mJxLkazJ7YPc=
5+
github.com/go-sql-driver/mysql v1.7.0/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI=
46
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
57
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
8+
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
9+
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
10+
github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
11+
github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
612
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
713
github.com/memory-overflow/go-common-library v0.0.0-20230426144745-a649dc22d6ab h1:DrN+ZDowgkIyBMVdxXihzrW/l7NOsXH13hSpmjd2XVY=
814
github.com/memory-overflow/go-common-library v0.0.0-20230426144745-a649dc22d6ab/go.mod h1:Bqm7BH39OJ4Igz9jYs4494mkEmXiBeSsVZhWnj6KKU8=
@@ -14,3 +20,8 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
1420
github.com/tidwall/gjson v1.14.4/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
1521
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
1622
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
23+
gorm.io/driver/mysql v1.5.0 h1:6hSAT5QcyIaty0jfnff0z0CLDjyRgZ8mlMHLqSt7uXM=
24+
gorm.io/driver/mysql v1.5.0/go.mod h1:FFla/fJuCvyTi7rJQd27qlNX2v3L6deTR1GgTjSOLPo=
25+
gorm.io/gorm v1.24.7-0.20230306060331-85eaf9eeda11/go.mod h1:L4uxeKpfBml98NYqVqwAdmV1a2nBtAec/cf3fpucW/k=
26+
gorm.io/gorm v1.25.0 h1:+KtYtb2roDz14EQe4bla8CbQlmb9dN3VejSai3lprfU=
27+
gorm.io/gorm v1.25.0/go.mod h1:L4uxeKpfBml98NYqVqwAdmV1a2nBtAec/cf3fpucW/k=

task.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,14 @@ const (
1919

2020
// Task 通用的任务结构
2121
type Task struct {
22-
TaskId string // 该任务的唯一标识id
23-
TaskPriority int // 任务优先级
24-
TaskItem interface{}
25-
TaskStartTime time.Time
26-
TaskStatus TaskStatus
22+
TaskId string // 该任务的唯一标识id,创建任务的时候赋予
23+
TaskPriority int // 任务优先级, 创建任务的时候可选
24+
TaskItem interface{} // 任务对象,创建任务的时候赋予
25+
26+
TaskStartTime time.Time // 框架赋予值
27+
TaskEnbTime time.Time // 框架赋予值
28+
TaskStatus TaskStatus // 任务容器负责赋予值
29+
FailedReason string // 任务容器负责赋予值
2730
}
2831

2932
// AsyncTaskStatus 异步任务状态

task_actuator.go

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,12 @@ type TaskActuator interface {
1313
// ignoreErr 是否忽略任务调度的错误,等待恢复,如果 ignoreErr = false, Start 返回 error 任务会失败
1414
Start(ctx context.Context, task *Task) (newTask *Task, ignoreErr bool, err error)
1515

16-
// ExportOutput 导出任务输出,自行处理任务结果
17-
ExportOutput(ctx context.Context, task *Task) error
18-
19-
// GetOutput 获取任务数据,调度框架不会调用该接口,提供给用户自由选择是否实现
16+
// GetOutput 从任务执行器获取任务执行的结果
2017
GetOutput(ctx context.Context, task *Task) (data interface{}, err error)
2118

2219
// Stop 停止任务
2320
Stop(ctx context.Context, task *Task) error
2421

2522
// GetTaskStatus 获取异步执行中的任务的状态
2623
GetAsyncTaskStatus(ctx context.Context, tasks []Task) (status []AsyncTaskStatus, err error)
27-
28-
// Delete 删除任务
29-
Delete(ctx context.Context, task *Task) error
3024
}

task_container.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,8 @@ type TaskContainer interface {
3737

3838
// UpdateRunningTaskStatus 更新执行中的任务执行进度状态
3939
UpdateRunningTaskStatus(ctx context.Context, task *Task, status AsyncTaskStatus) error
40+
41+
// SaveData 提供一个把从任务执行器获取的任务执行的结果进行存储的机会
42+
// data 协议保持和 TaskActuator.GetOutput 一样
43+
SaveData(ctx context.Context, task *Task, data interface{}) (err error)
4044
}

0 commit comments

Comments
 (0)