1515 */
1616package io .serverlessworkflow .impl .container .executors ;
1717
18- import static io .serverlessworkflow .api .types .ContainerLifetime .ContainerCleanupPolicy . *;
18+ import static io .serverlessworkflow .api .types .ContainerLifetime .*;
1919
2020import com .github .dockerjava .api .DockerClient ;
2121import com .github .dockerjava .api .command .CreateContainerCmd ;
2222import com .github .dockerjava .api .command .CreateContainerResponse ;
23+ import com .github .dockerjava .api .command .PullImageResultCallback ;
2324import com .github .dockerjava .api .command .WaitContainerResultCallback ;
24- import com .github .dockerjava .api .exception .DockerClientException ;
2525import com .github .dockerjava .core .DefaultDockerClientConfig ;
2626import com .github .dockerjava .core .DockerClientImpl ;
27+ import com .github .dockerjava .core .NameParser ;
2728import com .github .dockerjava .httpclient5 .ApacheDockerHttpClient ;
2829import io .serverlessworkflow .api .types .Container ;
2930import io .serverlessworkflow .impl .TaskContext ;
3233import io .serverlessworkflow .impl .WorkflowModel ;
3334import io .serverlessworkflow .impl .WorkflowUtils ;
3435import io .serverlessworkflow .impl .WorkflowValueResolver ;
35- import java .io .IOException ;
3636import java .time .Duration ;
3737import java .util .ArrayList ;
3838import java .util .List ;
@@ -71,48 +71,124 @@ private ContainerRunner(
7171 CompletableFuture <WorkflowModel > startSync (
7272 WorkflowContext workflowContext , TaskContext taskContext , WorkflowModel input ) {
7373
74- StringExpressionResolver resolver =
75- new StringExpressionResolver (workflowContext , taskContext , input );
74+ try {
75+ var resolver = new StringExpressionResolver (workflowContext , taskContext , input );
76+ applyPropertySetters (resolver );
77+ pullImageIfNeeded (container .getName ());
7678
77- propertySetters .forEach (setter -> setter .accept (resolver ));
79+ String id = createAndStartContainer ();
80+ int exit = waitAccordingToLifetime (id , workflowContext , taskContext , input );
7881
79- CreateContainerResponse createContainerResponse = createContainerCmd .exec ();
80- String containerId = createContainerResponse .getId ();
82+ return mapExitCode (exit , input );
8183
82- if (containerId == null || containerId .isEmpty ()) {
83- return failed ("Container creation failed: empty container ID" );
84+ } catch (InterruptedException ie ) {
85+ Thread .currentThread ().interrupt ();
86+ return failed ("Interrupted while waiting for container" );
87+ } catch (Exception e ) {
88+ return failed ("Container run failed: " + e .getMessage ());
8489 }
90+ }
91+
92+ private void applyPropertySetters (StringExpressionResolver resolver ) {
93+ for (var setter : propertySetters ) setter .accept (resolver );
94+ }
95+
96+ private void pullImageIfNeeded (String imageRef ) throws InterruptedException {
97+ NameParser .ReposTag rt = NameParser .parseRepositoryTag (imageRef );
98+ NameParser .HostnameReposName hr = NameParser .resolveRepositoryName (imageRef );
8599
86- dockerClient .startContainerCmd (containerId ).exec ();
87-
88- int exitCode ;
89- try (WaitContainerResultCallback resultCallback =
90- dockerClient .waitContainerCmd (containerId ).exec (new WaitContainerResultCallback ())) {
91- if (container .getLifetime () != null
92- && container .getLifetime ().getCleanup () != null
93- && container .getLifetime ().getCleanup ().equals (EVENTUALLY )) {
94- try {
95- WorkflowValueResolver <Duration > durationResolver =
96- WorkflowUtils .fromTimeoutAfter (
97- definition .application (), container .getLifetime ().getAfter ());
98- Duration timeout = durationResolver .apply (workflowContext , taskContext , input );
99- exitCode = resultCallback .awaitStatusCode (timeout .toMillis (), TimeUnit .MILLISECONDS );
100- } catch (DockerClientException e ) {
101- return failed (
102- String .format ("Error while waiting for container to finish: %s " , e .getMessage ()));
103- } finally {
104- dockerClient .removeContainerCmd (containerId ).withForce (true ).exec ();
100+ String repository = hr .reposName ;
101+ String tag = rt .tag != null ? rt .tag : "latest" ;
102+
103+ dockerClient
104+ .pullImageCmd (repository )
105+ .withTag (tag )
106+ .exec (new PullImageResultCallback ())
107+ .awaitCompletion ();
108+ }
109+
110+ private String createAndStartContainer () {
111+ CreateContainerResponse resp = createContainerCmd .exec ();
112+ String id = resp .getId ();
113+ if (id == null || id .isEmpty ()) {
114+ throw new IllegalStateException ("Container creation failed: empty ID" );
115+ }
116+ dockerClient .startContainerCmd (id ).exec ();
117+ return id ;
118+ }
119+
120+ private int waitAccordingToLifetime (
121+ String id , WorkflowContext workflowContext , TaskContext taskContext , WorkflowModel input )
122+ throws Exception {
123+
124+ var lifetime = container .getLifetime ();
125+ var policy = lifetime != null ? lifetime .getCleanup () : null ;
126+
127+ try (var cb = dockerClient .waitContainerCmd (id ).exec (new WaitContainerResultCallback ())) {
128+
129+ if (policy == ContainerCleanupPolicy .EVENTUALLY ) {
130+ Duration timeout = resolveAfter (lifetime , workflowContext , taskContext , input );
131+ int exit = cb .awaitStatusCode (timeout .toMillis (), TimeUnit .MILLISECONDS );
132+
133+ if (isRunning (id )) {
134+ safeStop (id , Duration .ofSeconds (10 ));
105135 }
136+ safeRemove (id );
137+ return exit ;
138+
106139 } else {
107- exitCode = resultCallback .awaitStatusCode ();
140+ int exit = cb .awaitStatusCode ();
141+ if (policy == ContainerCleanupPolicy .ALWAYS ) {
142+ safeRemove (id );
143+ }
144+ return exit ;
108145 }
109- } catch (IOException e ) {
110- return failed (
111- String .format ("Error while waiting for container to finish: %s " , e .getMessage ()));
112146 }
147+ }
148+
149+ private Duration resolveAfter (
150+ io .serverlessworkflow .api .types .ContainerLifetime lifetime ,
151+ WorkflowContext workflowContext ,
152+ TaskContext taskContext ,
153+ WorkflowModel input ) {
154+
155+ if (lifetime == null || lifetime .getAfter () == null ) {
156+ return Duration .ZERO ;
157+ }
158+ WorkflowValueResolver <Duration > r =
159+ WorkflowUtils .fromTimeoutAfter (definition .application (), lifetime .getAfter ());
160+ return r .apply (workflowContext , taskContext , input );
161+ }
162+
163+ private boolean isRunning (String id ) {
164+ try {
165+ var st = dockerClient .inspectContainerCmd (id ).exec ().getState ();
166+ return st != null && Boolean .TRUE .equals (st .getRunning ());
167+ } catch (Exception e ) {
168+ return false ; // must be already removed
169+ }
170+ }
171+
172+ private void safeStop (String id , Duration timeout ) {
173+ try {
174+ dockerClient .stopContainerCmd (id ).withTimeout ((int ) Math .max (1 , timeout .toSeconds ())).exec ();
175+ } catch (Exception ignore ) {
176+ // we can ignore this
177+ }
178+ }
179+
180+ // must be removed because of withAutoRemove(true), but just in case
181+ private void safeRemove (String id ) {
182+ try {
183+ dockerClient .removeContainerCmd (id ).withForce (true ).exec ();
184+ } catch (Exception ignore ) {
185+ // we can ignore this
186+ }
187+ }
113188
114- return switch (exitCode ) {
115- case 0 -> CompletableFuture .completedFuture (input );
189+ private static <T > CompletableFuture <T > mapExitCode (int exit , T ok ) {
190+ return switch (exit ) {
191+ case 0 -> CompletableFuture .completedFuture (ok );
116192 case 1 -> failed ("General error (exit code 1)" );
117193 case 2 -> failed ("Shell syntax error (exit code 2)" );
118194 case 126 -> failed ("Command found but not executable (exit code 126)" );
@@ -121,7 +197,7 @@ CompletableFuture<WorkflowModel> startSync(
121197 case 137 -> failed ("Killed by SIGKILL (exit code 137)" );
122198 case 139 -> failed ("Segmentation fault (exit code 139)" );
123199 case 143 -> failed ("Terminated by SIGTERM (exit code 143)" );
124- default -> failed ("Process exited with code " + exitCode );
200+ default -> failed ("Process exited with code " + exit );
125201 };
126202 }
127203
0 commit comments