From 1c25ca05fdb378b6069bc0f09acb278cf4621d82 Mon Sep 17 00:00:00 2001 From: Quinten <67589015+QuintenQVD0@users.noreply.github.com> Date: Sun, 16 Nov 2025 19:03:45 +0100 Subject: [PATCH 1/3] Transfer backups and install logs --- router/router_server.go | 2 +- router/router_transfer.go | 187 +++++++++++++++++++++++++++++-------- server/transfer/archive.go | 140 ++++++++++++++++++++++++++- server/transfer/source.go | 52 ++++++++--- 4 files changed, 324 insertions(+), 57 deletions(-) diff --git a/router/router_server.go b/router/router_server.go index 83d9abaa..47c4eb46 100644 --- a/router/router_server.go +++ b/router/router_server.go @@ -250,7 +250,7 @@ func deleteServer(c *gin.Context) { // Remove the install log from this server filename := filepath.Join(config.Get().System.LogDirectory, "install", ID+".log") err := os.Remove(filename) - if err != nil { + if err != nil && !os.IsNotExist(err) { log.WithFields(log.Fields{"server_id": ID, "error": err}).Warn("failed to remove server install log during deletion process") } diff --git a/router/router_transfer.go b/router/router_transfer.go index 8e45f2ff..118c5044 100644 --- a/router/router_transfer.go +++ b/router/router_transfer.go @@ -1,7 +1,6 @@ package router import ( - "bytes" "context" "crypto/sha256" "encoding/hex" @@ -12,12 +11,14 @@ import ( "mime/multipart" "net/http" "os" + "path/filepath" "strings" "github.com/apex/log" "github.com/gin-gonic/gin" "github.com/google/uuid" + "github.com/pelican-dev/wings/config" "github.com/pelican-dev/wings/router/middleware" "github.com/pelican-dev/wings/router/tokens" "github.com/pelican-dev/wings/server" @@ -141,18 +142,17 @@ func postTransfers(c *gin.Context) { return } - // Used to calculate the hash of the file as it is being uploaded. - h := sha256.New() // Used to read the file and checksum from the request body. mr := multipart.NewReader(c.Request.Body, params["boundary"]) - // Loop through the parts of the request body and process them. var ( - hasArchive bool - hasChecksum bool - checksumVerified bool + hasArchive bool + archiveChecksum string + archiveChecksumReceived string + backupChecksums = make(map[string]string) ) + // Process multipart form out: for { select { @@ -169,76 +169,183 @@ out: } name := p.FormName() - switch name { - case "archive": + + switch { + case name == "archive": trnsfr.Log().Debug("received archive") + hasArchive = true if err := trnsfr.Server.EnsureDataDirectoryExists(); err != nil { middleware.CaptureAndAbort(c, err) return } - tee := io.TeeReader(p, h) + // Calculate checksum while streaming to extraction + archiveHasher := sha256.New() + tee := io.TeeReader(p, archiveHasher) + + // Stream directly to extraction while calculating checksum if err := trnsfr.Server.Filesystem().ExtractStreamUnsafe(ctx, "/", tee); err != nil { middleware.CaptureAndAbort(c, err) return } - hasArchive = true - case "checksum": - trnsfr.Log().Debug("received checksum") + // Store the CALCULATED checksum for later verification + archiveChecksum = hex.EncodeToString(archiveHasher.Sum(nil)) - if !hasArchive { - middleware.CaptureAndAbort(c, errors.New("archive must be sent before the checksum")) - return - } + trnsfr.Log().Debug("archive extracted and checksum calculated") - hasChecksum = true - - v, err := io.ReadAll(p) + case strings.HasPrefix(name, "checksum_archive"): + trnsfr.Log().Debug("received archive checksum") + checksumData, err := io.ReadAll(p) if err != nil { middleware.CaptureAndAbort(c, err) return } + // Store the RECEIVED checksum for verification + archiveChecksumReceived = string(checksumData) + + case name == "install_logs": + trnsfr.Log().Debug("received install logs") + + // Create install log directory if it doesn't exist + cfg := config.Get() + installLogDir := filepath.Join(cfg.System.LogDirectory, "install") + if err := os.MkdirAll(installLogDir, 0755); err != nil { + // Don't fail transfer for install logs, just log and continue + trnsfr.Log().WithError(err).Warn("failed to create install log directory, skipping") + break + } + + // Use the correct install log path with server UUID + installLogPath := filepath.Join(installLogDir, trnsfr.Server.ID()+".log") + + // Create the install log file + installLogFile, err := os.Create(installLogPath) + if err != nil { + // Don't fail transfer for install logs, just log and continue + trnsfr.Log().WithError(err).Warn("failed to create install log file, skipping") + break + } + + // Stream the install logs to file + if _, err := io.Copy(installLogFile, p); err != nil { + installLogFile.Close() + // Don't fail transfer for install logs, just log and continue + trnsfr.Log().WithError(err).Warn("failed to stream install logs to file, skipping") + break + } + + if err := installLogFile.Close(); err != nil { + // Don't fail transfer for install logs, just log and continue + trnsfr.Log().WithError(err).Warn("failed to close install log file") + } + + trnsfr.Log().WithField("path", installLogPath).Debug("install logs saved successfully") + + case strings.HasPrefix(name, "backup_"): + backupName := strings.TrimPrefix(name, "backup_") + trnsfr.Log().WithField("backup", backupName).Debug("received backup file") + + // Create backup directory if it doesn't exist + cfg := config.Get() + backupDir := filepath.Join(cfg.System.BackupDirectory, trnsfr.Server.ID()) + if err := os.MkdirAll(backupDir, 0755); err != nil { + middleware.CaptureAndAbort(c, fmt.Errorf("failed to create backup directory: %w", err)) + return + } - expected := make([]byte, hex.DecodedLen(len(v))) - n, err := hex.Decode(expected, v) + backupPath := filepath.Join(backupDir, backupName) + + // Create the backup file and stream directly to disk + backupFile, err := os.Create(backupPath) if err != nil { - middleware.CaptureAndAbort(c, err) + middleware.CaptureAndAbort(c, fmt.Errorf("failed to create backup file %s: %w", backupPath, err)) return } - actual := h.Sum(nil) - trnsfr.Log().WithFields(log.Fields{ - "expected": hex.EncodeToString(expected), - "actual": hex.EncodeToString(actual), - }).Debug("checksums") + // Stream and calculate checksum simultaneously + hasher := sha256.New() + tee := io.TeeReader(p, hasher) - if !bytes.Equal(expected[:n], actual) { - middleware.CaptureAndAbort(c, errors.New("checksums don't match")) + if _, err := io.Copy(backupFile, tee); err != nil { + backupFile.Close() + middleware.CaptureAndAbort(c, fmt.Errorf("failed to stream backup file %s: %w", backupName, err)) return } - trnsfr.Log().Debug("checksums match") - checksumVerified = true - default: - continue + if err := backupFile.Close(); err != nil { + middleware.CaptureAndAbort(c, fmt.Errorf("failed to close backup file %s: %w", backupName, err)) + return + } + + // Store the checksum for later verification + backupChecksums[backupName] = hex.EncodeToString(hasher.Sum(nil)) + + trnsfr.Log().WithField("backup", backupName).Debug("backup streamed to disk successfully") + + case strings.HasPrefix(name, "checksum_backup_"): + backupName := strings.TrimPrefix(name, "checksum_backup_") + trnsfr.Log().WithField("backup", backupName).Debug("received backup checksum") + + checksumData, err := io.ReadAll(p) + if err != nil { + middleware.CaptureAndAbort(c, err) + return + } + backupChecksums[backupName] = string(checksumData) } } } - if !hasArchive || !hasChecksum { - middleware.CaptureAndAbort(c, errors.New("missing archive or checksum")) - return + // Verify main archive checksum + if hasArchive { + if archiveChecksumReceived == "" { + middleware.CaptureAndAbort(c, errors.New("archive checksum missing")) + return + } + + // Compare the calculated checksum with the received checksum + if archiveChecksum != archiveChecksumReceived { + trnsfr.Log().WithFields(log.Fields{ + "expected": archiveChecksumReceived, + "actual": archiveChecksum, + }).Error("archive checksum mismatch") + middleware.CaptureAndAbort(c, errors.New("archive checksum mismatch")) + return + } + + trnsfr.Log().Debug("archive checksum verified") + } + + // Verify backup checksums + for backupName, calculatedChecksum := range backupChecksums { + receivedChecksum, exists := backupChecksums[backupName] + if !exists { + middleware.CaptureAndAbort(c, fmt.Errorf("checksum missing for backup %s", backupName)) + return + } + + if calculatedChecksum != receivedChecksum { + trnsfr.Log().WithFields(log.Fields{ + "backup": backupName, + "expected": receivedChecksum, + "actual": calculatedChecksum, + }).Error("backup checksum mismatch") + middleware.CaptureAndAbort(c, fmt.Errorf("backup %s checksum mismatch", backupName)) + return + } + + trnsfr.Log().WithField("backup", backupName).Debug("backup checksum verified") } - if !checksumVerified { - middleware.CaptureAndAbort(c, errors.New("checksums don't match")) + if !hasArchive { + middleware.CaptureAndAbort(c, errors.New("missing archive")) return } // Transfer is almost complete, we just want to ensure the environment is - // configured correctly. We might want to not fail the transfer at this + // configured correctly. We might want to not fail the transfer at this // stage, but we will just to be safe. // Ensure the server environment gets configured. diff --git a/server/transfer/archive.go b/server/transfer/archive.go index bb144c69..fe14848e 100644 --- a/server/transfer/archive.go +++ b/server/transfer/archive.go @@ -2,9 +2,17 @@ package transfer import ( "context" + "crypto/sha256" + "encoding/hex" "fmt" "io" + "mime/multipart" + "os" + "path/filepath" + "strings" + "github.com/apex/log" + "github.com/pelican-dev/wings/config" "github.com/pelican-dev/wings/internal/progress" "github.com/pelican-dev/wings/server/filesystem" ) @@ -26,9 +34,138 @@ func (t *Transfer) Archive() (*Archive, error) { return t.archive, nil } +func (a *Archive) StreamBackups(ctx context.Context, mp *multipart.Writer) error { + cfg := config.Get() + backupPath := filepath.Join(cfg.System.BackupDirectory, a.transfer.Server.ID()) + + // Check if backup directory exists + if _, err := os.Stat(backupPath); os.IsNotExist(err) { + a.transfer.Log().Debug("no backups found to transfer") + return nil + } + + entries, err := os.ReadDir(backupPath) + if err != nil { + return err + } + + // Count backups first + var backupFiles []os.DirEntry + for _, entry := range entries { + if !entry.IsDir() && strings.HasSuffix(entry.Name(), ".tar.gz") { + backupFiles = append(backupFiles, entry) + } + } + + totalBackups := len(backupFiles) + if totalBackups == 0 { + a.transfer.Log().Debug("no backups found to transfer") + return nil + } + + a.transfer.Log().Infof("Starting transfer of %d backup files", totalBackups) + a.transfer.SendMessage(fmt.Sprintf("Starting transfer of %d backup files", totalBackups)) + + for _, entry := range backupFiles { + backupFile := filepath.Join(backupPath, entry.Name()) + + a.transfer.Log().WithField("backup", entry.Name()).Debug("streaming backup file") + + // Open backup file for reading + file, err := os.Open(backupFile) + if err != nil { + return fmt.Errorf("failed to open backup file %s: %w", backupFile, err) + } + + // Create hasher for this specific backup + backupHasher := sha256.New() + backupTee := io.TeeReader(file, backupHasher) + + // Create form file for the backup + part, err := mp.CreateFormFile("backup_"+entry.Name(), entry.Name()) + if err != nil { + file.Close() + return fmt.Errorf("failed to create form file for backup %s: %w", entry.Name(), err) + } + + // Stream the backup file + if _, err := io.Copy(part, backupTee); err != nil { + file.Close() + return fmt.Errorf("failed to stream backup file %s: %w", entry.Name(), err) + } + file.Close() + + // Write individual backup checksum + checksumField := "checksum_backup_" + entry.Name() + if err := mp.WriteField(checksumField, hex.EncodeToString(backupHasher.Sum(nil))); err != nil { + return fmt.Errorf("failed to write checksum for backup %s: %w", entry.Name(), err) + } + + // Update progress tracking + a.backupsStreamed++ + + // Progress message + progressMsg := fmt.Sprintf("Backup %d/%d completed: %s", a.backupsStreamed, totalBackups, entry.Name()) + a.transfer.Log().Info(progressMsg) + a.transfer.SendMessage(progressMsg) + + a.transfer.Log().WithFields(log.Fields{ + "backup": entry.Name(), + "checksum": checksumField, + }).Debug("backup file streamed with checksum") + } + + a.transfer.Log().WithField("count", totalBackups).Debug("finished streaming backups") + return nil +} + +// In archive.go - add this method +func (a *Archive) StreamInstallLogs(ctx context.Context, mp *multipart.Writer) error { + // Look for install logs in the server directory + + installLogPath := filepath.Join(config.Get().System.LogDirectory, "install", a.transfer.Server.ID()+".log") + + // Check if install log file exists + if _, err := os.Stat(installLogPath); os.IsNotExist(err) { + a.transfer.Log().Debug("install logs not found, skipping") + return nil // No error if logs don't exist + } + + a.transfer.Log().Debug("streaming install logs") + + // Open install log file for reading + file, err := os.Open(installLogPath) + if err != nil { + // Don't fail the transfer if we can't read install logs + a.transfer.Log().WithError(err).Warn("failed to open install logs, skipping") + return nil + } + defer file.Close() + + // Create form file for the install logs + part, err := mp.CreateFormFile("install_logs", "install.log") + if err != nil { + // Don't fail the transfer if we can't create form file + a.transfer.Log().WithError(err).Warn("failed to create form file for install logs, skipping") + return nil + } + + // Stream the install log file + if _, err := io.Copy(part, file); err != nil { + // Don't fail the transfer if we can't stream install logs + a.transfer.Log().WithError(err).Warn("failed to stream install logs, skipping") + return nil + } + + a.transfer.Log().Debug("install logs streamed successfully") + return nil +} + // Archive represents an archive used to transfer the contents of a server. type Archive struct { - archive *filesystem.Archive + archive *filesystem.Archive + transfer *Transfer + backupsStreamed int } // NewArchive returns a new archive associated with the given transfer. @@ -38,6 +175,7 @@ func NewArchive(t *Transfer, size uint64) *Archive { Filesystem: t.Server.Filesystem(), Progress: progress.NewProgress(size), }, + transfer: t, } } diff --git a/server/transfer/source.go b/server/transfer/source.go index bf6a9d3e..30d70be0 100644 --- a/server/transfer/source.go +++ b/server/transfer/source.go @@ -10,8 +10,6 @@ import ( "mime/multipart" "net/http" "time" - - "github.com/pelican-dev/wings/internal/progress" ) // PushArchiveToTarget POSTs the archive to the target node and returns the @@ -34,7 +32,7 @@ func (t *Transfer) PushArchiveToTarget(url, token string) ([]byte, error) { // Send the upload progress to the websocket every 5 seconds. ctx2, cancel2 := context.WithCancel(ctx) defer cancel2() - go func(ctx context.Context, p *progress.Progress, tc *time.Ticker) { + go func(ctx context.Context, a *Archive, tc *time.Ticker) { defer tc.Stop() for { @@ -42,10 +40,17 @@ func (t *Transfer) PushArchiveToTarget(url, token string) ([]byte, error) { case <-ctx.Done(): return case <-tc.C: - t.SendMessage("Uploading " + p.Progress(25)) + progress := a.Progress() + if progress != nil { + message := "Uploading " + progress.Progress(25) + // We can't easily show backup count here without tracking totalBackups + // But we're already showing individual backup progress in StreamBackups + t.SendMessage(message) + t.Log().Info(message) + } } } - }(ctx2, a.Progress(), time.NewTicker(5*time.Second)) + }(ctx2, a, time.NewTicker(5*time.Second)) // Create a new request using the pipe as the body. body, writer := io.Pipe() @@ -70,12 +75,13 @@ func (t *Transfer) PushArchiveToTarget(url, token string) ([]byte, error) { defer writer.Close() defer mp.Close() + // Stream server data with its own checksum src, pw := io.Pipe() defer src.Close() defer pw.Close() - h := sha256.New() - tee := io.TeeReader(src, h) + mainHasher := sha256.New() + mainTee := io.TeeReader(src, mainHasher) dest, err := mp.CreateFormFile("archive", "archive.tar.gz") if err != nil { @@ -87,37 +93,53 @@ func (t *Transfer) PushArchiveToTarget(url, token string) ([]byte, error) { go func() { defer close(ch) - if _, err := io.Copy(dest, tee); err != nil { + if _, err := io.Copy(dest, mainTee); err != nil { ch <- fmt.Errorf("failed to stream archive to destination: %w", err) return } - t.Log().Debug("finished copying dest to tee") + t.Log().Debug("finished copying main archive to destination") }() + // Stream server data if err := a.Stream(ctx, pw); err != nil { errChan <- errors.New("failed to stream archive to pipe") return } t.Log().Debug("finished streaming archive to pipe") - // Close the pipe writer early to release resources and ensure that the data gets flushed. + // Close the pipe writer to ensure data gets flushed _ = pw.Close() - // Wait for the copy to finish before we continue. - t.Log().Debug("waiting on copy to finish") + // Wait for the copy to finish + t.Log().Debug("waiting on main archive copy to finish") if err := <-ch; err != nil { errChan <- err return } - if err := mp.WriteField("checksum", hex.EncodeToString(h.Sum(nil))); err != nil { - errChan <- errors.New("failed to stream checksum") + // Write main archive checksum + if err := mp.WriteField("checksum_archive", hex.EncodeToString(mainHasher.Sum(nil))); err != nil { + errChan <- errors.New("failed to stream main archive checksum") + return + } + + // Stream backups with individual checksums + t.SendMessage("Streaming backup files to destination...") + if err := a.StreamBackups(ctx, mp); err != nil { + errChan <- fmt.Errorf("failed to stream backups: %w", err) return } cancel2() - t.SendMessage("Finished streaming archive to destination.") + t.SendMessage("Finished streaming archive and backups to destination.") + + // Stream install logs if they exist + if err := a.StreamInstallLogs(ctx, mp); err != nil { + errChan <- fmt.Errorf("failed to stream install logs: %w", err) + return + } + t.SendMessage("Finished streaming the install logs to destination.") if err := mp.Close(); err != nil { t.Log().WithError(err).Error("error while closing multipart writer") From a07ed0ecf1ea78d63c189d82cc24e8324de46692 Mon Sep 17 00:00:00 2001 From: Quinten <67589015+QuintenQVD0@users.noreply.github.com> Date: Sun, 16 Nov 2025 19:19:44 +0100 Subject: [PATCH 2/3] Fix checksums check --- router/router_transfer.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/router/router_transfer.go b/router/router_transfer.go index 118c5044..f49ef709 100644 --- a/router/router_transfer.go +++ b/router/router_transfer.go @@ -150,7 +150,8 @@ func postTransfers(c *gin.Context) { hasArchive bool archiveChecksum string archiveChecksumReceived string - backupChecksums = make(map[string]string) + backupChecksumsCalculated = make(map[string]string) + backupChecksumsReceived = make(map[string]string) ) // Process multipart form out: @@ -280,7 +281,7 @@ out: } // Store the checksum for later verification - backupChecksums[backupName] = hex.EncodeToString(hasher.Sum(nil)) + backupChecksumsCalculated[backupName] = hex.EncodeToString(hasher.Sum(nil)) trnsfr.Log().WithField("backup", backupName).Debug("backup streamed to disk successfully") @@ -293,7 +294,7 @@ out: middleware.CaptureAndAbort(c, err) return } - backupChecksums[backupName] = string(checksumData) + backupChecksumsReceived[backupName] = string(checksumData) } } } @@ -319,8 +320,8 @@ out: } // Verify backup checksums - for backupName, calculatedChecksum := range backupChecksums { - receivedChecksum, exists := backupChecksums[backupName] + for backupName, calculatedChecksum := range backupChecksumsCalculated { + receivedChecksum, exists := backupChecksumsReceived[backupName] if !exists { middleware.CaptureAndAbort(c, fmt.Errorf("checksum missing for backup %s", backupName)) return From 91f17f86dd32d8e0d720bab338b91ebf5d9c6086 Mon Sep 17 00:00:00 2001 From: Quinten <67589015+QuintenQVD0@users.noreply.github.com> Date: Sat, 29 Nov 2025 13:47:18 +0100 Subject: [PATCH 3/3] Let the panel provide an array of backups to transfer --- server/transfer/archive.go | 42 ++++++++++++++++++++++++------------- server/transfer/source.go | 13 +++++++----- server/transfer/transfer.go | 4 ++++ 3 files changed, 39 insertions(+), 20 deletions(-) diff --git a/server/transfer/archive.go b/server/transfer/archive.go index fe14848e..684c11cc 100644 --- a/server/transfer/archive.go +++ b/server/transfer/archive.go @@ -35,6 +35,11 @@ func (t *Transfer) Archive() (*Archive, error) { } func (a *Archive) StreamBackups(ctx context.Context, mp *multipart.Writer) error { + if len(a.transfer.BackupUUIDs) == 0 { + a.transfer.Log().Debug("no backups specified for transfer") + return nil + } + cfg := config.Get() backupPath := filepath.Join(cfg.System.BackupDirectory, a.transfer.Server.ID()) @@ -49,24 +54,31 @@ func (a *Archive) StreamBackups(ctx context.Context, mp *multipart.Writer) error return err } - // Count backups first - var backupFiles []os.DirEntry - for _, entry := range entries { - if !entry.IsDir() && strings.HasSuffix(entry.Name(), ".tar.gz") { - backupFiles = append(backupFiles, entry) - } - } - - totalBackups := len(backupFiles) - if totalBackups == 0 { - a.transfer.Log().Debug("no backups found to transfer") - return nil - } - + // Create a set of backup UUIDs for quick lookup + backupSet := make(map[string]bool) + for _, uuid := range a.transfer.BackupUUIDs { + backupSet[uuid+".tar.gz"] = true // Backup files are stored as UUID.tar.gz + } + + var backupsToTransfer []os.DirEntry + for _, entry := range entries { + if !entry.IsDir() && strings.HasSuffix(entry.Name(), ".tar.gz") { + if backupSet[entry.Name()] { + backupsToTransfer = append(backupsToTransfer, entry) + } + } + } + + totalBackups := len(backupsToTransfer) + if totalBackups == 0 { + a.transfer.Log().Debug("no matching backup files found") + return nil + } + a.transfer.Log().Infof("Starting transfer of %d backup files", totalBackups) a.transfer.SendMessage(fmt.Sprintf("Starting transfer of %d backup files", totalBackups)) - for _, entry := range backupFiles { + for _, entry := range backupsToTransfer { backupFile := filepath.Join(backupPath, entry.Name()) a.transfer.Log().WithField("backup", entry.Name()).Debug("streaming backup file") diff --git a/server/transfer/source.go b/server/transfer/source.go index 30d70be0..9123023b 100644 --- a/server/transfer/source.go +++ b/server/transfer/source.go @@ -124,11 +124,14 @@ func (t *Transfer) PushArchiveToTarget(url, token string) ([]byte, error) { return } - // Stream backups with individual checksums - t.SendMessage("Streaming backup files to destination...") - if err := a.StreamBackups(ctx, mp); err != nil { - errChan <- fmt.Errorf("failed to stream backups: %w", err) - return + if len(t.BackupUUIDs) > 0 { + t.SendMessage(fmt.Sprintf("Streaming %d backup files to destination...", len(t.BackupUUIDs))) + if err := a.StreamBackups(ctx, mp); err != nil { + errChan <- fmt.Errorf("failed to stream backups: %w", err) + return + } + } else { + t.Log().Debug("no backups specified for transfer") } cancel2() diff --git a/server/transfer/transfer.go b/server/transfer/transfer.go index 800dac6a..8fe5ce47 100644 --- a/server/transfer/transfer.go +++ b/server/transfer/transfer.go @@ -53,6 +53,10 @@ type Transfer struct { // archive is the archive that is being created for the transfer. archive *Archive + + // BackupUUIDs is the list of backup UUIDs that should be transferred. + // If empty, no backups will be transferred. + BackupUUIDs []string } // New returns a new transfer instance for the given server.