Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion nextflow/module.properties
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ Description: This module provides the functionality \
for running the NextFlow pipeline jobs on PanoramaWeb.
License: Apache 2.0
LicenseURL: http://www.apache.org/licenses/LICENSE-2.0
ManageVersion: false
SupportedDatabases: pgsql
ManageVersion: true
28 changes: 28 additions & 0 deletions nextflow/resources/schemas/dbscripts/nextflow.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--

* Copyright (c) 2025 LabKey Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.

-->
<tables xsi:schemaLocation="http://labkey.org/data/xml ../../../../../schemas/tableInfo.xsd"
xmlns="http://labkey.org/data/xml" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<table tableName="Job" tableDbType="TABLE">
<description>Invocation counts to ensure unique NextFlow run names</description>
<columns>
<column columnName="JobId"/>
<column columnName="InvocationCount"/>
</columns>
</table>
</tables>
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright (c) 2025 LabKey Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
CREATE SCHEMA nextflow;

CREATE TABLE nextflow.Job
(
JobId INTEGER NOT NULL,
InvocationCount INTEGER NOT NULL,
CONSTRAINT PK_Job PRIMARY KEY (JobId),
CONSTRAINT FK_Job_JobId FOREIGN KEY (JobId) REFERENCES pipeline.StatusFiles (RowId) ON DELETE CASCADE -- Automatically clean up when a job is deleted
);
8 changes: 6 additions & 2 deletions nextflow/src/org/labkey/nextflow/NextFlowController.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.Logger;
import org.labkey.api.action.ApiResponse;
import org.labkey.api.action.ApiSimpleResponse;
import org.labkey.api.action.FormViewAction;
Expand All @@ -12,7 +13,6 @@
import org.labkey.api.data.PropertyManager;
import org.labkey.api.data.PropertyStore;
import org.labkey.api.pipeline.PipeRoot;
import org.labkey.api.pipeline.PipelineJob;
import org.labkey.api.pipeline.PipelineProvider;
import org.labkey.api.pipeline.PipelineService;
import org.labkey.api.pipeline.PipelineStatusUrls;
Expand All @@ -31,6 +31,7 @@
import org.labkey.api.util.Path;
import org.labkey.api.util.URLHelper;
import org.labkey.api.util.element.Select;
import org.labkey.api.util.logging.LogHelper;
import org.labkey.api.view.HtmlView;
import org.labkey.api.view.JspView;
import org.labkey.api.view.NavTree;
Expand Down Expand Up @@ -64,6 +65,8 @@ public class NextFlowController extends SpringActionController
private static final DefaultActionResolver _actionResolver = new DefaultActionResolver(NextFlowController.class);
public static final String NAME = "nextflow";

protected static final Logger LOG = LogHelper.getLogger(NextFlowPipelineJob.class, "LabKey UI and API for NextFlow usage");

public NextFlowController()
{
setActionResolver(_actionResolver);
Expand Down Expand Up @@ -326,8 +329,9 @@ public boolean handlePost(AnalyzeForm form, BindException errors) throws Excepti
{
ViewBackgroundInfo info = getViewBackgroundInfo();
PipeRoot root = PipelineService.get().findPipelineRoot(info.getContainer());
PipelineJob job = NextFlowPipelineJob.create(info, root, configFile.toPath(), inputFiles.stream().map(File::toPath).toList());
NextFlowPipelineJob job = NextFlowPipelineJob.create(info, root, configFile.toPath(), inputFiles.stream().map(File::toPath).toList());
PipelineService.get().queueJob(job);
LOG.info("NextFlow job queued: {}", job.getJsonJobInfo(false));
}
}

Expand Down
48 changes: 48 additions & 0 deletions nextflow/src/org/labkey/nextflow/NextFlowManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,16 @@
import org.apache.commons.lang3.StringUtils;
import org.labkey.api.data.Container;
import org.labkey.api.data.CoreSchema;
import org.labkey.api.data.DbSchema;
import org.labkey.api.data.DbSchemaType;
import org.labkey.api.data.DbScope;
import org.labkey.api.data.PropertyManager;
import org.labkey.api.data.SQLFragment;
import org.labkey.api.data.SqlExecutor;
import org.labkey.api.data.SqlSelector;
import org.labkey.api.pipeline.PipelineService;
import org.labkey.api.pipeline.PipelineStatusFile;
import org.labkey.nextflow.pipeline.NextFlowPipelineJob;
import org.springframework.validation.BindException;

import java.nio.file.Files;
Expand All @@ -27,6 +35,8 @@ public class NextFlowManager
private static final String NEXTFLOW_S3_BUCKET_PATH = "s3BucketPath";
private static final String NEXTFLOW_API_KEY = "apiKey";

public static final String SCHEMA_NAME = "nextflow";

private static final String IS_NEXTFLOW_ENABLED = "enabled";

private static final NextFlowManager _instance = new NextFlowManager();
Expand Down Expand Up @@ -158,4 +168,42 @@ public void saveEnabledState(Container container, Boolean enabled)
map.save();
}
}

private DbSchema getDbSchema()
{
return DbSchema.get(SCHEMA_NAME, DbSchemaType.Module);
}

private Integer getJobId(NextFlowPipelineJob job)
{
PipelineStatusFile file = PipelineService.get().getStatusFile(job.getJobGUID());
return file == null ? null : file.getRowId();
}

public int getInvocationCount(NextFlowPipelineJob job)
{
return getInvocationCount(getJobId(job));
}

private int getInvocationCount(int jobId)
{
Integer result = new SqlSelector(getDbSchema(), new SQLFragment("SELECT InvocationCount FROM nextflow.Job WHERE JobId = ?", jobId)).getObject(Integer.class);
return result != null ? result.intValue() : 0;
}

public int incrementInvocationCount(NextFlowPipelineJob job)
{
int jobId = getJobId(job);
int current = getInvocationCount(jobId);
current++;
if (current == 1)
{
new SqlExecutor(getDbSchema()).execute(new SQLFragment("INSERT INTO nextflow.Job (JobId, InvocationCount) VALUES (?, ?)", jobId, current));
}
else
{
new SqlExecutor(getDbSchema()).execute(new SQLFragment("UPDATE nextflow.Job SET InvocationCount = ? WHERE JobId = ?", current, jobId));
}
return current;
}
}
12 changes: 9 additions & 3 deletions nextflow/src/org/labkey/nextflow/NextFlowModule.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.labkey.nextflow;

import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.labkey.api.data.ContainerManager;
import org.labkey.api.module.ModuleContext;
import org.labkey.api.module.SpringModule;
Expand Down Expand Up @@ -40,13 +41,18 @@ protected void init()
@Override
public boolean hasScripts()
{
return false;
return true;
}

@Override
public @NotNull Collection<String> getSchemaNames()
public @Nullable Double getSchemaVersion()
{
return List.of();
return 25.000;
}

@Override
public @NotNull Collection<String> getSchemaNames()
{
return List.of(NextFlowManager.SCHEMA_NAME);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.labkey.api.util.StringUtilsLabKey;
import org.labkey.api.util.logging.LogHelper;
import org.labkey.api.view.ViewBackgroundInfo;
import org.labkey.nextflow.NextFlowManager;

import java.io.BufferedWriter;
import java.io.File;
Expand Down Expand Up @@ -59,25 +60,24 @@ public NextFlowPipelineJob(ViewBackgroundInfo info, @NotNull PipeRoot root, Path
super(new NextFlowProtocol(), NextFlowPipelineProvider.NAME, info, root, config.getFileName().toString(), config, inputFiles, false, false);
this.config = config;
setLogFile(log);
LOG.info("NextFlow job queued: {}", getJsonJobInfo(null));
}

protected JSONObject getJsonJobInfo(Long invocationCount)
public JSONObject getJsonJobInfo(boolean includeInvocationCount)
{
JSONObject result = new JSONObject();
result.put("user", getUser().getEmail());
result.put("container", getContainer().getPath());
result.put("filePath", getLogFilePath().getParent().toString());
result.put("runName", getNextFlowRunName(invocationCount));
result.put("runName", getNextFlowRunName(includeInvocationCount));
result.put("configFile", getConfig().getFileName().toString());
return result;
}

protected String getNextFlowRunName(Long invocationCount)
protected String getNextFlowRunName(boolean includeInvocationCount)
{
PipelineStatusFile file = PipelineService.get().getStatusFile(getJobGUID());
String result = file == null ? "Unknown" : ("LabKeyJob" + file.getRowId());
result += invocationCount == null ? "" : ("_" + invocationCount);
result += includeInvocationCount ? ("_" + NextFlowManager.get().getInvocationCount(this)) : "";
return result;
}

Expand Down
21 changes: 8 additions & 13 deletions nextflow/src/org/labkey/nextflow/pipeline/NextFlowRunTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@

import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
import org.labkey.api.data.ContainerManager;
import org.labkey.api.data.DbSequence;
import org.labkey.api.data.DbSequenceManager;
import org.labkey.api.exp.XarFormatException;
import org.labkey.api.pipeline.AbstractTaskFactory;
import org.labkey.api.pipeline.AbstractTaskFactorySettings;
Expand Down Expand Up @@ -40,8 +37,6 @@ public class NextFlowRunTask extends WorkDirectoryTask<NextFlowRunTask.Factory>

public static final String ACTION_NAME = "NextFlow";

private static final DbSequence INVOCATION_SEQUENCE = DbSequenceManager.get(ContainerManager.getRoot(), NextFlowRunTask.class.getName());

public NextFlowRunTask(Factory factory, PipelineJob job)
{
super(factory, job);
Expand All @@ -54,9 +49,9 @@ public NextFlowRunTask(Factory factory, PipelineJob job)

// NextFlow requires a unique job name for every execution. Increment a counter to append as a suffix to
// ensure uniqueness
long invocationCount = INVOCATION_SEQUENCE.next();
INVOCATION_SEQUENCE.sync();
NextFlowPipelineJob.LOG.info("Starting to execute NextFlow: {}", getJob().getJsonJobInfo(invocationCount));
NextFlowManager.get().incrementInvocationCount(getJob());

NextFlowPipelineJob.LOG.info("Starting to execute NextFlow: {}", getJob().getJsonJobInfo(true));

SecurityManager.TransformSession session = null;
boolean success = false;
Expand All @@ -83,10 +78,10 @@ public NextFlowRunTask(Factory factory, PipelineJob job)
File dir = getJob().getLogFile().getParentFile();
getJob().runSubProcess(secretsPB, dir);

ProcessBuilder executionPB = new ProcessBuilder(getArgs(invocationCount));
ProcessBuilder executionPB = new ProcessBuilder(getArgs());
getJob().runSubProcess(executionPB, dir);
log.info("Job Finished");
NextFlowPipelineJob.LOG.info("Finished executing NextFlow: {}", getJob().getJsonJobInfo(invocationCount));
NextFlowPipelineJob.LOG.info("Finished executing NextFlow: {}", getJob().getJsonJobInfo(true));

RecordedAction action = new RecordedAction(ACTION_NAME);
for (Path inputFile : getJob().getInputFilePaths())
Expand All @@ -110,7 +105,7 @@ public NextFlowRunTask(Factory factory, PipelineJob job)
}
if (!success)
{
NextFlowPipelineJob.LOG.info("Failed executing NextFlow: {}", getJob().getJsonJobInfo(invocationCount));
NextFlowPipelineJob.LOG.info("Failed executing NextFlow: {}", getJob().getJsonJobInfo(true));
}
}
}
Expand Down Expand Up @@ -176,7 +171,7 @@ private boolean hasAwsSection(Path configFile) throws PipelineJobException
}


private @NotNull List<String> getArgs(long invocationCount) throws PipelineJobException
private @NotNull List<String> getArgs() throws PipelineJobException
{
NextFlowConfiguration config = NextFlowManager.get().getConfiguration();
Path configFile = getJob().getConfig();
Expand All @@ -201,7 +196,7 @@ private boolean hasAwsSection(Path configFile) throws PipelineJobException
args.add("-c");
args.add(configFile.toAbsolutePath().toString());
args.add("-name");
args.add(getJob().getNextFlowRunName(invocationCount));
args.add(getJob().getNextFlowRunName(true));
return args;
}

Expand Down
Loading