Skip to content

Commit a3a6048

Browse files
author
willzhen
committed
Add web service
1 parent cde2a90 commit a3a6048

File tree

4 files changed

+686
-200
lines changed

4 files changed

+686
-200
lines changed

example/videocut_example/video_cut/example_sql_container.go

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
framework "github.com/memory-overflow/light-task-scheduler"
1212
"gorm.io/driver/mysql"
1313
"gorm.io/gorm"
14+
"gorm.io/gorm/clause"
1415
gormlogger "gorm.io/gorm/logger"
1516
)
1617

@@ -75,7 +76,10 @@ func (e *videoCutSqlContainer) AddTask(ctx context.Context, ftask framework.Task
7576
task.StartAt = &t
7677
task.EndAt = nil
7778

78-
if err = db.Create(&task).Error; err != nil {
79+
if err = db.Clauses(clause.OnConflict{
80+
Columns: []clause.Column{{Name: "task_id"}},
81+
DoUpdates: clause.AssignmentColumns([]string{"status", "start_time", "end_time"}),
82+
}).Create(&task).Error; err != nil {
7983
err = fmt.Errorf("db create error: %v", err)
8084
log.Println(err)
8185
return err
@@ -357,3 +361,49 @@ func (e *videoCutSqlContainer) SaveData(ctx context.Context, ftask *framework.Ta
357361
ftask.TaskItem = task
358362
return nil
359363
}
364+
365+
// 下面的方法用来做其他的业务查询
366+
367+
// GetTaskInfoSet 分页拉取任务列表接口
368+
func (e *videoCutSqlContainer) GetTasks(ctx context.Context, pageNumber, pageSize int) (
369+
tasks []VideoCutTask, count int, err error) {
370+
db := e.db
371+
tempDb := db.Model(VideoCutTask{}).Where("delete_time is null")
372+
orderStr := "create_time DESC"
373+
var total int64
374+
tempDb.Count(&total)
375+
count = int(total)
376+
tempDb = tempDb.Order(orderStr).Offset((pageNumber - 1) * pageSize).Limit(pageSize)
377+
if e := tempDb.Find(&tasks).Error; e != nil {
378+
return nil, 0, e
379+
}
380+
return tasks, count, nil
381+
}
382+
383+
// CreateTask 创建任务
384+
func (e *videoCutSqlContainer) CreateTask(ctx context.Context, task VideoCutTask) (err error) {
385+
db := e.db
386+
if err = db.Model(&task).Create(&task).Error; err != nil {
387+
err = fmt.Errorf("db create error: %v", err)
388+
log.Println(err)
389+
return err
390+
}
391+
return nil
392+
}
393+
394+
// GetTask 根据 taskId 查询任务
395+
func (e *videoCutSqlContainer) GetTask(ctx context.Context, taskId string) (
396+
task *VideoCutTask, err error) {
397+
db := e.db
398+
tempTask := VideoCutTask{}
399+
if err = db.Model(&tempTask).Where("task_id = ?", taskId).First(&tempTask).Error; err != nil {
400+
if err == gorm.ErrRecordNotFound {
401+
return nil, fmt.Errorf("不存在的任务")
402+
} else {
403+
err = fmt.Errorf("db first error: %v", err)
404+
log.Println(err)
405+
}
406+
return nil, err
407+
}
408+
return &tempTask, nil
409+
}
Lines changed: 270 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,270 @@
1+
package videocut
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"io/ioutil"
8+
"net/http"
9+
"time"
10+
11+
framework "github.com/memory-overflow/light-task-scheduler"
12+
)
13+
14+
type webService struct {
15+
sqldao *videoCutSqlContainer
16+
sch framework.TaskScheduler
17+
}
18+
19+
type TaskInfo struct {
20+
TaskId string
21+
InputVideo string
22+
OutputVideo string
23+
CutStartTime float32
24+
CutEndTime float32
25+
Status string
26+
FailedReason string
27+
StartTime string
28+
EndTime string
29+
}
30+
31+
type CreateTaskReq struct {
32+
InputVideo string
33+
CutStartTime float32
34+
CutEndTime float32
35+
}
36+
37+
type CreateTaskRsp struct {
38+
TaskId string
39+
ErrorCode int
40+
ErrorMessage string
41+
}
42+
43+
type TaskListReq struct {
44+
PageSize int
45+
PageNumber int
46+
}
47+
48+
type TaskListRsp struct {
49+
Tasks []TaskInfo
50+
TotalCount int
51+
ErrorCode int
52+
ErrorMessage string
53+
}
54+
55+
type TaskOpReq struct {
56+
TaskId string
57+
}
58+
59+
type TaskOpRsp struct {
60+
TaskId string
61+
ErrorCode int
62+
ErrorMessage string
63+
}
64+
65+
func getTaskInfoByTask(task *VideoCutTask) (info TaskInfo) {
66+
m := map[framework.TaskStatus]string{
67+
framework.TASK_STATUS_INVALID: "未知状态",
68+
framework.TASK_STATUS_UNSTART: "为开始",
69+
framework.TASK_STATUS_WAITING: "等待执行",
70+
framework.TASK_STATUS_RUNNING: "执行中",
71+
framework.TASK_STATUS_SUCCESS: "执行成功",
72+
framework.TASK_STATUS_FAILED: "执行失败",
73+
framework.TASK_STATUS_STOPED: "已暂停",
74+
}
75+
info.TaskId = task.TaskId
76+
info.InputVideo = task.InputVideo
77+
info.OutputVideo = task.OutputVideo
78+
info.CutStartTime = task.CutStartTime
79+
info.CutEndTime = task.CutEndTime
80+
info.Status = m[task.Status]
81+
info.FailedReason = task.FailedReason
82+
if task.StartAt != nil {
83+
info.StartTime = task.StartAt.Format("2006-01-02 15:04:05")
84+
}
85+
if task.EndAt != nil {
86+
info.EndTime = task.EndAt.Format("2006-01-02 15:04:05")
87+
}
88+
89+
return info
90+
}
91+
92+
func (web webService) taskList(w http.ResponseWriter, r *http.Request) {
93+
input, _ := ioutil.ReadAll(r.Body)
94+
var req TaskListReq
95+
json.Unmarshal(input, &req)
96+
rsp := TaskListRsp{
97+
ErrorCode: 0,
98+
ErrorMessage: "success",
99+
}
100+
tasks, count, err := web.sqldao.GetTasks(context.Background(), req.PageNumber, req.PageSize)
101+
if err != nil {
102+
rsp.ErrorCode = 1001
103+
rsp.ErrorMessage = err.Error()
104+
}
105+
rsp.TotalCount = count
106+
for _, task := range tasks {
107+
rsp.Tasks = append(rsp.Tasks, getTaskInfoByTask(&task))
108+
}
109+
bs, _ := json.Marshal(rsp)
110+
w.Write(bs)
111+
}
112+
113+
func (web webService) createTask(w http.ResponseWriter, r *http.Request) {
114+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
115+
defer cancel()
116+
input, _ := ioutil.ReadAll(r.Body)
117+
var req CreateTaskReq
118+
json.Unmarshal(input, &req)
119+
rsp := CreateTaskRsp{
120+
ErrorCode: 0,
121+
ErrorMessage: "success",
122+
}
123+
taskId := "task-" + GenerateRandomString(8)
124+
err := web.sqldao.CreateTask(ctx, VideoCutTask{
125+
TaskId: taskId,
126+
InputVideo: req.InputVideo,
127+
CutStartTime: req.CutStartTime,
128+
CutEndTime: req.CutEndTime,
129+
Status: framework.TASK_STATUS_UNSTART,
130+
})
131+
if err != nil {
132+
rsp.ErrorCode = 1001
133+
rsp.ErrorMessage = err.Error()
134+
} else {
135+
rsp.TaskId = taskId
136+
}
137+
bs, _ := json.Marshal(rsp)
138+
w.Write(bs)
139+
}
140+
141+
func (web webService) startTask(w http.ResponseWriter, r *http.Request) {
142+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
143+
defer cancel()
144+
input, _ := ioutil.ReadAll(r.Body)
145+
var req TaskOpReq
146+
json.Unmarshal(input, &req)
147+
rsp := TaskOpRsp{
148+
ErrorCode: 0,
149+
ErrorMessage: "success",
150+
}
151+
defer func() {
152+
bs, _ := json.Marshal(rsp)
153+
w.Write(bs)
154+
}()
155+
task, err := web.sqldao.GetTask(ctx, req.TaskId)
156+
if err != nil {
157+
rsp.ErrorCode = 1001
158+
rsp.ErrorMessage = err.Error()
159+
return
160+
}
161+
if task.Status == framework.TASK_STATUS_EXPORTING ||
162+
task.Status == framework.TASK_STATUS_WAITING ||
163+
task.Status == framework.TASK_STATUS_RUNNING {
164+
rsp.ErrorCode = 1002
165+
rsp.ErrorMessage = "任务已经在执行中或者等待队列中"
166+
return
167+
}
168+
err = web.sch.AddTask(ctx, framework.Task{
169+
TaskId: task.TaskId,
170+
TaskItem: task,
171+
})
172+
if err != nil {
173+
rsp.ErrorCode = 1001
174+
rsp.ErrorMessage = err.Error()
175+
return
176+
}
177+
rsp.TaskId = task.TaskId
178+
}
179+
180+
func (web webService) stopTask(w http.ResponseWriter, r *http.Request) {
181+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
182+
defer cancel()
183+
input, _ := ioutil.ReadAll(r.Body)
184+
var req TaskOpReq
185+
json.Unmarshal(input, &req)
186+
rsp := TaskOpRsp{
187+
ErrorCode: 0,
188+
ErrorMessage: "success",
189+
}
190+
defer func() {
191+
bs, _ := json.Marshal(rsp)
192+
w.Write(bs)
193+
}()
194+
task, err := web.sqldao.GetTask(ctx, req.TaskId)
195+
if err != nil {
196+
rsp.ErrorCode = 1001
197+
rsp.ErrorMessage = err.Error()
198+
return
199+
}
200+
if task.Status == framework.TASK_STATUS_EXPORTING ||
201+
task.Status == framework.TASK_STATUS_WAITING ||
202+
task.Status == framework.TASK_STATUS_RUNNING {
203+
err = web.sch.StopTask(ctx, &framework.Task{
204+
TaskId: task.TaskId,
205+
TaskItem: task,
206+
})
207+
if err != nil {
208+
rsp.ErrorCode = 1001
209+
rsp.ErrorMessage = err.Error()
210+
return
211+
}
212+
} else {
213+
rsp.ErrorCode = 1002
214+
rsp.ErrorMessage = "任务不在分析中"
215+
return
216+
}
217+
rsp.TaskId = task.TaskId
218+
}
219+
220+
func (web webService) deleteTask(w http.ResponseWriter, r *http.Request) {
221+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
222+
defer cancel()
223+
input, _ := ioutil.ReadAll(r.Body)
224+
var req TaskOpReq
225+
json.Unmarshal(input, &req)
226+
rsp := TaskOpRsp{
227+
ErrorCode: 0,
228+
ErrorMessage: "success",
229+
}
230+
defer func() {
231+
bs, _ := json.Marshal(rsp)
232+
w.Write(bs)
233+
}()
234+
task, err := web.sqldao.GetTask(ctx, req.TaskId)
235+
if err != nil {
236+
rsp.ErrorCode = 1001
237+
rsp.ErrorMessage = err.Error()
238+
return
239+
}
240+
if task.Status == framework.TASK_STATUS_EXPORTING ||
241+
task.Status == framework.TASK_STATUS_WAITING ||
242+
task.Status == framework.TASK_STATUS_RUNNING {
243+
rsp.ErrorCode = 1003
244+
rsp.ErrorMessage = "执行中、等待中的任务无法直接删除"
245+
return
246+
}
247+
_, err = web.sqldao.ToDeleteStatus(ctx, &framework.Task{
248+
TaskId: task.TaskId,
249+
TaskItem: task,
250+
})
251+
if err != nil {
252+
rsp.ErrorCode = 1001
253+
rsp.ErrorMessage = err.Error()
254+
return
255+
}
256+
rsp.TaskId = task.TaskId
257+
}
258+
259+
func StartWebServer(container *videoCutSqlContainer) {
260+
ws := webService{
261+
sqldao: container,
262+
}
263+
http.HandleFunc("/TaskList", ws.taskList)
264+
http.HandleFunc("/CreateTask", ws.createTask)
265+
http.HandleFunc("/StartTask", ws.startTask)
266+
http.HandleFunc("/StopTask", ws.stopTask)
267+
http.HandleFunc("/DeleteTask", ws.deleteTask)
268+
fmt.Printf("start videocut service ...\n")
269+
http.ListenAndServe("127.0.0.1:8000", nil)
270+
}

0 commit comments

Comments
 (0)