Skip to content

Commit 0af4037

Browse files
Merge pull request #11 from memory-overflow/dev/docker_actuator
Add docker actuator
2 parents d09dba4 + 054e41a commit 0af4037

File tree

5 files changed

+441
-2
lines changed

5 files changed

+441
-2
lines changed

actuator/docker_actuator.go

Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
// 封装好的 docker 执行器
2+
3+
package actuator
4+
5+
import (
6+
"context"
7+
"errors"
8+
"fmt"
9+
"sync"
10+
"time"
11+
12+
"github.com/docker/docker/api/types"
13+
"github.com/docker/docker/api/types/container"
14+
"github.com/docker/docker/api/types/mount"
15+
dockerclient "github.com/docker/docker/client"
16+
"github.com/docker/go-connections/nat"
17+
framework "github.com/memory-overflow/light-task-scheduler"
18+
)
19+
20+
// DockerTask docker 任务
21+
type DockerTask struct {
22+
Image string // 镜像
23+
Cmd []string // 容器执行命令
24+
ContainerName string // 容器名,不能重复
25+
26+
// 资源限制
27+
MemoryLimit int64 // bytes
28+
CpuPercent int // cpu 占用百分比, 比如暂用 2 核就是 200
29+
30+
// 可选配置
31+
ExposedPorts []string // 暴露端口
32+
Env []string // 环境变量
33+
WorkingDir string // 工作目录
34+
NetworkDisabled bool // 是否关闭网络
35+
NetworkMode string // 网络模式
36+
Privileged bool // Is the container in privileged mode
37+
VolumeBinds map[string]string // host 路径到容器路径的映射,类似于 -v 参数
38+
39+
ContainerId string // 执行后对应的 容器 Id
40+
}
41+
42+
// dockerActuator docker 执行器
43+
type dockerActuator struct {
44+
initFunc InitFunction // 初始函数
45+
callbackChannel chan framework.Task // 回调队列
46+
47+
runningTask sync.Map // ContainerName -> containerId 映射
48+
datatMap sync.Map // ContainerName -> interface{} 映射
49+
}
50+
51+
func MakeDockerActuator(initFunc InitFunction) *dockerActuator {
52+
return &dockerActuator{
53+
initFunc: initFunc,
54+
}
55+
}
56+
57+
// SetCallbackChannel 任务配置回调 channel
58+
func (dc *dockerActuator) SetCallbackChannel(callbackChannel chan framework.Task) {
59+
dc.callbackChannel = callbackChannel
60+
}
61+
62+
// Init 任务在被调度前的初始化工作
63+
func (dc *dockerActuator) Init(ctx context.Context, task *framework.Task) (
64+
newTask *framework.Task, err error) {
65+
if dc.initFunc != nil {
66+
return dc.initFunc(ctx, task)
67+
}
68+
return task, nil
69+
}
70+
71+
// Start 执行任务
72+
func (dc *dockerActuator) Start(ctx context.Context, ftask *framework.Task) (
73+
newTask *framework.Task, ignoreErr bool, err error) {
74+
var task DockerTask
75+
if v, ok := ftask.TaskItem.(DockerTask); ok {
76+
task = v
77+
} else if v, ok := ftask.TaskItem.(*DockerTask); ok {
78+
task = *v
79+
} else {
80+
return ftask, false, fmt.Errorf("TaskItem is not configured as DockerTask")
81+
}
82+
cli, err := dockerclient.NewClientWithOpts(dockerclient.WithAPIVersionNegotiation())
83+
if err != nil {
84+
return ftask, false, fmt.Errorf("new docker client with error: %v", err)
85+
}
86+
exposed := nat.PortSet{}
87+
for _, port := range task.ExposedPorts {
88+
exposed[nat.Port(port)] = struct{}{}
89+
}
90+
mounts := []mount.Mount{}
91+
for src, dst := range task.VolumeBinds {
92+
mounts = append(mounts, mount.Mount{
93+
Type: "bind",
94+
Source: src,
95+
Target: dst,
96+
})
97+
}
98+
resp, err := cli.ContainerCreate(ctx,
99+
&container.Config{
100+
Image: task.Image,
101+
Cmd: task.Cmd,
102+
Env: task.Env,
103+
WorkingDir: task.WorkingDir,
104+
ExposedPorts: exposed,
105+
},
106+
&container.HostConfig{
107+
AutoRemove: true,
108+
Mounts: mounts,
109+
Resources: container.Resources{
110+
Memory: task.MemoryLimit,
111+
CPUPeriod: 100000,
112+
CPUQuota: int64(task.CpuPercent) * 100000 / 100,
113+
},
114+
},
115+
nil, nil, task.ContainerName)
116+
if err != nil {
117+
return ftask, false, fmt.Errorf("create container error: %v", err)
118+
}
119+
120+
// 启动容器
121+
if err := cli.ContainerStart(ctx, resp.ID, types.ContainerStartOptions{}); err != nil {
122+
return ftask, false, fmt.Errorf("start container error: %v", err)
123+
}
124+
125+
ftask.TaskStatus = framework.TASK_STATUS_RUNNING
126+
ftask.TaskStartTime = time.Now()
127+
task.ContainerId = resp.ID
128+
ftask.TaskItem = task
129+
dc.runningTask.Store(task.ContainerName, resp.ID)
130+
131+
return ftask, false, nil
132+
}
133+
134+
// Stop 停止任务
135+
func (dc *dockerActuator) Stop(ctx context.Context, ftask *framework.Task) error {
136+
var task DockerTask
137+
if v, ok := ftask.TaskItem.(DockerTask); ok {
138+
task = v
139+
} else if v, ok := ftask.TaskItem.(*DockerTask); ok {
140+
task = *v
141+
} else {
142+
return fmt.Errorf("TaskItem is not configured as DockerTask")
143+
}
144+
145+
cli, err := dockerclient.NewClientWithOpts(dockerclient.WithAPIVersionNegotiation())
146+
if err != nil {
147+
return fmt.Errorf("new docker client error: %v", err)
148+
}
149+
if v, ok := dc.runningTask.Load(task.ContainerName); ok {
150+
id := v.(string)
151+
err := cli.ContainerRemove(ctx, id, types.ContainerRemoveOptions{Force: true})
152+
if err != nil {
153+
return fmt.Errorf("remove container with id %s error: %v", id, err)
154+
}
155+
dc.runningTask.Delete(task.ContainerName)
156+
}
157+
158+
return nil
159+
}
160+
161+
// GetAsyncTaskStatus 批量获取任务状态
162+
func (fc *dockerActuator) GetAsyncTaskStatus(ctx context.Context, ftasks []framework.Task) (
163+
status []framework.AsyncTaskStatus, err error) {
164+
for i, ftask := range ftasks {
165+
var task DockerTask
166+
if v, ok := ftask.TaskItem.(DockerTask); ok {
167+
task = v
168+
} else if v, ok := ftask.TaskItem.(*DockerTask); ok {
169+
task = *v
170+
} else {
171+
return nil, fmt.Errorf("tasks[%d].TaskItem is not configured as DockerTask", i)
172+
}
173+
cli, err := dockerclient.NewClientWithOpts(dockerclient.WithAPIVersionNegotiation())
174+
if err != nil {
175+
return nil, fmt.Errorf("new docker client error: %v", err)
176+
}
177+
178+
v, ok := fc.runningTask.Load(task.ContainerName)
179+
if !ok {
180+
status = append(status, framework.AsyncTaskStatus{
181+
TaskStatus: framework.TASK_STATUS_FAILED,
182+
FailedReason: errors.New("同步任务未找到"),
183+
Progress: float32(0.0),
184+
})
185+
} else {
186+
id := v.(string)
187+
stats, err := cli.ContainerInspect(ctx, id)
188+
if err != nil {
189+
return nil, fmt.Errorf("get container stats for id %s error: %v", id, err)
190+
}
191+
st := framework.AsyncTaskStatus{
192+
TaskStatus: framework.TASK_STATUS_RUNNING,
193+
Progress: float32(0.0),
194+
}
195+
if stats.State.Status != "running" {
196+
if stats.State.ExitCode == 0 {
197+
st.TaskStatus = framework.TASK_STATUS_SUCCESS
198+
} else {
199+
st.TaskStatus = framework.TASK_STATUS_FAILED
200+
st.FailedReason = fmt.Errorf(stats.State.Error)
201+
}
202+
}
203+
if st.TaskStatus != framework.TASK_STATUS_RUNNING {
204+
fc.runningTask.Delete(ftask.TaskId) // delete task status after query if task finished
205+
cli.ContainerRemove(ctx, id, types.ContainerRemoveOptions{Force: true})
206+
}
207+
status = append(status, st)
208+
}
209+
}
210+
return status, nil
211+
}
212+
213+
// GetOutput ...
214+
func (dc *dockerActuator) GetOutput(ctx context.Context, ftask *framework.Task) (
215+
data interface{}, err error) {
216+
var task DockerTask
217+
if v, ok := ftask.TaskItem.(DockerTask); ok {
218+
task = v
219+
} else if v, ok := ftask.TaskItem.(*DockerTask); ok {
220+
task = *v
221+
} else {
222+
return nil, fmt.Errorf("TaskItem is not configured as DockerTask")
223+
}
224+
if v, ok := dc.datatMap.LoadAndDelete(task.ContainerName); ok {
225+
return v, nil
226+
} else {
227+
return nil, fmt.Errorf("task data not found")
228+
}
229+
}

example/docker_example/main.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package main
2+
3+
import (
4+
"log"
5+
"strconv"
6+
"time"
7+
8+
"context"
9+
10+
lighttaskscheduler "github.com/memory-overflow/light-task-scheduler"
11+
"github.com/memory-overflow/light-task-scheduler/actuator"
12+
memeorycontainer "github.com/memory-overflow/light-task-scheduler/container/memory_container"
13+
)
14+
15+
func main() {
16+
container := memeorycontainer.MakeQueueContainer(10000, 100*time.Millisecond)
17+
scanInterval := 50 * time.Millisecond
18+
// 构建裁剪任务执行器
19+
act := actuator.MakeDockerActuator(nil)
20+
sch, _ := lighttaskscheduler.MakeScheduler(
21+
container, act, nil,
22+
lighttaskscheduler.Config{
23+
TaskLimit: 2, // 2 并发
24+
TaskTimeout: 60 * time.Second, // 60s 超时时间
25+
SchedulingPollInterval: scanInterval,
26+
StatePollInterval: scanInterval,
27+
EnableFinshedTaskList: true,
28+
},
29+
)
30+
31+
for i := 0; i < 10; i++ {
32+
sch.AddTask(context.Background(),
33+
lighttaskscheduler.Task{
34+
TaskId: strconv.Itoa(i),
35+
TaskItem: actuator.DockerTask{
36+
Image: "centos:7",
37+
Cmd: []string{"sh", "-c", "echo 'helloworld'; sleep 1"},
38+
ContainerName: "helloworld-" + strconv.Itoa(i),
39+
VolumeBinds: map[string]string{
40+
"/home": "/data",
41+
},
42+
MemoryLimit: 10 * 1024 * 1024, // 10 MB
43+
CpuPercent: 100,
44+
},
45+
})
46+
}
47+
48+
for task := range sch.FinshedTasks() {
49+
if task.TaskStatus == lighttaskscheduler.TASK_STATUS_FAILED {
50+
log.Printf("failed task %s, reason: %s, timecost: %dms, attempt times: %d\n",
51+
task.TaskId, task.FailedReason, task.TaskEnbTime.Sub(task.TaskStartTime).Milliseconds(), task.TaskAttemptsTime)
52+
} else if task.TaskStatus == lighttaskscheduler.TASK_STATUS_SUCCESS {
53+
log.Printf("success task %s, timecost: %dms, attempt times: %d\n", task.TaskId,
54+
task.TaskEnbTime.Sub(task.TaskStartTime).Milliseconds(), task.TaskAttemptsTime)
55+
}
56+
}
57+
58+
}

go.mod

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,18 @@ module github.com/memory-overflow/light-task-scheduler
33
go 1.16
44

55
require (
6+
github.com/Microsoft/go-winio v0.6.1 // indirect
7+
github.com/docker/distribution v2.8.2+incompatible // indirect
8+
github.com/docker/docker v24.0.6+incompatible // indirect
9+
github.com/docker/go-connections v0.4.0 // indirect
10+
github.com/docker/go-units v0.5.0 // indirect
11+
github.com/gogo/protobuf v1.3.2 // indirect
12+
github.com/json-iterator/go v1.1.12 // indirect
613
github.com/memory-overflow/go-common-library v0.0.0-20230427064346-aef3d86a1c60
714
github.com/memory-overflow/go-orderedmap v0.0.0-20230427064227-758a452e8a9c
15+
github.com/opencontainers/go-digest v1.0.0 // indirect
16+
github.com/opencontainers/image-spec v1.0.2 // indirect
17+
github.com/pkg/errors v0.9.1 // indirect
818
gorm.io/driver/mysql v1.5.1
919
gorm.io/gorm v1.25.1
1020
)

0 commit comments

Comments
 (0)