Skip to content

Commit c015d6c

Browse files
Update add example
1 parent c59894b commit c015d6c

File tree

7 files changed

+332
-173
lines changed

7 files changed

+332
-173
lines changed

actuator/function_actuator.go

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
// 封装好的函数执行器
2+
3+
package actuator
4+
5+
import (
6+
"context"
7+
"errors"
8+
"fmt"
9+
"sync"
10+
"time"
11+
12+
framework "github.com/memory-overflow/light-task-scheduler"
13+
lighttaskscheduler "github.com/memory-overflow/light-task-scheduler"
14+
)
15+
16+
// fucntionActuator 函数执行器,同步任务异步化的示例
17+
type fucntionActuator struct {
18+
runFunc RunFunction // 执行函数
19+
callbackChannel chan framework.Task // 回调队列
20+
21+
runningTask sync.Map // taskId -> [framework.AsyncTaskStatus, cancel function] 映射
22+
datatMap sync.Map // taskId -> interface{} 映射
23+
24+
}
25+
26+
// RunFunction 执行函数的定义,实现该函数的时候,建议使用传入 ctx 进行超时和退出处理
27+
// data 任务执行完成返回的数据
28+
type RunFunction func(ctx context.Context, task *framework.Task) (data interface{}, err error)
29+
30+
// runFunc 待调度的执行函数,注意实现该函数的时候,需要使用传入 ctx 进行超时和退出处理,框架否则无法控制超时时间
31+
// callbackChannel 用来执行器进行任务回调,返回已经完成的任务,如果不需要回调,传入 nil 即可
32+
func MakeFucntionActuator(runFunc RunFunction, callbackChannel chan framework.Task) (*fucntionActuator, error) {
33+
if runFunc == nil {
34+
return nil, fmt.Errorf("runFunc is nil")
35+
}
36+
return &fucntionActuator{
37+
runFunc: runFunc,
38+
callbackChannel: callbackChannel,
39+
}, nil
40+
}
41+
42+
// Init 任务在被调度前的初始化工作
43+
func (fc *fucntionActuator) Init(ctx context.Context, task *framework.Task) (
44+
newTask *framework.Task, err error) {
45+
// 无初始化工作
46+
return task, nil
47+
}
48+
49+
// Start 执行任务
50+
func (fc *fucntionActuator) Start(ctx context.Context, ftask *framework.Task) (
51+
newTask *framework.Task, ignoreErr bool, err error) {
52+
if st, ok := fc.runningTask.Load(ftask.TaskId); ok {
53+
status := st.([]interface{})[0].(framework.AsyncTaskStatus).TaskStatus
54+
if status == lighttaskscheduler.TASK_STATUS_RUNNING || status == lighttaskscheduler.TASK_STATUS_SUCCESS {
55+
// 任务已经在执行中,不能重复执行
56+
return ftask, false, fmt.Errorf("task %s is running", ftask.TaskId)
57+
}
58+
}
59+
runCtx, cancel := context.WithCancel(ctx)
60+
61+
ftask.TaskStatus = framework.TASK_STATUS_RUNNING
62+
ftask.TaskStartTime = time.Now()
63+
fc.datatMap.Delete(ftask.TaskId)
64+
fc.runningTask.Store(ftask.TaskId,
65+
[]interface{}{
66+
framework.AsyncTaskStatus{
67+
TaskStatus: framework.TASK_STATUS_RUNNING,
68+
Progress: 0.0,
69+
}, cancel})
70+
71+
go func() {
72+
data, err := fc.runFunc(runCtx, ftask)
73+
st, ok := fc.runningTask.Load(ftask.TaskId)
74+
if !ok {
75+
// 任务可能因为超时被删除,或者手动暂停、不处理
76+
return
77+
}
78+
(st.([]interface{})[1].(context.CancelFunc))() // 任务执行完,也要执行对应的 cancel 函数
79+
var newStatus framework.AsyncTaskStatus
80+
if err != nil {
81+
newStatus = framework.AsyncTaskStatus{
82+
TaskStatus: framework.TASK_STATUS_FAILED,
83+
Progress: 0.0,
84+
FailedReason: err,
85+
}
86+
} else {
87+
newStatus = framework.AsyncTaskStatus{
88+
TaskStatus: framework.TASK_STATUS_SUCCESS,
89+
Progress: 100.0,
90+
}
91+
fc.datatMap.Store(ftask.TaskId, data) // 先存结果
92+
}
93+
fc.runningTask.Store(ftask.TaskId, []interface{}{newStatus, nil})
94+
if fc.callbackChannel != nil {
95+
// 如果需要回调
96+
callbackTask := *ftask
97+
callbackTask.TaskStatus = newStatus.TaskStatus
98+
callbackTask.TaskEnbTime = time.Now()
99+
if newStatus.FailedReason != nil {
100+
callbackTask.FailedReason = newStatus.FailedReason.Error()
101+
}
102+
fc.callbackChannel <- callbackTask
103+
}
104+
}()
105+
106+
return ftask, false, nil
107+
}
108+
109+
func (fc *fucntionActuator) clear(taskId string) {
110+
fc.runningTask.Delete(taskId)
111+
fc.datatMap.Delete(taskId)
112+
}
113+
114+
// Stop 停止任务
115+
func (fc *fucntionActuator) Stop(ctx context.Context, ftask *framework.Task) error {
116+
st, ok := fc.runningTask.Load(ftask.TaskId)
117+
if !ok {
118+
// 未找到任务
119+
fc.datatMap.Delete(ftask.TaskId)
120+
return nil
121+
}
122+
fc.clear(ftask.TaskId)
123+
(st.([]interface{})[1].(context.CancelFunc))() // cancel 函数 ctx
124+
return nil
125+
}
126+
127+
// GetAsyncTaskStatus 批量获取任务状态
128+
func (fc *fucntionActuator) GetAsyncTaskStatus(ctx context.Context, ftasks []framework.Task) (
129+
status []framework.AsyncTaskStatus, err error) {
130+
for _, ftask := range ftasks {
131+
fstatus, ok := fc.runningTask.Load(ftask.TaskId)
132+
if !ok {
133+
status = append(status, framework.AsyncTaskStatus{
134+
TaskStatus: framework.TASK_STATUS_FAILED,
135+
FailedReason: errors.New("同步任务未找到"),
136+
Progress: float32(0.0),
137+
})
138+
} else {
139+
st := fstatus.([]interface{})[0].(framework.AsyncTaskStatus)
140+
if st.TaskStatus != framework.TASK_STATUS_RUNNING {
141+
fc.runningTask.Delete(ftask.TaskId) // delete task status after query if task finished
142+
}
143+
status = append(status, st)
144+
}
145+
}
146+
return status, nil
147+
}
148+
149+
// GetOutput ...
150+
func (fc *fucntionActuator) GetOutput(ctx context.Context, ftask *framework.Task) (
151+
data interface{}, err error) {
152+
res, ok := fc.datatMap.Load(ftask.TaskId)
153+
if !ok {
154+
return nil, fmt.Errorf("not found result for task %s", ftask.TaskId)
155+
}
156+
fc.clear(ftask.TaskId)
157+
return res, nil
158+
}

container/combination_container.go

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -160,15 +160,3 @@ func (c *combinationContainer) UpdateRunningTaskStatus(ctx context.Context,
160160
}
161161
return nil
162162
}
163-
164-
// SaveData 导出任务输出,自行处理任务结果
165-
func (c *combinationContainer) SaveData(ctx context.Context, ftask *lighttaskscheduler.Task,
166-
data interface{}) error {
167-
if err := c.memeoryContainer.SaveData(ctx, ftask, data); err != nil {
168-
return err
169-
}
170-
if err := c.persistContainer.SaveData(ctx, ftask, data); err != nil {
171-
return err
172-
}
173-
return nil
174-
}

container/memory_container/queue_container.go

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,6 @@ import (
1010
lighttaskscheduler "github.com/memory-overflow/light-task-scheduler"
1111
)
1212

13-
type SaveDataFunc func(context.Context, *lighttaskscheduler.Task, interface{}) error
14-
1513
// queueContainer 队列型容器,任务无状态,无优先级,先进先出,任务数据,多进程数据无法共享数据
1614
type queueContainer struct {
1715
MemeoryContainer
@@ -22,16 +20,13 @@ type queueContainer struct {
2220

2321
waitingTasks chan lighttaskscheduler.Task
2422
timeout time.Duration
25-
savefunc SaveDataFunc
2623
}
2724

2825
// MakeQueueContainer 构造队列型任务容器, size 表示队列的大小, timeout 表示队列读取的超时时间
29-
// 需要提供一个 savefunc 处理任务结束以后,任务数据如何处理,如何存储,如果不传,默认不处理任务结果
30-
func MakeQueueContainer(size uint32, timeout time.Duration, savefunc SaveDataFunc) *queueContainer {
26+
func MakeQueueContainer(size uint32, timeout time.Duration) *queueContainer {
3127
return &queueContainer{
3228
waitingTasks: make(chan lighttaskscheduler.Task, size),
3329
timeout: timeout,
34-
savefunc: savefunc,
3530
}
3631
}
3732

@@ -99,8 +94,13 @@ func (q *queueContainer) ToRunningStatus(ctx context.Context, task *lighttasksch
9994
newTask *lighttaskscheduler.Task, err error) {
10095
task.TaskStartTime = time.Now()
10196
task.TaskStatus = lighttaskscheduler.TASK_STATUS_RUNNING
102-
if _, ok := q.runningTaskMap.LoadOrStore(task.TaskId, *task); !ok {
97+
t, ok := q.runningTaskMap.LoadOrStore(task.TaskId, *task)
98+
if !ok {
10399
atomic.AddInt32(&q.runningTaskCount, 1)
100+
} else {
101+
nt := t.(lighttaskscheduler.Task)
102+
nt.TaskAttemptsTime = task.TaskAttemptsTime
103+
q.runningTaskMap.Store(task.TaskId, nt)
104104
}
105105
return task, nil
106106
}
@@ -158,6 +158,9 @@ func (q *queueContainer) ToExportStatus(ctx context.Context, task *lighttasksche
158158
// ToSuccessStatus 转移到执行成功状态
159159
func (q *queueContainer) ToSuccessStatus(ctx context.Context, task *lighttaskscheduler.Task) (
160160
newTask *lighttaskscheduler.Task, err error) {
161+
if _, ok := q.runningTaskMap.LoadAndDelete(task.TaskId); ok {
162+
atomic.AddInt32(&q.runningTaskCount, -1)
163+
}
161164
task.TaskStatus = lighttaskscheduler.TASK_STATUS_SUCCESS
162165
return task, nil
163166
}
@@ -167,12 +170,3 @@ func (q *queueContainer) UpdateRunningTaskStatus(ctx context.Context,
167170
task *lighttaskscheduler.Task, status lighttaskscheduler.AsyncTaskStatus) error {
168171
return nil
169172
}
170-
171-
// SaveData 保存任务结果
172-
func (q *queueContainer) SaveData(ctx context.Context, ftask *lighttaskscheduler.Task,
173-
data interface{}) error {
174-
if q.savefunc != nil {
175-
return q.savefunc(ctx, ftask, data)
176-
}
177-
return nil
178-
}

example/add_example/add/add_actuator.go

Lines changed: 0 additions & 103 deletions
This file was deleted.

example/add_example/add/add_task.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package addexample
1+
package add
22

33
import "time"
44

0 commit comments

Comments
 (0)