Skip to content

Commit d90f8e7

Browse files
Update code
1 parent fc75752 commit d90f8e7

File tree

5 files changed

+104
-173
lines changed

5 files changed

+104
-173
lines changed

actuator/function_actuator.go

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,17 @@ import (
66
"context"
77
"errors"
88
"fmt"
9+
"runtime/debug"
910
"sync"
1011
"time"
1112

1213
framework "github.com/memory-overflow/light-task-scheduler"
13-
lighttaskscheduler "github.com/memory-overflow/light-task-scheduler"
1414
)
1515

1616
// fucntionActuator 函数执行器,同步任务异步化的示例
1717
type fucntionActuator struct {
1818
runFunc RunFunction // 执行函数
19+
initFunc InitFunction // 初始函数
1920
callbackChannel chan framework.Task // 回调队列
2021

2122
runningTask sync.Map // taskId -> [framework.AsyncTaskStatus, cancel function] 映射
@@ -27,22 +28,32 @@ type fucntionActuator struct {
2728
// data 任务执行完成返回的数据
2829
type RunFunction func(ctx context.Context, task *framework.Task) (data interface{}, err error)
2930

31+
// InitFunction 任务执行前的初始工作
32+
type InitFunction func(ctx context.Context, task *framework.Task) (newTask *framework.Task, err error)
33+
3034
// runFunc 待调度的执行函数,注意实现该函数的时候,需要使用传入 ctx 进行超时和退出处理,框架否则无法控制超时时间
3135
// callbackChannel 用来执行器进行任务回调,返回已经完成的任务,如果不需要回调,传入 nil 即可
32-
func MakeFucntionActuator(runFunc RunFunction, callbackChannel chan framework.Task) (*fucntionActuator, error) {
36+
func MakeFucntionActuator(runFunc RunFunction, initFunc InitFunction) (*fucntionActuator, error) {
3337
if runFunc == nil {
3438
return nil, fmt.Errorf("runFunc is nil")
3539
}
3640
return &fucntionActuator{
37-
runFunc: runFunc,
38-
callbackChannel: callbackChannel,
41+
runFunc: runFunc,
42+
initFunc: initFunc,
3943
}, nil
4044
}
4145

46+
// SetCallbackChannel 任务配置回调 channel
47+
func (fc *fucntionActuator) SetCallbackChannel(callbackChannel chan framework.Task) {
48+
fc.callbackChannel = callbackChannel
49+
}
50+
4251
// Init 任务在被调度前的初始化工作
4352
func (fc *fucntionActuator) Init(ctx context.Context, task *framework.Task) (
4453
newTask *framework.Task, err error) {
45-
// 无初始化工作
54+
if fc.initFunc != nil {
55+
return fc.initFunc(ctx, task)
56+
}
4657
return task, nil
4758
}
4859

@@ -51,7 +62,7 @@ func (fc *fucntionActuator) Start(ctx context.Context, ftask *framework.Task) (
5162
newTask *framework.Task, ignoreErr bool, err error) {
5263
if st, ok := fc.runningTask.Load(ftask.TaskId); ok {
5364
status := st.([]interface{})[0].(framework.AsyncTaskStatus).TaskStatus
54-
if status == lighttaskscheduler.TASK_STATUS_RUNNING || status == lighttaskscheduler.TASK_STATUS_SUCCESS {
65+
if status == framework.TASK_STATUS_RUNNING || status == framework.TASK_STATUS_SUCCESS {
5566
// 任务已经在执行中,不能重复执行
5667
return ftask, false, fmt.Errorf("task %s is running", ftask.TaskId)
5768
}
@@ -69,7 +80,16 @@ func (fc *fucntionActuator) Start(ctx context.Context, ftask *framework.Task) (
6980
}, cancel})
7081

7182
go func() {
72-
data, err := fc.runFunc(runCtx, ftask)
83+
data, err := func() (data interface{}, err error) {
84+
defer func() {
85+
if p := recover(); p != nil {
86+
data = nil
87+
88+
err = fmt.Errorf("panic: %v, stacktrace: %s", p, debug.Stack())
89+
}
90+
}()
91+
return fc.runFunc(runCtx, ftask)
92+
}()
7393
st, ok := fc.runningTask.Load(ftask.TaskId)
7494
if !ok {
7595
// 任务可能因为超时被删除,或者手动暂停、不处理

example/add_example/main.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,16 @@ import (
1111
lighttaskscheduler "github.com/memory-overflow/light-task-scheduler"
1212
"github.com/memory-overflow/light-task-scheduler/actuator"
1313
memeorycontainer "github.com/memory-overflow/light-task-scheduler/container/memory_container"
14-
addexample "github.com/memory-overflow/light-task-scheduler/example/add_example/add"
1514
)
1615

16+
// AddTask add 任务结构
17+
type AddTask struct {
18+
StartTime time.Time
19+
A, B int32
20+
}
21+
1722
func add(ctx context.Context, ftask *lighttaskscheduler.Task) (data interface{}, err error) {
18-
task, ok := ftask.TaskItem.(addexample.AddTask)
23+
task, ok := ftask.TaskItem.(AddTask)
1924
if !ok {
2025
return nil, fmt.Errorf("TaskItem not be set to AddTask")
2126
}
@@ -69,7 +74,8 @@ func (rec addCallbackReceiver) GetCallbackChannel(ctx context.Context) chan ligh
6974
func modeCallback() *lighttaskscheduler.TaskScheduler {
7075
container := memeorycontainer.MakeQueueContainer(10000, 100*time.Millisecond)
7176
taskChannel := make(chan lighttaskscheduler.Task, 10000)
72-
actuator, err := actuator.MakeFucntionActuator(add, taskChannel)
77+
actuator, err := actuator.MakeFucntionActuator(add, nil)
78+
actuator.SetCallbackChannel(taskChannel)
7379
if err != nil {
7480
log.Fatal("make fucntionActuato error: ", err)
7581
}
@@ -99,7 +105,8 @@ func modeCallback() *lighttaskscheduler.TaskScheduler {
99105
func modePollingCallback() *lighttaskscheduler.TaskScheduler {
100106
container := memeorycontainer.MakeQueueContainer(10000, 100*time.Millisecond)
101107
taskChannel := make(chan lighttaskscheduler.Task, 10000)
102-
actuator, err := actuator.MakeFucntionActuator(add, taskChannel)
108+
actuator, err := actuator.MakeFucntionActuator(add, nil)
109+
actuator.SetCallbackChannel(taskChannel)
103110
if err != nil {
104111
log.Fatal("make fucntionActuato error: ", err)
105112
}
@@ -141,7 +148,7 @@ func main() {
141148
sch.AddTask(context.Background(),
142149
lighttaskscheduler.Task{
143150
TaskId: strconv.Itoa(i),
144-
TaskItem: addexample.AddTask{
151+
TaskItem: AddTask{
145152
A: r.Int31() % 1000,
146153
B: r.Int31() % 1000,
147154
},

0 commit comments

Comments
 (0)