From 431c08c120b5bfa39e57f1da36928bf0185adfa4 Mon Sep 17 00:00:00 2001 From: labkey-jeckels Date: Tue, 18 Mar 2025 18:09:33 -0700 Subject: [PATCH] Switch to tracking NextFlow invocation counts per-job --- nextflow/module.properties | 2 +- .../resources/schemas/dbscripts/nextflow.xml | 28 +++++++++++ .../postgresql/nextflow-0.000-25.000.sql | 24 ++++++++++ .../labkey/nextflow/NextFlowController.java | 8 +++- .../org/labkey/nextflow/NextFlowManager.java | 48 +++++++++++++++++++ .../org/labkey/nextflow/NextFlowModule.java | 12 +++-- .../pipeline/NextFlowPipelineJob.java | 10 ++-- .../nextflow/pipeline/NextFlowRunTask.java | 21 ++++---- 8 files changed, 129 insertions(+), 24 deletions(-) create mode 100644 nextflow/resources/schemas/dbscripts/nextflow.xml create mode 100644 nextflow/resources/schemas/dbscripts/postgresql/nextflow-0.000-25.000.sql diff --git a/nextflow/module.properties b/nextflow/module.properties index be867ce9..e79c9076 100644 --- a/nextflow/module.properties +++ b/nextflow/module.properties @@ -5,4 +5,4 @@ Description: This module provides the functionality \ License: Apache 2.0 LicenseURL: http://www.apache.org/licenses/LICENSE-2.0 SupportedDatabases: pgsql -ManageVersion: false +ManageVersion: true diff --git a/nextflow/resources/schemas/dbscripts/nextflow.xml b/nextflow/resources/schemas/dbscripts/nextflow.xml new file mode 100644 index 00000000..01709064 --- /dev/null +++ b/nextflow/resources/schemas/dbscripts/nextflow.xml @@ -0,0 +1,28 @@ + + + + + Invocation counts to ensure unique NextFlow run names + + + + +
+
\ No newline at end of file diff --git a/nextflow/resources/schemas/dbscripts/postgresql/nextflow-0.000-25.000.sql b/nextflow/resources/schemas/dbscripts/postgresql/nextflow-0.000-25.000.sql new file mode 100644 index 00000000..5f6a43a3 --- /dev/null +++ b/nextflow/resources/schemas/dbscripts/postgresql/nextflow-0.000-25.000.sql @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2025 LabKey Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +CREATE SCHEMA nextflow; + +CREATE TABLE nextflow.Job +( + JobId INTEGER NOT NULL, + InvocationCount INTEGER NOT NULL, + CONSTRAINT PK_Job PRIMARY KEY (JobId), + CONSTRAINT FK_Job_JobId FOREIGN KEY (JobId) REFERENCES pipeline.StatusFiles (RowId) ON DELETE CASCADE -- Automatically clean up when a job is deleted +); diff --git a/nextflow/src/org/labkey/nextflow/NextFlowController.java b/nextflow/src/org/labkey/nextflow/NextFlowController.java index ea758886..736b8e4f 100644 --- a/nextflow/src/org/labkey/nextflow/NextFlowController.java +++ b/nextflow/src/org/labkey/nextflow/NextFlowController.java @@ -3,6 +3,7 @@ import lombok.Getter; import lombok.Setter; import org.apache.commons.lang3.StringUtils; +import org.apache.logging.log4j.Logger; import org.labkey.api.action.ApiResponse; import org.labkey.api.action.ApiSimpleResponse; import org.labkey.api.action.FormViewAction; @@ -12,7 +13,6 @@ import org.labkey.api.data.PropertyManager; import org.labkey.api.data.PropertyStore; import org.labkey.api.pipeline.PipeRoot; 
-import org.labkey.api.pipeline.PipelineJob; import org.labkey.api.pipeline.PipelineProvider; import org.labkey.api.pipeline.PipelineService; import org.labkey.api.pipeline.PipelineStatusUrls; @@ -31,6 +31,7 @@ import org.labkey.api.util.Path; import org.labkey.api.util.URLHelper; import org.labkey.api.util.element.Select; +import org.labkey.api.util.logging.LogHelper; import org.labkey.api.view.HtmlView; import org.labkey.api.view.JspView; import org.labkey.api.view.NavTree; @@ -64,6 +65,8 @@ public class NextFlowController extends SpringActionController private static final DefaultActionResolver _actionResolver = new DefaultActionResolver(NextFlowController.class); public static final String NAME = "nextflow"; + protected static final Logger LOG = LogHelper.getLogger(NextFlowPipelineJob.class, "LabKey UI and API for NextFlow usage"); + public NextFlowController() { setActionResolver(_actionResolver); @@ -326,8 +329,9 @@ public boolean handlePost(AnalyzeForm form, BindException errors) throws Excepti { ViewBackgroundInfo info = getViewBackgroundInfo(); PipeRoot root = PipelineService.get().findPipelineRoot(info.getContainer()); - PipelineJob job = NextFlowPipelineJob.create(info, root, configFile.toPath(), inputFiles.stream().map(File::toPath).toList()); + NextFlowPipelineJob job = NextFlowPipelineJob.create(info, root, configFile.toPath(), inputFiles.stream().map(File::toPath).toList()); PipelineService.get().queueJob(job); + LOG.info("NextFlow job queued: {}", job.getJsonJobInfo(false)); } } diff --git a/nextflow/src/org/labkey/nextflow/NextFlowManager.java b/nextflow/src/org/labkey/nextflow/NextFlowManager.java index e560e413..a1b115c7 100644 --- a/nextflow/src/org/labkey/nextflow/NextFlowManager.java +++ b/nextflow/src/org/labkey/nextflow/NextFlowManager.java @@ -3,8 +3,16 @@ import org.apache.commons.lang3.StringUtils; import org.labkey.api.data.Container; import org.labkey.api.data.CoreSchema; +import org.labkey.api.data.DbSchema; +import 
org.labkey.api.data.DbSchemaType;
 import org.labkey.api.data.DbScope;
 import org.labkey.api.data.PropertyManager;
+import org.labkey.api.data.SQLFragment;
+import org.labkey.api.data.SqlExecutor;
+import org.labkey.api.data.SqlSelector;
+import org.labkey.api.pipeline.PipelineService;
+import org.labkey.api.pipeline.PipelineStatusFile;
+import org.labkey.nextflow.pipeline.NextFlowPipelineJob;
 import org.springframework.validation.BindException;
 
 import java.nio.file.Files;
@@ -27,6 +35,8 @@ public class NextFlowManager
     private static final String NEXTFLOW_S3_BUCKET_PATH = "s3BucketPath";
     private static final String NEXTFLOW_API_KEY = "apiKey";
 
+    public static final String SCHEMA_NAME = "nextflow";
+
     private static final String IS_NEXTFLOW_ENABLED = "enabled";
 
     private static final NextFlowManager _instance = new NextFlowManager();
@@ -158,4 +168,53 @@ public void saveEnabledState(Container container, Boolean enabled)
             map.save();
         }
     }
+
+    private DbSchema getDbSchema()
+    {
+        return DbSchema.get(SCHEMA_NAME, DbSchemaType.Module);
+    }
+
+    /** @return the RowId of the job's pipeline status file, or null if the job has no status file */
+    private Integer getJobId(NextFlowPipelineJob job)
+    {
+        PipelineStatusFile file = PipelineService.get().getStatusFile(job.getJobGUID());
+        return file == null ? null : file.getRowId();
+    }
+
+    /** @return the invocation count recorded for the job; 0 if it has no status file or no nextflow.Job row */
+    public int getInvocationCount(NextFlowPipelineJob job)
+    {
+        // getJobId() is nullable; passing it straight to the int overload would unbox null and throw a NullPointerException
+        Integer jobId = getJobId(job);
+        return jobId == null ? 0 : getInvocationCount(jobId.intValue());
+    }
+
+    private int getInvocationCount(int jobId)
+    {
+        Integer result = new SqlSelector(getDbSchema(), new SQLFragment("SELECT InvocationCount FROM nextflow.Job WHERE JobId = ?", jobId)).getObject(Integer.class);
+        return result != null ? result.intValue() : 0;
+    }
+
+    /**
+     * Records one more invocation of the job and returns the updated count.
+     * @throws IllegalStateException if the job has no pipeline status file, whose RowId is the key of the nextflow.Job row
+     */
+    public int incrementInvocationCount(NextFlowPipelineJob job)
+    {
+        Integer jobId = getJobId(job);
+        if (jobId == null)
+        {
+            throw new IllegalStateException("No pipeline status file found for NextFlow job " + job.getJobGUID());
+        }
+        int current = getInvocationCount(jobId.intValue()) + 1;
+        if (current == 1)
+        {
+            new SqlExecutor(getDbSchema()).execute(new SQLFragment("INSERT INTO nextflow.Job (JobId, InvocationCount) VALUES (?, ?)", jobId, current));
+        }
+        else
+        {
+            new SqlExecutor(getDbSchema()).execute(new SQLFragment("UPDATE nextflow.Job SET InvocationCount = ? WHERE JobId = ?", current, jobId));
+        }
+        return current;
+    }
 }
diff --git a/nextflow/src/org/labkey/nextflow/NextFlowModule.java b/nextflow/src/org/labkey/nextflow/NextFlowModule.java
index 46853d27..83dac6fc 100644
--- a/nextflow/src/org/labkey/nextflow/NextFlowModule.java
+++ b/nextflow/src/org/labkey/nextflow/NextFlowModule.java
@@ -1,6 +1,7 @@
 package org.labkey.nextflow;
 
 import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 import org.labkey.api.data.ContainerManager;
 import org.labkey.api.module.ModuleContext;
 import org.labkey.api.module.SpringModule;
@@ -40,13 +41,18 @@ protected void init()
     @Override
     public boolean hasScripts()
     {
-        return false;
+        return true;
     }
 
     @Override
-    public @NotNull Collection getSchemaNames()
+    public @Nullable Double getSchemaVersion()
     {
-        return List.of();
+        return 25.000;
     }
 
+    @Override
+    public @NotNull Collection getSchemaNames()
+    {
+        return List.of(NextFlowManager.SCHEMA_NAME);
+    }
 }
diff --git a/nextflow/src/org/labkey/nextflow/pipeline/NextFlowPipelineJob.java b/nextflow/src/org/labkey/nextflow/pipeline/NextFlowPipelineJob.java
index 8a3cd0c0..137a9dca 100644
--- a/nextflow/src/org/labkey/nextflow/pipeline/NextFlowPipelineJob.java
+++ b/nextflow/src/org/labkey/nextflow/pipeline/NextFlowPipelineJob.java
@@ -20,6 +20,7 @@
 import org.labkey.api.util.StringUtilsLabKey;
 import org.labkey.api.util.logging.LogHelper;
 import org.labkey.api.view.ViewBackgroundInfo;
+import org.labkey.nextflow.NextFlowManager;
 
 import java.io.BufferedWriter;
import java.io.File; @@ -59,25 +60,24 @@ public NextFlowPipelineJob(ViewBackgroundInfo info, @NotNull PipeRoot root, Path super(new NextFlowProtocol(), NextFlowPipelineProvider.NAME, info, root, config.getFileName().toString(), config, inputFiles, false, false); this.config = config; setLogFile(log); - LOG.info("NextFlow job queued: {}", getJsonJobInfo(null)); } - protected JSONObject getJsonJobInfo(Long invocationCount) + public JSONObject getJsonJobInfo(boolean includeInvocationCount) { JSONObject result = new JSONObject(); result.put("user", getUser().getEmail()); result.put("container", getContainer().getPath()); result.put("filePath", getLogFilePath().getParent().toString()); - result.put("runName", getNextFlowRunName(invocationCount)); + result.put("runName", getNextFlowRunName(includeInvocationCount)); result.put("configFile", getConfig().getFileName().toString()); return result; } - protected String getNextFlowRunName(Long invocationCount) + protected String getNextFlowRunName(boolean includeInvocationCount) { PipelineStatusFile file = PipelineService.get().getStatusFile(getJobGUID()); String result = file == null ? "Unknown" : ("LabKeyJob" + file.getRowId()); - result += invocationCount == null ? "" : ("_" + invocationCount); + result += includeInvocationCount ? 
("_" + NextFlowManager.get().getInvocationCount(this)) : ""; return result; } diff --git a/nextflow/src/org/labkey/nextflow/pipeline/NextFlowRunTask.java b/nextflow/src/org/labkey/nextflow/pipeline/NextFlowRunTask.java index 2749796f..1a64acc9 100644 --- a/nextflow/src/org/labkey/nextflow/pipeline/NextFlowRunTask.java +++ b/nextflow/src/org/labkey/nextflow/pipeline/NextFlowRunTask.java @@ -2,9 +2,6 @@ import org.apache.logging.log4j.Logger; import org.jetbrains.annotations.NotNull; -import org.labkey.api.data.ContainerManager; -import org.labkey.api.data.DbSequence; -import org.labkey.api.data.DbSequenceManager; import org.labkey.api.exp.XarFormatException; import org.labkey.api.pipeline.AbstractTaskFactory; import org.labkey.api.pipeline.AbstractTaskFactorySettings; @@ -40,8 +37,6 @@ public class NextFlowRunTask extends WorkDirectoryTask public static final String ACTION_NAME = "NextFlow"; - private static final DbSequence INVOCATION_SEQUENCE = DbSequenceManager.get(ContainerManager.getRoot(), NextFlowRunTask.class.getName()); - public NextFlowRunTask(Factory factory, PipelineJob job) { super(factory, job); @@ -54,9 +49,9 @@ public NextFlowRunTask(Factory factory, PipelineJob job) // NextFlow requires a unique job name for every execution. 
Increment a counter to append as a suffix to // ensure uniqueness - long invocationCount = INVOCATION_SEQUENCE.next(); - INVOCATION_SEQUENCE.sync(); - NextFlowPipelineJob.LOG.info("Starting to execute NextFlow: {}", getJob().getJsonJobInfo(invocationCount)); + NextFlowManager.get().incrementInvocationCount(getJob()); + + NextFlowPipelineJob.LOG.info("Starting to execute NextFlow: {}", getJob().getJsonJobInfo(true)); SecurityManager.TransformSession session = null; boolean success = false; @@ -83,10 +78,10 @@ public NextFlowRunTask(Factory factory, PipelineJob job) File dir = getJob().getLogFile().getParentFile(); getJob().runSubProcess(secretsPB, dir); - ProcessBuilder executionPB = new ProcessBuilder(getArgs(invocationCount)); + ProcessBuilder executionPB = new ProcessBuilder(getArgs()); getJob().runSubProcess(executionPB, dir); log.info("Job Finished"); - NextFlowPipelineJob.LOG.info("Finished executing NextFlow: {}", getJob().getJsonJobInfo(invocationCount)); + NextFlowPipelineJob.LOG.info("Finished executing NextFlow: {}", getJob().getJsonJobInfo(true)); RecordedAction action = new RecordedAction(ACTION_NAME); for (Path inputFile : getJob().getInputFilePaths()) @@ -110,7 +105,7 @@ public NextFlowRunTask(Factory factory, PipelineJob job) } if (!success) { - NextFlowPipelineJob.LOG.info("Failed executing NextFlow: {}", getJob().getJsonJobInfo(invocationCount)); + NextFlowPipelineJob.LOG.info("Failed executing NextFlow: {}", getJob().getJsonJobInfo(true)); } } } @@ -176,7 +171,7 @@ private boolean hasAwsSection(Path configFile) throws PipelineJobException } - private @NotNull List getArgs(long invocationCount) throws PipelineJobException + private @NotNull List getArgs() throws PipelineJobException { NextFlowConfiguration config = NextFlowManager.get().getConfiguration(); Path configFile = getJob().getConfig(); @@ -201,7 +196,7 @@ private boolean hasAwsSection(Path configFile) throws PipelineJobException args.add("-c"); 
args.add(configFile.toAbsolutePath().toString()); args.add("-name"); - args.add(getJob().getNextFlowRunName(invocationCount)); + args.add(getJob().getNextFlowRunName(true)); return args; }