Skip to content

Commit 098b99e

Browse files
authored
[Fix #936] Adding Python implementation (#1027)
* [Fix #936] Adding Python implementation Signed-off-by: fjtirado <ftirados@redhat.com> * [Fix #936] Alternative approach Signed-off-by: fjtirado <ftirados@redhat.com> --------- Signed-off-by: fjtirado <ftirados@redhat.com>
1 parent 4e50003 commit 098b99e

File tree

32 files changed

+761
-466
lines changed

32 files changed

+761
-466
lines changed

impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerRunner.java

Lines changed: 14 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -28,24 +28,21 @@
2828
import com.github.dockerjava.core.DockerClientImpl;
2929
import com.github.dockerjava.core.NameParser;
3030
import com.github.dockerjava.httpclient5.ApacheDockerHttpClient;
31-
import io.serverlessworkflow.api.types.Container;
32-
import io.serverlessworkflow.api.types.ContainerLifetime;
33-
import io.serverlessworkflow.api.types.TimeoutAfter;
31+
import io.serverlessworkflow.api.types.ContainerLifetime.ContainerCleanupPolicy;
3432
import io.serverlessworkflow.impl.TaskContext;
3533
import io.serverlessworkflow.impl.WorkflowContext;
36-
import io.serverlessworkflow.impl.WorkflowDefinition;
3734
import io.serverlessworkflow.impl.WorkflowModel;
3835
import io.serverlessworkflow.impl.WorkflowUtils;
3936
import io.serverlessworkflow.impl.WorkflowValueResolver;
37+
import io.serverlessworkflow.impl.executors.CallableTask;
4038
import java.io.IOException;
4139
import java.time.Duration;
42-
import java.util.ArrayList;
4340
import java.util.Collection;
4441
import java.util.Optional;
4542
import java.util.concurrent.CompletableFuture;
4643
import java.util.concurrent.TimeUnit;
4744

48-
class ContainerRunner {
45+
class ContainerRunner implements CallableTask {
4946

5047
private static final DefaultDockerClientConfig DEFAULT_CONFIG =
5148
DefaultDockerClientConfig.createDefaultConfigBuilder().build();
@@ -64,14 +61,19 @@ private static class DockerClientHolder {
6461
private final ContainerCleanupPolicy policy;
6562
private final String containerImage;
6663

67-
private ContainerRunner(ContainerRunnerBuilder builder) {
68-
this.propertySetters = builder.propertySetters;
69-
this.timeout = Optional.ofNullable(builder.timeout);
70-
this.policy = builder.policy;
71-
this.containerImage = builder.containerImage;
64+
public ContainerRunner(
65+
Collection<ContainerPropertySetter> propertySetters,
66+
Optional<WorkflowValueResolver<Duration>> timeout,
67+
ContainerCleanupPolicy policy,
68+
String containerImage) {
69+
this.propertySetters = propertySetters;
70+
this.timeout = timeout;
71+
this.policy = policy;
72+
this.containerImage = containerImage;
7273
}
7374

74-
CompletableFuture<WorkflowModel> start(
75+
@Override
76+
public CompletableFuture<WorkflowModel> apply(
7577
WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) {
7678
return CompletableFuture.supplyAsync(
7779
() -> startSync(workflowContext, taskContext, input),
@@ -215,52 +217,4 @@ private static RuntimeException mapExitCode(int exit) {
215217
private static RuntimeException failed(String message) {
216218
return new RuntimeException(message);
217219
}
218-
219-
static ContainerRunnerBuilder builder() {
220-
return new ContainerRunnerBuilder();
221-
}
222-
223-
public static class ContainerRunnerBuilder {
224-
private Container container;
225-
private WorkflowDefinition definition;
226-
private WorkflowValueResolver<Duration> timeout;
227-
private ContainerCleanupPolicy policy;
228-
private String containerImage;
229-
private Collection<ContainerPropertySetter> propertySetters = new ArrayList<>();
230-
231-
private ContainerRunnerBuilder() {}
232-
233-
ContainerRunnerBuilder withContainer(Container container) {
234-
this.container = container;
235-
return this;
236-
}
237-
238-
public ContainerRunnerBuilder withWorkflowDefinition(WorkflowDefinition definition) {
239-
this.definition = definition;
240-
return this;
241-
}
242-
243-
ContainerRunner build() {
244-
propertySetters.add(new NamePropertySetter(definition, container));
245-
propertySetters.add(new CommandPropertySetter(definition, container));
246-
propertySetters.add(new ContainerEnvironmentPropertySetter(definition, container));
247-
propertySetters.add(new LifetimePropertySetter(container));
248-
propertySetters.add(new PortsPropertySetter(container));
249-
propertySetters.add(new VolumesPropertySetter(definition, container));
250-
251-
containerImage = container.getImage();
252-
if (containerImage == null || container.getImage().isBlank()) {
253-
throw new IllegalArgumentException("Container image must be provided");
254-
}
255-
ContainerLifetime lifetime = container.getLifetime();
256-
if (lifetime != null) {
257-
policy = lifetime.getCleanup();
258-
TimeoutAfter afterTimeout = lifetime.getAfter();
259-
if (afterTimeout != null)
260-
timeout = WorkflowUtils.fromTimeoutAfter(definition.application(), afterTimeout);
261-
}
262-
263-
return new ContainerRunner(this);
264-
}
265-
}
266220
}

impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/RunContainerExecutor.java

Lines changed: 0 additions & 51 deletions
This file was deleted.
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.container.executors;
17+
18+
import io.serverlessworkflow.api.types.Container;
19+
import io.serverlessworkflow.api.types.ContainerLifetime;
20+
import io.serverlessworkflow.api.types.ContainerLifetime.ContainerCleanupPolicy;
21+
import io.serverlessworkflow.api.types.RunContainer;
22+
import io.serverlessworkflow.api.types.RunTaskConfiguration;
23+
import io.serverlessworkflow.api.types.TimeoutAfter;
24+
import io.serverlessworkflow.impl.WorkflowDefinition;
25+
import io.serverlessworkflow.impl.WorkflowUtils;
26+
import io.serverlessworkflow.impl.WorkflowValueResolver;
27+
import io.serverlessworkflow.impl.executors.CallableTask;
28+
import io.serverlessworkflow.impl.executors.RunnableTaskBuilder;
29+
import java.time.Duration;
30+
import java.util.ArrayList;
31+
import java.util.Collection;
32+
import java.util.Optional;
33+
34+
public class RunContainerExecutorBuilder implements RunnableTaskBuilder<RunContainer> {
35+
36+
@Override
37+
public CallableTask build(RunContainer taskConfiguration, WorkflowDefinition definition) {
38+
Collection<ContainerPropertySetter> propertySetters = new ArrayList<>();
39+
Container container = taskConfiguration.getContainer();
40+
propertySetters.add(new NamePropertySetter(definition, container));
41+
propertySetters.add(new CommandPropertySetter(definition, container));
42+
propertySetters.add(new ContainerEnvironmentPropertySetter(definition, container));
43+
propertySetters.add(new LifetimePropertySetter(container));
44+
propertySetters.add(new PortsPropertySetter(container));
45+
propertySetters.add(new VolumesPropertySetter(definition, container));
46+
47+
ContainerCleanupPolicy policy = null;
48+
WorkflowValueResolver<Duration> timeout = null;
49+
ContainerLifetime lifetime = container.getLifetime();
50+
if (lifetime != null) {
51+
policy = lifetime.getCleanup();
52+
TimeoutAfter afterTimeout = lifetime.getAfter();
53+
if (afterTimeout != null)
54+
timeout = WorkflowUtils.fromTimeoutAfter(definition.application(), afterTimeout);
55+
}
56+
return new ContainerRunner(
57+
propertySetters, Optional.ofNullable(timeout), policy, container.getImage());
58+
}
59+
60+
@Override
61+
public boolean accept(Class<? extends RunTaskConfiguration> clazz) {
62+
return RunContainer.class.equals(clazz);
63+
}
64+
}

impl/container/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.RunnableTask renamed to impl/container/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.RunnableTaskBuilder

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
io.serverlessworkflow.impl.container.executors.RunContainerExecutor
1+
io.serverlessworkflow.impl.container.executors.RunContainerExecutorBuilder

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public class WorkflowMutableInstance implements WorkflowInstance {
4848
protected AtomicReference<CompletableFuture<WorkflowModel>> futureRef = new AtomicReference<>();
4949
protected Instant completedAt;
5050

51-
protected final Map<String, Object> additionalObjects = new ConcurrentHashMap<String, Object>();
51+
protected final Map<String, Object> additionalObjects = new ConcurrentHashMap<>();
5252

5353
private Lock statusLock = new ReentrantLock();
5454
private Map<CompletableFuture<TaskContext>, TaskContext> suspended;

impl/core/src/main/java/io/serverlessworkflow/impl/config/AbstractConfigManager.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717

1818
import java.time.Instant;
1919
import java.time.OffsetDateTime;
20+
import java.util.ArrayList;
21+
import java.util.Collection;
22+
import java.util.Collections;
2023
import java.util.Optional;
2124

2225
public abstract class AbstractConfigManager implements ConfigManager {
@@ -56,5 +59,19 @@ protected <T> T convert(String value, Class<T> propClass) {
5659
return propClass.cast(result);
5760
}
5861

62+
@Override
63+
public <T> Collection<T> multiConfig(String propName, Class<T> propClass) {
64+
String multiValue = get(propName);
65+
if (multiValue != null) {
66+
Collection<T> result = new ArrayList<>();
67+
for (String value : multiValue.split(",")) {
68+
result.add(convert(value, propClass));
69+
}
70+
return result;
71+
} else {
72+
return Collections.emptyList();
73+
}
74+
}
75+
5976
protected abstract <T> T convertComplex(String value, Class<T> propClass);
6077
}

impl/core/src/main/java/io/serverlessworkflow/impl/config/ConfigManager.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,17 @@
1616
package io.serverlessworkflow.impl.config;
1717

1818
import io.serverlessworkflow.impl.ServicePriority;
19+
import java.util.Collection;
20+
import java.util.List;
1921
import java.util.Optional;
2022

2123
public interface ConfigManager extends ServicePriority {
2224

2325
<T> Optional<T> config(String propName, Class<T> propClass);
2426

27+
default <T> Collection<T> multiConfig(String propName, Class<T> propClass) {
28+
return List.of();
29+
}
30+
2531
Iterable<String> names();
2632
}

0 commit comments

Comments
 (0)