Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,12 @@ type Transfers struct {
//
// Defaults to 0 (unlimited)
DownloadLimit int `default:"0" yaml:"download_limit"`

// StoragePool acts as a per-node identifier to signal that this node shares a common data volume
// with other nodes in the cluster. When this value is set and matches the value on a target node,
// Wings will assume the server data already exists on the target and will skip copying and cleanup.
// When empty, transfers behave normally.
StoragePool string `yaml:"storage_pool"`
}

type ConsoleThrottles struct {
Expand Down
20 changes: 11 additions & 9 deletions router/router_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ func getServerInstallLogs(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{"data": output})
}


// Handles a request to control the power state of a server. If the action being passed
// through is invalid a 404 is returned. Otherwise, a HTTP/202 Accepted response is returned
// and the actual power action is run asynchronously so that we don't have to block the
Expand Down Expand Up @@ -276,14 +275,17 @@ func deleteServer(c *gin.Context) {
//
// In addition, servers with large amounts of files can take some time to finish deleting,
// so we don't want to block the HTTP call while waiting on this.
go func(s *server.Server) {
fs := s.Filesystem()
p := fs.Path()
_ = fs.UnixFS().Close()
if err := os.RemoveAll(p); err != nil {
log.WithFields(log.Fields{"path": p, "error": err}).Warn("failed to remove server files during deletion process")
}
}(s)
// Skip file removal when a storage pool is configured, since the data is shared across nodes.
if config.Get().System.Transfers.StoragePool == "" {
go func(s *server.Server) {
fs := s.Filesystem()
p := fs.Path()
_ = fs.UnixFS().Close()
if err := os.RemoveAll(p); err != nil {
log.WithFields(log.Fields{"path": p, "error": err}).Warn("failed to remove server files during deletion process")
}
}(s)
}

middleware.ExtractManager(c).Remove(func(server *server.Server) bool {
return server.ID() == s.ID()
Expand Down
15 changes: 15 additions & 0 deletions router/router_transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/gin-gonic/gin"
"github.com/google/uuid"

"github.com/pelican-dev/wings/config"
"github.com/pelican-dev/wings/router/middleware"
"github.com/pelican-dev/wings/router/tokens"
"github.com/pelican-dev/wings/server"
Expand Down Expand Up @@ -128,6 +129,20 @@ func postTransfers(c *gin.Context) {
trnsfr.Server.Events().Publish(server.TransferStatusEvent, "success")
}(ctx, trnsfr)

{
remotePool := config.Get().System.Transfers.StoragePool
sourcePool := c.GetHeader("X-Storage-Pool")
if remotePool != "" && sourcePool != "" && strings.EqualFold(remotePool, sourcePool) {
if err := trnsfr.Server.CreateEnvironment(); err != nil {
middleware.CaptureAndAbort(c, err)
return
}
successful = true
c.Status(http.StatusOK)
return
}
}

mediaType, params, err := mime.ParseMediaType(c.GetHeader("Content-Type"))
if err != nil {
trnsfr.Log().Debug("failed to parse content type header")
Expand Down
8 changes: 8 additions & 0 deletions server/transfer/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"net/http"
"time"

"github.com/pelican-dev/wings/config"
"github.com/pelican-dev/wings/internal/progress"
)

Expand All @@ -23,6 +24,10 @@ func (t *Transfer) PushArchiveToTarget(url, token string) ([]byte, error) {
t.SendMessage("Preparing to stream server data to destination...")
t.SetStatus(StatusProcessing)

// Always include the configured storage pool identifier in the outgoing request headers.
// The destination can use this information to determine if it should skip copying files when both nodes share the same storage backend.
sp := config.Get().System.Transfers.StoragePool

a, err := t.Archive()
if err != nil {
t.Error(err, "Failed to get archive for transfer.")
Expand Down Expand Up @@ -56,6 +61,9 @@ func (t *Transfer) PushArchiveToTarget(url, token string) ([]byte, error) {
return nil, err
}
req.Header.Set("Authorization", token)
if sp != "" {
req.Header.Set("X-Storage-Pool", sp)
}

// Create a new multipart writer that writes the archive to the pipe.
mp := multipart.NewWriter(writer)
Expand Down