diff --git a/nextflow/module.properties b/nextflow/module.properties
index be867ce9..e79c9076 100644
--- a/nextflow/module.properties
+++ b/nextflow/module.properties
@@ -5,4 +5,4 @@ Description: This module provides the functionality \
License: Apache 2.0
LicenseURL: http://www.apache.org/licenses/LICENSE-2.0
SupportedDatabases: pgsql
-ManageVersion: false
+ManageVersion: true
diff --git a/nextflow/resources/schemas/dbscripts/nextflow.xml b/nextflow/resources/schemas/dbscripts/nextflow.xml
new file mode 100644
index 00000000..01709064
--- /dev/null
+++ b/nextflow/resources/schemas/dbscripts/nextflow.xml
@@ -0,0 +1,28 @@
+
+
+
+
+ Invocation counts to ensure unique NextFlow run names
+
+
+
+
+
+
\ No newline at end of file
diff --git a/nextflow/resources/schemas/dbscripts/postgresql/nextflow-0.000-25.000.sql b/nextflow/resources/schemas/dbscripts/postgresql/nextflow-0.000-25.000.sql
new file mode 100644
index 00000000..5f6a43a3
--- /dev/null
+++ b/nextflow/resources/schemas/dbscripts/postgresql/nextflow-0.000-25.000.sql
@@ -0,0 +1,24 @@
+/*
+ * Copyright (c) 2025 LabKey Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+CREATE SCHEMA nextflow;
+
+CREATE TABLE nextflow.Job
+(
+ JobId INTEGER NOT NULL,
+ InvocationCount INTEGER NOT NULL,
+ CONSTRAINT PK_Job PRIMARY KEY (JobId),
+ CONSTRAINT FK_Job_JobId FOREIGN KEY (JobId) REFERENCES pipeline.StatusFiles (RowId) ON DELETE CASCADE -- Automatically clean up when a job is deleted
+);
diff --git a/nextflow/src/org/labkey/nextflow/NextFlowController.java b/nextflow/src/org/labkey/nextflow/NextFlowController.java
index ea758886..736b8e4f 100644
--- a/nextflow/src/org/labkey/nextflow/NextFlowController.java
+++ b/nextflow/src/org/labkey/nextflow/NextFlowController.java
@@ -3,6 +3,7 @@
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.Logger;
import org.labkey.api.action.ApiResponse;
import org.labkey.api.action.ApiSimpleResponse;
import org.labkey.api.action.FormViewAction;
@@ -12,7 +13,6 @@
import org.labkey.api.data.PropertyManager;
import org.labkey.api.data.PropertyStore;
import org.labkey.api.pipeline.PipeRoot;
-import org.labkey.api.pipeline.PipelineJob;
import org.labkey.api.pipeline.PipelineProvider;
import org.labkey.api.pipeline.PipelineService;
import org.labkey.api.pipeline.PipelineStatusUrls;
@@ -31,6 +31,7 @@
import org.labkey.api.util.Path;
import org.labkey.api.util.URLHelper;
import org.labkey.api.util.element.Select;
+import org.labkey.api.util.logging.LogHelper;
import org.labkey.api.view.HtmlView;
import org.labkey.api.view.JspView;
import org.labkey.api.view.NavTree;
@@ -64,6 +65,8 @@ public class NextFlowController extends SpringActionController
private static final DefaultActionResolver _actionResolver = new DefaultActionResolver(NextFlowController.class);
public static final String NAME = "nextflow";
+ protected static final Logger LOG = LogHelper.getLogger(NextFlowPipelineJob.class, "LabKey UI and API for NextFlow usage");
+
public NextFlowController()
{
setActionResolver(_actionResolver);
@@ -326,8 +329,9 @@ public boolean handlePost(AnalyzeForm form, BindException errors) throws Excepti
{
ViewBackgroundInfo info = getViewBackgroundInfo();
PipeRoot root = PipelineService.get().findPipelineRoot(info.getContainer());
- PipelineJob job = NextFlowPipelineJob.create(info, root, configFile.toPath(), inputFiles.stream().map(File::toPath).toList());
+ NextFlowPipelineJob job = NextFlowPipelineJob.create(info, root, configFile.toPath(), inputFiles.stream().map(File::toPath).toList());
PipelineService.get().queueJob(job);
+ LOG.info("NextFlow job queued: {}", job.getJsonJobInfo(false));
}
}
diff --git a/nextflow/src/org/labkey/nextflow/NextFlowManager.java b/nextflow/src/org/labkey/nextflow/NextFlowManager.java
index e560e413..a1b115c7 100644
--- a/nextflow/src/org/labkey/nextflow/NextFlowManager.java
+++ b/nextflow/src/org/labkey/nextflow/NextFlowManager.java
@@ -3,8 +3,16 @@
import org.apache.commons.lang3.StringUtils;
import org.labkey.api.data.Container;
import org.labkey.api.data.CoreSchema;
+import org.labkey.api.data.DbSchema;
+import org.labkey.api.data.DbSchemaType;
import org.labkey.api.data.DbScope;
import org.labkey.api.data.PropertyManager;
+import org.labkey.api.data.SQLFragment;
+import org.labkey.api.data.SqlExecutor;
+import org.labkey.api.data.SqlSelector;
+import org.labkey.api.pipeline.PipelineService;
+import org.labkey.api.pipeline.PipelineStatusFile;
+import org.labkey.nextflow.pipeline.NextFlowPipelineJob;
import org.springframework.validation.BindException;
import java.nio.file.Files;
@@ -27,6 +35,8 @@ public class NextFlowManager
private static final String NEXTFLOW_S3_BUCKET_PATH = "s3BucketPath";
private static final String NEXTFLOW_API_KEY = "apiKey";
+ public static final String SCHEMA_NAME = "nextflow";
+
private static final String IS_NEXTFLOW_ENABLED = "enabled";
private static final NextFlowManager _instance = new NextFlowManager();
@@ -158,4 +168,42 @@ public void saveEnabledState(Container container, Boolean enabled)
map.save();
}
}
+
+ private DbSchema getDbSchema()
+ {
+ return DbSchema.get(SCHEMA_NAME, DbSchemaType.Module);
+ }
+
+ private Integer getJobId(NextFlowPipelineJob job)
+ {
+ PipelineStatusFile file = PipelineService.get().getStatusFile(job.getJobGUID());
+ return file == null ? null : file.getRowId();
+ }
+
+ public int getInvocationCount(NextFlowPipelineJob job)
+ {
+ return getInvocationCount(getJobId(job));
+ }
+
+ private int getInvocationCount(int jobId)
+ {
+ Integer result = new SqlSelector(getDbSchema(), new SQLFragment("SELECT InvocationCount FROM nextflow.Job WHERE JobId = ?", jobId)).getObject(Integer.class);
+ return result != null ? result.intValue() : 0;
+ }
+
+ public int incrementInvocationCount(NextFlowPipelineJob job)
+ {
+ int jobId = getJobId(job);
+ int current = getInvocationCount(jobId);
+ current++;
+ if (current == 1)
+ {
+ new SqlExecutor(getDbSchema()).execute(new SQLFragment("INSERT INTO nextflow.Job (JobId, InvocationCount) VALUES (?, ?)", jobId, current));
+ }
+ else
+ {
+ new SqlExecutor(getDbSchema()).execute(new SQLFragment("UPDATE nextflow.Job SET InvocationCount = ? WHERE JobId = ?", current, jobId));
+ }
+ return current;
+ }
}
diff --git a/nextflow/src/org/labkey/nextflow/NextFlowModule.java b/nextflow/src/org/labkey/nextflow/NextFlowModule.java
index 46853d27..83dac6fc 100644
--- a/nextflow/src/org/labkey/nextflow/NextFlowModule.java
+++ b/nextflow/src/org/labkey/nextflow/NextFlowModule.java
@@ -1,6 +1,7 @@
package org.labkey.nextflow;
import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
import org.labkey.api.data.ContainerManager;
import org.labkey.api.module.ModuleContext;
import org.labkey.api.module.SpringModule;
@@ -40,13 +41,18 @@ protected void init()
@Override
public boolean hasScripts()
{
- return false;
+ return true;
}
@Override
- public @NotNull Collection getSchemaNames()
+ public @Nullable Double getSchemaVersion()
{
- return List.of();
+ return 25.000;
}
+ @Override
+ public @NotNull Collection getSchemaNames()
+ {
+ return List.of(NextFlowManager.SCHEMA_NAME);
+ }
}
diff --git a/nextflow/src/org/labkey/nextflow/pipeline/NextFlowPipelineJob.java b/nextflow/src/org/labkey/nextflow/pipeline/NextFlowPipelineJob.java
index 8a3cd0c0..137a9dca 100644
--- a/nextflow/src/org/labkey/nextflow/pipeline/NextFlowPipelineJob.java
+++ b/nextflow/src/org/labkey/nextflow/pipeline/NextFlowPipelineJob.java
@@ -20,6 +20,7 @@
import org.labkey.api.util.StringUtilsLabKey;
import org.labkey.api.util.logging.LogHelper;
import org.labkey.api.view.ViewBackgroundInfo;
+import org.labkey.nextflow.NextFlowManager;
import java.io.BufferedWriter;
import java.io.File;
@@ -59,25 +60,24 @@ public NextFlowPipelineJob(ViewBackgroundInfo info, @NotNull PipeRoot root, Path
super(new NextFlowProtocol(), NextFlowPipelineProvider.NAME, info, root, config.getFileName().toString(), config, inputFiles, false, false);
this.config = config;
setLogFile(log);
- LOG.info("NextFlow job queued: {}", getJsonJobInfo(null));
}
- protected JSONObject getJsonJobInfo(Long invocationCount)
+ public JSONObject getJsonJobInfo(boolean includeInvocationCount)
{
JSONObject result = new JSONObject();
result.put("user", getUser().getEmail());
result.put("container", getContainer().getPath());
result.put("filePath", getLogFilePath().getParent().toString());
- result.put("runName", getNextFlowRunName(invocationCount));
+ result.put("runName", getNextFlowRunName(includeInvocationCount));
result.put("configFile", getConfig().getFileName().toString());
return result;
}
- protected String getNextFlowRunName(Long invocationCount)
+ protected String getNextFlowRunName(boolean includeInvocationCount)
{
PipelineStatusFile file = PipelineService.get().getStatusFile(getJobGUID());
String result = file == null ? "Unknown" : ("LabKeyJob" + file.getRowId());
- result += invocationCount == null ? "" : ("_" + invocationCount);
+ result += includeInvocationCount ? ("_" + NextFlowManager.get().getInvocationCount(this)) : "";
return result;
}
diff --git a/nextflow/src/org/labkey/nextflow/pipeline/NextFlowRunTask.java b/nextflow/src/org/labkey/nextflow/pipeline/NextFlowRunTask.java
index 2749796f..1a64acc9 100644
--- a/nextflow/src/org/labkey/nextflow/pipeline/NextFlowRunTask.java
+++ b/nextflow/src/org/labkey/nextflow/pipeline/NextFlowRunTask.java
@@ -2,9 +2,6 @@
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
-import org.labkey.api.data.ContainerManager;
-import org.labkey.api.data.DbSequence;
-import org.labkey.api.data.DbSequenceManager;
import org.labkey.api.exp.XarFormatException;
import org.labkey.api.pipeline.AbstractTaskFactory;
import org.labkey.api.pipeline.AbstractTaskFactorySettings;
@@ -40,8 +37,6 @@ public class NextFlowRunTask extends WorkDirectoryTask
public static final String ACTION_NAME = "NextFlow";
- private static final DbSequence INVOCATION_SEQUENCE = DbSequenceManager.get(ContainerManager.getRoot(), NextFlowRunTask.class.getName());
-
public NextFlowRunTask(Factory factory, PipelineJob job)
{
super(factory, job);
@@ -54,9 +49,9 @@ public NextFlowRunTask(Factory factory, PipelineJob job)
// NextFlow requires a unique job name for every execution. Increment a counter to append as a suffix to
// ensure uniqueness
- long invocationCount = INVOCATION_SEQUENCE.next();
- INVOCATION_SEQUENCE.sync();
- NextFlowPipelineJob.LOG.info("Starting to execute NextFlow: {}", getJob().getJsonJobInfo(invocationCount));
+ NextFlowManager.get().incrementInvocationCount(getJob());
+
+ NextFlowPipelineJob.LOG.info("Starting to execute NextFlow: {}", getJob().getJsonJobInfo(true));
SecurityManager.TransformSession session = null;
boolean success = false;
@@ -83,10 +78,10 @@ public NextFlowRunTask(Factory factory, PipelineJob job)
File dir = getJob().getLogFile().getParentFile();
getJob().runSubProcess(secretsPB, dir);
- ProcessBuilder executionPB = new ProcessBuilder(getArgs(invocationCount));
+ ProcessBuilder executionPB = new ProcessBuilder(getArgs());
getJob().runSubProcess(executionPB, dir);
log.info("Job Finished");
- NextFlowPipelineJob.LOG.info("Finished executing NextFlow: {}", getJob().getJsonJobInfo(invocationCount));
+ NextFlowPipelineJob.LOG.info("Finished executing NextFlow: {}", getJob().getJsonJobInfo(true));
RecordedAction action = new RecordedAction(ACTION_NAME);
for (Path inputFile : getJob().getInputFilePaths())
@@ -110,7 +105,7 @@ public NextFlowRunTask(Factory factory, PipelineJob job)
}
if (!success)
{
- NextFlowPipelineJob.LOG.info("Failed executing NextFlow: {}", getJob().getJsonJobInfo(invocationCount));
+ NextFlowPipelineJob.LOG.info("Failed executing NextFlow: {}", getJob().getJsonJobInfo(true));
}
}
}
@@ -176,7 +171,7 @@ private boolean hasAwsSection(Path configFile) throws PipelineJobException
}
- private @NotNull List getArgs(long invocationCount) throws PipelineJobException
+ private @NotNull List getArgs() throws PipelineJobException
{
NextFlowConfiguration config = NextFlowManager.get().getConfiguration();
Path configFile = getJob().getConfig();
@@ -201,7 +196,7 @@ private boolean hasAwsSection(Path configFile) throws PipelineJobException
args.add("-c");
args.add(configFile.toAbsolutePath().toString());
args.add("-name");
- args.add(getJob().getNextFlowRunName(invocationCount));
+ args.add(getJob().getNextFlowRunName(true));
return args;
}