Skip to content

Commit adc160d

Browse files
Merge pull request #8 from memory-overflow/develop
Develop
2 parents 598ea66 + 550547d commit adc160d

File tree

23 files changed

+2081
-557
lines changed

23 files changed

+2081
-557
lines changed

actuator/function_actuator.go

Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
// 封装好的函数执行器
2+
3+
package actuator
4+
5+
import (
6+
"context"
7+
"errors"
8+
"fmt"
9+
"runtime/debug"
10+
"sync"
11+
"time"
12+
13+
framework "github.com/memory-overflow/light-task-scheduler"
14+
)
15+
16+
// fucntionActuator 函数执行器,同步任务异步化的示例
17+
type fucntionActuator struct {
18+
runFunc RunFunction // 执行函数
19+
initFunc InitFunction // 初始函数
20+
callbackChannel chan framework.Task // 回调队列
21+
22+
runningTask sync.Map // taskId -> [framework.AsyncTaskStatus, cancel function] 映射
23+
datatMap sync.Map // taskId -> interface{} 映射
24+
25+
}
26+
27+
// RunFunction 执行函数的定义,实现该函数的时候,建议使用传入 ctx 进行超时和退出处理
28+
// data 任务执行完成返回的数据
29+
type RunFunction func(ctx context.Context, task *framework.Task) (data interface{}, err error)
30+
31+
// InitFunction 任务执行前的初始工作
32+
type InitFunction func(ctx context.Context, task *framework.Task) (newTask *framework.Task, err error)
33+
34+
// runFunc 待调度的执行函数,注意实现该函数的时候,需要使用传入 ctx 进行超时和退出处理,框架否则无法控制超时时间
35+
// callbackChannel 用来执行器进行任务回调,返回已经完成的任务,如果不需要回调,传入 nil 即可
36+
func MakeFucntionActuator(runFunc RunFunction, initFunc InitFunction) (*fucntionActuator, error) {
37+
if runFunc == nil {
38+
return nil, fmt.Errorf("runFunc is nil")
39+
}
40+
return &fucntionActuator{
41+
runFunc: runFunc,
42+
initFunc: initFunc,
43+
}, nil
44+
}
45+
46+
// SetCallbackChannel 任务配置回调 channel
47+
func (fc *fucntionActuator) SetCallbackChannel(callbackChannel chan framework.Task) {
48+
fc.callbackChannel = callbackChannel
49+
}
50+
51+
// Init 任务在被调度前的初始化工作
52+
func (fc *fucntionActuator) Init(ctx context.Context, task *framework.Task) (
53+
newTask *framework.Task, err error) {
54+
if fc.initFunc != nil {
55+
return fc.initFunc(ctx, task)
56+
}
57+
return task, nil
58+
}
59+
60+
// Start 执行任务
61+
func (fc *fucntionActuator) Start(ctx context.Context, ftask *framework.Task) (
62+
newTask *framework.Task, ignoreErr bool, err error) {
63+
if st, ok := fc.runningTask.Load(ftask.TaskId); ok {
64+
status := st.([]interface{})[0].(framework.AsyncTaskStatus).TaskStatus
65+
if status == framework.TASK_STATUS_RUNNING || status == framework.TASK_STATUS_SUCCESS {
66+
// 任务已经在执行中,不能重复执行
67+
return ftask, false, fmt.Errorf("task %s is running", ftask.TaskId)
68+
}
69+
}
70+
runCtx, cancel := context.WithCancel(ctx)
71+
72+
ftask.TaskStatus = framework.TASK_STATUS_RUNNING
73+
ftask.TaskStartTime = time.Now()
74+
fc.datatMap.Delete(ftask.TaskId)
75+
fc.runningTask.Store(ftask.TaskId,
76+
[]interface{}{
77+
framework.AsyncTaskStatus{
78+
TaskStatus: framework.TASK_STATUS_RUNNING,
79+
Progress: 0.0,
80+
}, cancel})
81+
82+
go func() {
83+
data, err := func() (data interface{}, err error) {
84+
defer func() {
85+
if p := recover(); p != nil {
86+
data = nil
87+
88+
err = fmt.Errorf("panic: %v, stacktrace: %s", p, debug.Stack())
89+
}
90+
}()
91+
return fc.runFunc(runCtx, ftask)
92+
}()
93+
st, ok := fc.runningTask.Load(ftask.TaskId)
94+
if !ok {
95+
// 任务可能因为超时被删除,或者手动暂停、不处理
96+
return
97+
}
98+
(st.([]interface{})[1].(context.CancelFunc))() // 任务执行完,也要执行对应的 cancel 函数
99+
var newStatus framework.AsyncTaskStatus
100+
if err != nil {
101+
newStatus = framework.AsyncTaskStatus{
102+
TaskStatus: framework.TASK_STATUS_FAILED,
103+
Progress: 0.0,
104+
FailedReason: err,
105+
}
106+
} else {
107+
newStatus = framework.AsyncTaskStatus{
108+
TaskStatus: framework.TASK_STATUS_SUCCESS,
109+
Progress: 100.0,
110+
}
111+
fc.datatMap.Store(ftask.TaskId, data) // 先存结果
112+
}
113+
fc.runningTask.Store(ftask.TaskId, []interface{}{newStatus, nil})
114+
if fc.callbackChannel != nil {
115+
// 如果需要回调
116+
callbackTask := *ftask
117+
callbackTask.TaskStatus = newStatus.TaskStatus
118+
callbackTask.TaskEnbTime = time.Now()
119+
if newStatus.FailedReason != nil {
120+
callbackTask.FailedReason = newStatus.FailedReason.Error()
121+
}
122+
fc.callbackChannel <- callbackTask
123+
}
124+
}()
125+
126+
return ftask, false, nil
127+
}
128+
129+
func (fc *fucntionActuator) clear(taskId string) {
130+
fc.runningTask.Delete(taskId)
131+
fc.datatMap.Delete(taskId)
132+
}
133+
134+
// Stop 停止任务
135+
func (fc *fucntionActuator) Stop(ctx context.Context, ftask *framework.Task) error {
136+
st, ok := fc.runningTask.Load(ftask.TaskId)
137+
if !ok {
138+
// 未找到任务
139+
fc.datatMap.Delete(ftask.TaskId)
140+
return nil
141+
}
142+
fc.clear(ftask.TaskId)
143+
(st.([]interface{})[1].(context.CancelFunc))() // cancel 函数 ctx
144+
return nil
145+
}
146+
147+
// GetAsyncTaskStatus 批量获取任务状态
148+
func (fc *fucntionActuator) GetAsyncTaskStatus(ctx context.Context, ftasks []framework.Task) (
149+
status []framework.AsyncTaskStatus, err error) {
150+
for _, ftask := range ftasks {
151+
fstatus, ok := fc.runningTask.Load(ftask.TaskId)
152+
if !ok {
153+
status = append(status, framework.AsyncTaskStatus{
154+
TaskStatus: framework.TASK_STATUS_FAILED,
155+
FailedReason: errors.New("同步任务未找到"),
156+
Progress: float32(0.0),
157+
})
158+
} else {
159+
st := fstatus.([]interface{})[0].(framework.AsyncTaskStatus)
160+
if st.TaskStatus != framework.TASK_STATUS_RUNNING {
161+
fc.runningTask.Delete(ftask.TaskId) // delete task status after query if task finished
162+
}
163+
status = append(status, st)
164+
}
165+
}
166+
return status, nil
167+
}
168+
169+
// GetOutput ...
170+
func (fc *fucntionActuator) GetOutput(ctx context.Context, ftask *framework.Task) (
171+
data interface{}, err error) {
172+
res, ok := fc.datatMap.Load(ftask.TaskId)
173+
if !ok {
174+
return nil, fmt.Errorf("not found result for task %s", ftask.TaskId)
175+
}
176+
fc.clear(ftask.TaskId)
177+
return res, nil
178+
}

callback_receiver.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package lighttaskscheduler
2+
3+
import "context"
4+
5+
// CallbackReceiver 任务状态回调接收器
6+
type CallbackReceiver interface {
7+
GetCallbackChannel(ctx context.Context) (taskChannel chan Task)
8+
}

container/memory_container/queue_container.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package memeorycontainer
33
import (
44
"context"
55
"fmt"
6-
"log"
76
"sync"
87
"sync/atomic"
98
"time"
@@ -95,8 +94,13 @@ func (q *queueContainer) ToRunningStatus(ctx context.Context, task *lighttasksch
9594
newTask *lighttaskscheduler.Task, err error) {
9695
task.TaskStartTime = time.Now()
9796
task.TaskStatus = lighttaskscheduler.TASK_STATUS_RUNNING
98-
if _, ok := q.runningTaskMap.LoadOrStore(task.TaskId, *task); !ok {
97+
t, ok := q.runningTaskMap.LoadOrStore(task.TaskId, *task)
98+
if !ok {
9999
atomic.AddInt32(&q.runningTaskCount, 1)
100+
} else {
101+
nt := t.(lighttaskscheduler.Task)
102+
nt.TaskAttemptsTime = task.TaskAttemptsTime
103+
q.runningTaskMap.Store(task.TaskId, nt)
100104
}
101105
return task, nil
102106
}
@@ -135,8 +139,8 @@ func (q *queueContainer) ToFailedStatus(ctx context.Context, task *lighttasksche
135139
if _, ok := q.runningTaskMap.LoadAndDelete(task.TaskId); ok {
136140
atomic.AddInt32(&q.runningTaskCount, -1)
137141
}
138-
log.Printf("failed task %s, reason %v", task.TaskId, reason)
139142
task.TaskStatus = lighttaskscheduler.TASK_STATUS_FAILED
143+
task.FailedReason = reason.Error()
140144
return task, nil
141145
}
142146

@@ -154,6 +158,9 @@ func (q *queueContainer) ToExportStatus(ctx context.Context, task *lighttasksche
154158
// ToSuccessStatus 转移到执行成功状态
155159
func (q *queueContainer) ToSuccessStatus(ctx context.Context, task *lighttaskscheduler.Task) (
156160
newTask *lighttaskscheduler.Task, err error) {
161+
if _, ok := q.runningTaskMap.LoadAndDelete(task.TaskId); ok {
162+
atomic.AddInt32(&q.runningTaskCount, -1)
163+
}
157164
task.TaskStatus = lighttaskscheduler.TASK_STATUS_SUCCESS
158165
return task, nil
159166
}

container/persist_container/example_sql_container.go

Lines changed: 0 additions & 92 deletions
This file was deleted.

0 commit comments

Comments
 (0)