From 4ad2999c5b7945394146d0c76fd0807ab714a808 Mon Sep 17 00:00:00 2001 From: stdpi Date: Sun, 23 Nov 2025 16:40:00 +0700 Subject: [PATCH] feat: option to omit transfer on same storage volume --- config/config.go | 6 ++++++ router/router_server.go | 20 +++++++++++--------- router/router_transfer.go | 15 +++++++++++++++ server/transfer/source.go | 8 ++++++++ 4 files changed, 40 insertions(+), 9 deletions(-) diff --git a/config/config.go b/config/config.go index a57d9c46..cd9dbd4a 100644 --- a/config/config.go +++ b/config/config.go @@ -278,6 +278,12 @@ type Transfers struct { // // Defaults to 0 (unlimited) DownloadLimit int `default:"0" yaml:"download_limit"` + + // StoragePool acts as a per-node identifier to signal that this node shares a common data volume + // with other nodes in the cluster. When this value is set and matches the value on a target node, + // Wings will assume the server data already exists on the target and will skip copying and cleanup. + // When empty, transfers behave normally. + StoragePool string `yaml:"storage_pool"` } type ConsoleThrottles struct { diff --git a/router/router_server.go b/router/router_server.go index 83d9abaa..e245d68a 100644 --- a/router/router_server.go +++ b/router/router_server.go @@ -75,7 +75,6 @@ func getServerInstallLogs(c *gin.Context) { c.JSON(http.StatusOK, gin.H{"data": output}) } - // Handles a request to control the power state of a server. If the action being passed // through is invalid a 404 is returned. Otherwise, a HTTP/202 Accepted response is returned // and the actual power action is run asynchronously so that we don't have to block the @@ -276,14 +275,17 @@ func deleteServer(c *gin.Context) { // // In addition, servers with large amounts of files can take some time to finish deleting, // so we don't want to block the HTTP call while waiting on this. - go func(s *server.Server) { - fs := s.Filesystem() - p := fs.Path() - _ = fs.UnixFS().Close() - if err := os.RemoveAll(p); err != nil { - log.WithFields(log.Fields{"path": p, "error": err}).Warn("failed to remove server files during deletion process") - } - }(s) + // Skip file removal when a storage pool is configured, since the data is shared across nodes. + if config.Get().System.Transfers.StoragePool == "" { + go func(s *server.Server) { + fs := s.Filesystem() + p := fs.Path() + _ = fs.UnixFS().Close() + if err := os.RemoveAll(p); err != nil { + log.WithFields(log.Fields{"path": p, "error": err}).Warn("failed to remove server files during deletion process") + } + }(s) + } middleware.ExtractManager(c).Remove(func(server *server.Server) bool { return server.ID() == s.ID() diff --git a/router/router_transfer.go b/router/router_transfer.go index 8e45f2ff..9b7ca272 100644 --- a/router/router_transfer.go +++ b/router/router_transfer.go @@ -18,6 +18,7 @@ import ( "github.com/gin-gonic/gin" "github.com/google/uuid" + "github.com/pelican-dev/wings/config" "github.com/pelican-dev/wings/router/middleware" "github.com/pelican-dev/wings/router/tokens" "github.com/pelican-dev/wings/server" @@ -128,6 +129,20 @@ func postTransfers(c *gin.Context) { trnsfr.Server.Events().Publish(server.TransferStatusEvent, "success") }(ctx, trnsfr) + { + remotePool := config.Get().System.Transfers.StoragePool + sourcePool := c.GetHeader("X-Storage-Pool") + if remotePool != "" && sourcePool != "" && strings.EqualFold(remotePool, sourcePool) { + if err := trnsfr.Server.CreateEnvironment(); err != nil { + middleware.CaptureAndAbort(c, err) + return + } + successful = true + c.Status(http.StatusOK) + return + } + } + mediaType, params, err := mime.ParseMediaType(c.GetHeader("Content-Type")) if err != nil { trnsfr.Log().Debug("failed to parse content type header") diff --git a/server/transfer/source.go b/server/transfer/source.go index bf6a9d3e..3966c951 100644 --- a/server/transfer/source.go +++ b/server/transfer/source.go @@ -11,6 +11,7 @@ import ( "net/http" "time" + "github.com/pelican-dev/wings/config" "github.com/pelican-dev/wings/internal/progress" ) @@ -23,6 +24,10 @@ func (t *Transfer) PushArchiveToTarget(url, token string) ([]byte, error) { t.SendMessage("Preparing to stream server data to destination...") t.SetStatus(StatusProcessing) + // Always include the configured storage pool identifier in the outgoing request headers. + // The destination can use this information to determine if it should skip copying files when both nodes share the same storage backend. + sp := config.Get().System.Transfers.StoragePool + a, err := t.Archive() if err != nil { t.Error(err, "Failed to get archive for transfer.") @@ -56,6 +61,9 @@ func (t *Transfer) PushArchiveToTarget(url, token string) ([]byte, error) { return nil, err } req.Header.Set("Authorization", token) + if sp != "" { + req.Header.Set("X-Storage-Pool", sp) + } // Create a new multipart writer that writes the archive to the pipe. mp := multipart.NewWriter(writer)