Skip to content

Commit fc75752

Browse files
Update demo
1 parent c015d6c commit fc75752

File tree

4 files changed

+76
-55
lines changed

4 files changed

+76
-55
lines changed

example/videocut_example/main_demo/main.go

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,36 +23,40 @@ func main() {
2323

2424
// 构建队列容器,队列长度 10000
2525
var container lighttaskscheduler.TaskContainer
26+
var persistencer lighttaskscheduler.TaskdataPersistencer
2627
var scanInterval time.Duration // 调度器扫描间隔时间
28+
2729
if mode == "queue" {
28-
save := func(ctx context.Context, ftask *lighttaskscheduler.Task, data interface{}) error {
29-
log.Printf("save task %s: output_video = %s \n", ftask.TaskId, data.(string))
30-
return nil
31-
} // 处理结果的回调函数
32-
container = memeorycontainer.MakeQueueContainer(10000, 100*time.Millisecond, save)
30+
persistencer = nil
31+
container = memeorycontainer.MakeQueueContainer(10000, 100*time.Millisecond)
3332
scanInterval = 50 * time.Millisecond
3433
} else if mode == "sql" {
3534
var err error
36-
container, err = videocut.MakeVideoCutSqlContainer("127.0.0.1", "3306", "root", "Zgh123456789.", "test")
35+
videocutContainer, err := videocut.MakeVideoCutSqlContainer("127.0.0.1", "3306", "root", "", "test")
3736
if err != nil {
3837
log.Fatalf("build container failed: %v\n", err)
3938
}
39+
persistencer = videocutContainer
40+
container = videocutContainer
4041
scanInterval = 2 * time.Second
4142
}
4243

4344
// 构建裁剪任务执行器
4445
actuator := videocut.MakeVideoCutActuator()
45-
sch := lighttaskscheduler.MakeNewScheduler(
46-
context.Background(),
47-
container, actuator,
46+
sch, err := lighttaskscheduler.MakeScheduler(
47+
container, actuator, persistencer,
4848
lighttaskscheduler.Config{
49-
TaskLimit: 2, // 2 并发
50-
ScanInterval: scanInterval,
51-
TaskTimeout: 20 * time.Second, // 20s 超时时间
52-
EnableFinshedTaskList: true, // 开启已完成任务返回功能
49+
TaskLimit: 2, // 2 并发
50+
TaskTimeout: 20 * time.Second, // 20s 超时时间
51+
EnableFinshedTaskList: true, // 开启已完成任务返回功能
52+
SchedulingPollInterval: scanInterval,
53+
DisableStatePoll: false,
54+
StatePollInterval: scanInterval,
5355
},
5456
)
55-
57+
if err != nil {
58+
log.Fatalf("make schedule failed: %v\n", err)
59+
}
5660
// 添加任务,把视频裁前 100s 剪成 10s 一个的视频
5761
for i := 0; i < 100; i += 10 {
5862
// 这里的任务 ID 是为了调度框架方便标识唯一任务的ID, 和微服务的任务ID不同,是映射关系
Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package main
22

33
import (
4-
"context"
54
"log"
65
"time"
76

@@ -14,28 +13,30 @@ import (
1413
func main() {
1514

1615
// 构建队列容器,队列长度 10000
17-
queueContainer := memeorycontainer.MakeQueueContainer(10000, 100*time.Millisecond, nil)
16+
queueContainer := memeorycontainer.MakeQueueContainer(10000, 100*time.Millisecond)
1817
scanInterval := 50 * time.Millisecond
1918
// 替换自己的数据库
2019
sqlContainer, err := videocut.MakeVideoCutSqlContainer("127.0.0.1", "3306", "root", "", "test")
2120
if err != nil {
2221
log.Fatalf("build container failed: %v\n", err)
2322
}
24-
2523
comb := container.MakeCombinationContainer(queueContainer, sqlContainer)
2624
// 构建裁剪任务执行器
2725
actuator := videocut.MakeVideoCutActuator()
28-
sch := lighttaskscheduler.MakeNewScheduler(
29-
context.Background(),
30-
comb, actuator,
26+
sch, err := lighttaskscheduler.MakeScheduler(
27+
comb, actuator, sqlContainer,
3128
lighttaskscheduler.Config{
32-
TaskLimit: 2, // 2 并发
33-
ScanInterval: scanInterval,
34-
TaskTimeout: 120 * time.Second, // 120s 超时时间
35-
EnableFinshedTaskList: false, // 开启已完成任务返回功能
29+
TaskLimit: 2, // 2 并发
30+
TaskTimeout: 60 * time.Second, // 20s 超时时间
31+
EnableFinshedTaskList: true, // 开启已完成任务返回功能
32+
SchedulingPollInterval: scanInterval,
33+
DisableStatePoll: false,
34+
StatePollInterval: scanInterval, // 开启已完成任务返回功能
3635
},
3736
)
38-
37+
if err != nil {
38+
log.Fatalf("make schedule failed: %v\n", err)
39+
}
3940
go videocut.StartServer() // start video cut microservice
4041
videocut.StartWebServer(sqlContainer, sch) // start web server
4142
}

example/videocut_example/video_cut/example_sql_container.go

Lines changed: 45 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -164,9 +164,10 @@ func (e *videoCutSqlContainer) ToRunningStatus(ctx context.Context, ftask *frame
164164
t := time.Now()
165165
sql := db.Model(&VideoCutTask{}).Where("task_id = ? and status = ?", ftask.TaskId, ftask.TaskStatus).
166166
Updates(map[string]interface{}{
167-
"status": framework.TASK_STATUS_RUNNING,
168-
"start_time": t,
169-
"work_task_id": task.WorkTaskId,
167+
"status": framework.TASK_STATUS_RUNNING,
168+
"start_time": t,
169+
"work_task_id": task.WorkTaskId,
170+
"attempts_time": ftask.TaskAttemptsTime,
170171
})
171172
if sql.Error != nil {
172173
return ftask, fmt.Errorf("db update error: %v", sql.Error)
@@ -194,7 +195,6 @@ func (e *videoCutSqlContainer) ToStopStatus(ctx context.Context, ftask *framewor
194195
return ftask, fmt.Errorf("TaskItem not be set to VideoCutTask")
195196
}
196197
db := e.db
197-
db = db.Debug()
198198
sql := db.Model(&VideoCutTask{}).Where("task_id = ? and status = ?", task.TaskId, task.Status).
199199
Update("status", framework.TASK_STATUS_STOPED)
200200
if sql.Error != nil {
@@ -334,33 +334,7 @@ func (e *videoCutSqlContainer) UpdateRunningTaskStatus(ctx context.Context,
334334
// data 协议保持和 TaskActuator.GetOutput 一样, 一个 string 表示结果路径
335335
func (e *videoCutSqlContainer) SaveData(ctx context.Context, ftask *framework.Task,
336336
data interface{}) (err error) {
337-
defer func() {
338-
if err != nil {
339-
log.Println("SaveData:", err)
340-
}
341-
}()
342337

343-
task, ok := ftask.TaskItem.(VideoCutTask)
344-
if !ok {
345-
return fmt.Errorf("TaskItem not be set to VideoCutTask")
346-
}
347-
db := e.db
348-
outputVideo, ok := data.(string)
349-
if !ok {
350-
return fmt.Errorf("data not be set to string")
351-
}
352-
sql := db.Model(&VideoCutTask{}).Where("task_id = ?", ftask.TaskId).
353-
Updates(map[string]interface{}{
354-
"output_video": outputVideo,
355-
})
356-
if sql.Error != nil {
357-
return fmt.Errorf("db update error: %v", sql.Error)
358-
}
359-
if sql.RowsAffected == 0 {
360-
return fmt.Errorf("task %s not found, may status has been changed", task.TaskId)
361-
}
362-
task.OutputVideo = outputVideo
363-
ftask.TaskItem = task
364338
return nil
365339
}
366340

@@ -409,3 +383,44 @@ func (e *videoCutSqlContainer) GetTask(ctx context.Context, taskId string) (
409383
}
410384
return &tempTask, nil
411385
}
386+
387+
func (e videoCutSqlContainer) DataPersistence(
388+
ctx context.Context, ftask *framework.Task, data interface{}) (err error) {
389+
defer func() {
390+
if err != nil {
391+
log.Println("SaveData:", err)
392+
}
393+
}()
394+
395+
task, ok := ftask.TaskItem.(VideoCutTask)
396+
if !ok {
397+
return fmt.Errorf("TaskItem not be set to VideoCutTask")
398+
}
399+
db := e.db
400+
outputVideo, ok := data.(string)
401+
if !ok {
402+
return fmt.Errorf("data not be set to string")
403+
}
404+
sql := db.Model(&VideoCutTask{}).Where("task_id = ?", ftask.TaskId).
405+
Updates(map[string]interface{}{
406+
"output_video": outputVideo,
407+
})
408+
if sql.Error != nil {
409+
return fmt.Errorf("db update error: %v", sql.Error)
410+
}
411+
if sql.RowsAffected == 0 {
412+
return fmt.Errorf("task %s not found, may status has been changed", task.TaskId)
413+
}
414+
return nil
415+
}
416+
417+
// GetPersistenceData 查询任务持久化结果
418+
func (e videoCutSqlContainer) GetPersistenceData(ctx context.Context, task *framework.Task) (
419+
data interface{}, err error) {
420+
return nil, nil
421+
}
422+
423+
// DeletePersistenceData 删除任务的此久化结果
424+
func (e videoCutSqlContainer) DeletePersistenceData(ctx context.Context, task *framework.Task) (err error) {
425+
return nil
426+
}

example/videocut_example/video_cut/video_cut_task.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ type VideoCutTask struct {
2222
DeletedAt gorm.DeletedAt `gorm:"default:NULL;column:delete_time"`
2323
StartAt *time.Time `gorm:"default:NULL;column:start_time"` // 任务开始时间
2424
EndAt *time.Time `gorm:"default:NULL;column:end_time"` // 任务结束时间
25+
AttemptsTime int `gorm:"default:0"` // 重试次数
2526
}
2627

2728
// TableName 更改数据库表名

0 commit comments

Comments
 (0)