Skip to content

Commit 157aff3

Browse files
Add web demo
1 parent a3a6048 commit 157aff3

File tree

11 files changed

+476
-390
lines changed

11 files changed

+476
-390
lines changed

container/combination_container.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,3 +160,15 @@ func (c *combinationContainer) UpdateRunningTaskStatus(ctx context.Context,
160160
}
161161
return nil
162162
}
163+
164+
// SaveData 导出任务输出,自行处理任务结果
165+
func (c *combinationContainer) SaveData(ctx context.Context, ftask *lighttaskscheduler.Task,
166+
data interface{}) error {
167+
if err := c.memeoryContainer.SaveData(ctx, ftask, data); err != nil {
168+
return err
169+
}
170+
if err := c.persistContainer.SaveData(ctx, ftask, data); err != nil {
171+
return err
172+
}
173+
return nil
174+
}

container/memory_container/queue_container.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ func (q *queueContainer) UpdateRunningTaskStatus(ctx context.Context,
168168
return nil
169169
}
170170

171-
// ExportOutput 导出任务输出,自行处理任务结果
171+
// SaveData 保存任务结果
172172
func (q *queueContainer) SaveData(ctx context.Context, ftask *lighttaskscheduler.Task,
173173
data interface{}) error {
174174
if q.savefunc != nil {
File renamed without changes.
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"log"
6+
"time"
7+
8+
lighttaskscheduler "github.com/memory-overflow/light-task-scheduler"
9+
"github.com/memory-overflow/light-task-scheduler/container"
10+
memeorycontainer "github.com/memory-overflow/light-task-scheduler/container/memory_container"
11+
videocut "github.com/memory-overflow/light-task-scheduler/example/videocut_example/video_cut"
12+
)
13+
14+
func main() {
15+
16+
// 构建队列容器,队列长度 10000
17+
queueContainer := memeorycontainer.MakeQueueContainer(10000, 100*time.Millisecond, nil)
18+
scanInterval := 50 * time.Millisecond
19+
// 替换自己的数据库
20+
sqlContainer, err := videocut.MakeVideoCutSqlContainer("127.0.0.1", "3306", "root", "", "test")
21+
if err != nil {
22+
log.Fatalf("build container failed: %v\n", err)
23+
}
24+
25+
comb := container.MakeCombinationContainer(queueContainer, sqlContainer)
26+
// 构建裁剪任务执行器
27+
actuator := videocut.MakeVideoCutActuator()
28+
sch := lighttaskscheduler.MakeNewScheduler(
29+
context.Background(),
30+
comb, actuator,
31+
lighttaskscheduler.Config{
32+
TaskLimit: 2, // 2 并发
33+
ScanInterval: scanInterval,
34+
TaskTimeout: 120 * time.Second, // 120s 超时时间
35+
EnableFinshedTaskList: false, // 开启已完成任务返回功能
36+
},
37+
)
38+
39+
go videocut.StartServer() // start video cut microservice
40+
videocut.StartWebServer(sqlContainer, sch) // start web server
41+
}

example/videocut_example/video_cut/example_sql_container.go

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ func (e *videoCutSqlContainer) AddTask(ctx context.Context, ftask framework.Task
6868

6969
task, ok := ftask.TaskItem.(VideoCutTask)
7070
if !ok {
71-
return fmt.Errorf("TaskItem not be set to AddTask")
71+
return fmt.Errorf("TaskItem not be set to VideoCutTask")
7272
}
7373
db := e.db
7474
t := time.Now()
@@ -158,7 +158,7 @@ func (e *videoCutSqlContainer) ToRunningStatus(ctx context.Context, ftask *frame
158158
}()
159159
task, ok := ftask.TaskItem.(VideoCutTask)
160160
if !ok {
161-
return ftask, fmt.Errorf("TaskItem not be set to AddTask")
161+
return ftask, fmt.Errorf("TaskItem not be set to VideoCutTask")
162162
}
163163
db := e.db
164164
t := time.Now()
@@ -191,14 +191,16 @@ func (e *videoCutSqlContainer) ToStopStatus(ctx context.Context, ftask *framewor
191191

192192
task, ok := ftask.TaskItem.(VideoCutTask)
193193
if !ok {
194-
return ftask, fmt.Errorf("TaskItem not be set to AddTask")
194+
return ftask, fmt.Errorf("TaskItem not be set to VideoCutTask")
195195
}
196196
db := e.db
197-
sql := db.Model(&VideoCutTask{}).Where("task_id = ? and status = ?", ftask.TaskId, ftask.TaskStatus).
197+
db = db.Debug()
198+
sql := db.Model(&VideoCutTask{}).Where("task_id = ? and status = ?", task.TaskId, task.Status).
198199
Update("status", framework.TASK_STATUS_STOPED)
199200
if sql.Error != nil {
200201
return ftask, fmt.Errorf("db update error: %v", sql.Error)
201202
}
203+
fmt.Println(sql.RowsAffected)
202204
if sql.RowsAffected == 0 {
203205
return ftask, fmt.Errorf("task %s not found, may status has been changed", task.TaskId)
204206
}
@@ -217,10 +219,10 @@ func (e *videoCutSqlContainer) ToDeleteStatus(ctx context.Context, ftask *framew
217219
}()
218220
task, ok := ftask.TaskItem.(VideoCutTask)
219221
if !ok {
220-
return ftask, fmt.Errorf("TaskItem not be set to AddTask")
222+
return ftask, fmt.Errorf("TaskItem not be set to VideoCutTask")
221223
}
222224
db := e.db
223-
sql := db.Model(&VideoCutTask{}).Delete("task_id = ? and status = ?", ftask.TaskId, ftask.TaskStatus)
225+
sql := db.Where("task_id = ? and status = ?", task.TaskId, task.Status).Delete(&VideoCutTask{})
224226
if sql.Error != nil {
225227
return ftask, fmt.Errorf("db delete error: %v", sql.Error)
226228
}
@@ -242,7 +244,7 @@ func (e *videoCutSqlContainer) ToFailedStatus(ctx context.Context, ftask *framew
242244
}()
243245
task, ok := ftask.TaskItem.(VideoCutTask)
244246
if !ok {
245-
return ftask, fmt.Errorf("TaskItem not be set to AddTask")
247+
return ftask, fmt.Errorf("TaskItem not be set to VideoCutTask")
246248
}
247249
db := e.db
248250
t := time.Now()
@@ -275,7 +277,7 @@ func (e *videoCutSqlContainer) ToExportStatus(ctx context.Context, ftask *framew
275277
}()
276278
task, ok := ftask.TaskItem.(VideoCutTask)
277279
if !ok {
278-
return ftask, fmt.Errorf("TaskItem not be set to AddTask")
280+
return ftask, fmt.Errorf("TaskItem not be set to VideoCutTask")
279281
}
280282
db := e.db
281283
sql := db.Model(&VideoCutTask{}).Where("task_id = ? and status = ?", ftask.TaskId, ftask.TaskStatus).
@@ -301,7 +303,7 @@ func (e *videoCutSqlContainer) ToSuccessStatus(ctx context.Context, ftask *frame
301303
}()
302304
task, ok := ftask.TaskItem.(VideoCutTask)
303305
if !ok {
304-
return ftask, fmt.Errorf("TaskItem not be set to AddTask")
306+
return ftask, fmt.Errorf("TaskItem not be set to VideoCutTask")
305307
}
306308
db := e.db
307309
t := time.Now()
@@ -340,7 +342,7 @@ func (e *videoCutSqlContainer) SaveData(ctx context.Context, ftask *framework.Ta
340342

341343
task, ok := ftask.TaskItem.(VideoCutTask)
342344
if !ok {
343-
return fmt.Errorf("TaskItem not be set to AddTask")
345+
return fmt.Errorf("TaskItem not be set to VideoCutTask")
344346
}
345347
db := e.db
346348
outputVideo, ok := data.(string)

0 commit comments

Comments
 (0)