From bb25803d81c944c2bd5d51817b93a02a5b847610 Mon Sep 17 00:00:00 2001 From: Timur Tuktamyshev Date: Thu, 11 Dec 2025 16:40:21 +0300 Subject: [PATCH 01/14] feat: new push Signed-off-by: Timur Tuktamyshev --- internal/mirror/cmd/pull/pull.go | 12 +- internal/mirror/cmd/push/push.go | 63 +++++++ internal/mirror/modules/push.go | 197 ++++++++++++++++++++ internal/mirror/platform/platform.go | 4 +- internal/mirror/platform/push.go | 77 ++++++++ internal/mirror/push.go | 88 ++++++++- internal/mirror/pusher/pusher.go | 260 +++++++++++++++++++++------ internal/mirror/security/push.go | 85 +++++++++ 8 files changed, 720 insertions(+), 66 deletions(-) create mode 100644 internal/mirror/modules/push.go create mode 100644 internal/mirror/platform/push.go create mode 100644 internal/mirror/security/push.go diff --git a/internal/mirror/cmd/pull/pull.go b/internal/mirror/cmd/pull/pull.go index 4a8d1318..b5c75c90 100644 --- a/internal/mirror/cmd/pull/pull.go +++ b/internal/mirror/cmd/pull/pull.go @@ -23,8 +23,10 @@ import ( "fmt" "log/slog" "os" + "os/signal" "path" "path/filepath" + "syscall" "time" "github.com/Masterminds/semver/v3" @@ -102,11 +104,19 @@ func NewCommand() *cobra.Command { } func pull(cmd *cobra.Command, _ []string) error { + // Set up graceful cancellation on Ctrl+C + ctx, cancel := signal.NotifyContext(cmd.Context(), syscall.SIGINT, syscall.SIGTERM) + defer cancel() + puller := NewPuller(cmd) puller.logger.Infof("d8 version: %s", version.Version) - if err := puller.Execute(cmd.Context()); err != nil { + if err := puller.Execute(ctx); err != nil { + if errors.Is(err, context.Canceled) { + puller.logger.WarnLn("\nOperation cancelled by user") + return nil + } return ErrPullFailed } diff --git a/internal/mirror/cmd/push/push.go b/internal/mirror/cmd/push/push.go index a2fb674a..ccf5bcd3 100644 --- a/internal/mirror/cmd/push/push.go +++ b/internal/mirror/cmd/push/push.go @@ -23,9 +23,11 @@ import ( "io" "log/slog" "os" + "os/signal" "path" "path/filepath" "strings" + "syscall" "time" "github.com/google/go-containerregistry/pkg/authn" @@ -36,6 +38,7 @@ import ( "github.com/deckhouse/deckhouse/pkg/registry" regclient "github.com/deckhouse/deckhouse/pkg/registry/client" + "github.com/deckhouse/deckhouse-cli/internal/mirror" "github.com/deckhouse/deckhouse-cli/internal/mirror/chunked" "github.com/deckhouse/deckhouse-cli/internal/mirror/operations" "github.com/deckhouse/deckhouse-cli/internal/version" @@ -289,6 +292,11 @@ func (p *Pusher) Execute() error { return err } + // Use new push service when NEW_PULL env is set + if os.Getenv("NEW_PULL") == "true" { + return p.executeNewPush() + } + if err := p.pushStaticPackages(); err != nil { return err } @@ -300,6 +308,61 @@ func (p *Pusher) Execute() error { return nil } +// executeNewPush runs the push using the new service architecture +func (p *Pusher) executeNewPush() error { + // Set up graceful cancellation on Ctrl+C + ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer cancel() + + logger := dkplog.NewNop() + + if log.DebugLogLevel() >= 3 { + logger = dkplog.NewLogger(dkplog.WithLevel(slog.LevelDebug)) + } + + // Create registry client + clientOpts := ®client.Options{ + Insecure: p.pushParams.Insecure, + TLSSkipVerify: p.pushParams.SkipTLSVerification, + Logger: logger, + } + + if p.pushParams.RegistryAuth != nil { + clientOpts.Auth = p.pushParams.RegistryAuth + } + + var client registry.Client + client = regclient.NewClientWithOptions(p.pushParams.RegistryHost, clientOpts) + + // Scope to the registry path + if p.pushParams.RegistryPath != "" { + client = client.WithSegment(p.pushParams.RegistryPath) + } + + svc := mirror.NewPushService( + client, + &mirror.PushServiceOptions{ + BundleDir: p.pushParams.BundleDir, + WorkingDir: p.pushParams.WorkingDir, + ModulesPathSuffix: p.pushParams.ModulesPathSuffix, + }, + logger.Named("push"), + p.logger.(*log.SLogger), + ) + + err := svc.Push(ctx) + if err != nil { + // Handle context cancellation gracefully + if errors.Is(err, context.Canceled) { + p.logger.WarnLn("Operation cancelled by user") + return nil + } + return err + } + + return nil +} + // validateRegistryAccess validates access to the registry func (p *Pusher) validateRegistryAccess() error { p.logger.InfoLn("Validating registry access") diff --git a/internal/mirror/modules/push.go b/internal/mirror/modules/push.go new file mode 100644 index 00000000..f4705fe3 --- /dev/null +++ b/internal/mirror/modules/push.go @@ -0,0 +1,197 @@ +/* +Copyright 2025 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package modules + +import ( + "context" + "fmt" + "log/slog" + "os" + "path" + "path/filepath" + "slices" + "strings" + + dkplog "github.com/deckhouse/deckhouse/pkg/log" + "github.com/deckhouse/deckhouse/pkg/registry" + + "github.com/deckhouse/deckhouse-cli/internal/mirror/pusher" + "github.com/deckhouse/deckhouse-cli/pkg/libmirror/util/log" +) + +// PushOptions contains options for pushing module images +type PushOptions struct { + BundleDir string + WorkingDir string +} + +// PushService handles pushing module images to registry +type PushService struct { + client registry.Client + pusherService *pusher.Service + options *PushOptions + logger *dkplog.Logger + userLogger *log.SLogger +} + +// NewPushService creates a new modules push service +func NewPushService( + client registry.Client, + options *PushOptions, + logger *dkplog.Logger, + userLogger *log.SLogger, +) *PushService { + if options == nil { + options = &PushOptions{} + } + + return &PushService{ + client: client, + pusherService: pusher.NewService(logger, userLogger), + options: options, + logger: logger, + userLogger: userLogger, + } +} + +// Push pushes all module packages to the registry +func (svc *PushService) Push(ctx context.Context) error { + modulePackages, err := svc.findModulePackages() + if err != nil { + return fmt.Errorf("find module packages: %w", err) + } + + if len(modulePackages) == 0 { + svc.userLogger.InfoLn("No module packages found, skipping") + return nil + } + + pushed := make(map[string]struct{}) + for _, moduleName := range modulePackages { + if _, ok := pushed[moduleName]; ok { + continue + } + + if err := svc.pushModule(ctx, moduleName); err != nil { + svc.userLogger.WarnLn(err) + continue + } + pushed[moduleName] = struct{}{} + } + + if len(pushed) > 0 { + names := make([]string, 0, len(pushed)) + for name := range pushed { + names = append(names, name) + } + slices.Sort(names) + svc.userLogger.Infof("Modules pushed: %s", strings.Join(names, ", ")) + } + + return nil +} + +func (svc *PushService) findModulePackages() ([]string, error) { + entries, err := os.ReadDir(svc.options.BundleDir) + if err != nil { + return nil, fmt.Errorf("list bundle directory: %w", err) + } + + modules := make([]string, 0, len(entries)) + for _, entry := range entries { + name := entry.Name() + + // Skip non-module files + if !strings.HasPrefix(name, "module-") { + continue + } + + // Only process .tar and .chunk files + ext := filepath.Ext(name) + if ext != ".tar" && ext != ".chunk" { + continue + } + + // Extract module name: "module-foo.tar" -> "foo" + // Handle chunked files: "module-foo.tar.chunk000" -> "foo" + moduleName := strings.TrimPrefix(name, "module-") + moduleName = strings.TrimSuffix(moduleName, ext) + moduleName = strings.TrimSuffix(moduleName, ".tar") + + modules = append(modules, moduleName) + } + + return modules, nil +} + +func (svc *PushService) pushModule(ctx context.Context, moduleName string) error { + return svc.pusherService.PushPackage(ctx, pusher.PackagePushConfig{ + PackageName: "module-" + moduleName, + ProcessName: "Push module: " + moduleName, + WorkingDir: filepath.Join(svc.options.WorkingDir, "modules"), + BundleDir: svc.options.BundleDir, + Client: svc.client.WithSegment(moduleName), + // New pull creates: module/, release/, extra/ + MandatoryLayoutsFunc: func(packageDir string) map[string]string { + return map[string]string{ + "module root layout": filepath.Join(packageDir, "module"), + "module release channels layout": filepath.Join(packageDir, "release"), + } + }, + // Dynamic layout discovery after unpacking + LayoutsFunc: svc.buildModuleLayouts, + }) +} + +// buildModuleLayouts returns the list of layouts for a module, including dynamic extra discovery +func (svc *PushService) buildModuleLayouts(packageDir string) []pusher.LayoutMapping { + layouts := []pusher.LayoutMapping{ + {LayoutPath: "module", Segment: ""}, // Root module images + {LayoutPath: "release", Segment: "release"}, // Release channels + } + + // Check if extra directory exists + extraDir := filepath.Join(packageDir, "extra") + if _, err := os.Stat(extraDir); os.IsNotExist(err) { + return layouts + } + + // Add root extra layout + layouts = append(layouts, pusher.LayoutMapping{ + LayoutPath: "extra", + Segment: "extra", + }) + + // Discover nested extra layouts + entries, err := os.ReadDir(extraDir) + if err != nil { + svc.logger.Warn("Error reading extra dir", slog.Any("error", err)) + return layouts + } + + for _, entry := range entries { + if entry.IsDir() { + svc.logger.Debug("Found extra layout", slog.String("layout", entry.Name())) + layouts = append(layouts, pusher.LayoutMapping{ + LayoutPath: path.Join("extra", entry.Name()), + Segment: path.Join("extra", entry.Name()), + }) + } + } + + return layouts +} diff --git a/internal/mirror/platform/platform.go b/internal/mirror/platform/platform.go index 55f66520..fc843623 100644 --- a/internal/mirror/platform/platform.go +++ b/internal/mirror/platform/platform.go @@ -288,11 +288,13 @@ func (svc *Service) getReleaseChannelVersionFromRegistry(ctx context.Context, re svc.userLogger.Debugf("image reference: %s@%s", imageMeta, digest.String()) - err = svc.layout.DeckhouseReleaseChannel.AddImage(image, imageMeta.GetTagReference()) + // Use just the channel name (e.g., "alpha") as the tag for the layout, not the full reference + err = svc.layout.DeckhouseReleaseChannel.AddImage(image, releaseChannel) if err != nil { return nil, fmt.Errorf("append %s release channel image to layout: %w", releaseChannel, err) } + // But use full reference for internal tracking svc.downloadList.DeckhouseReleaseChannel[imageMeta.GetTagReference()] = puller.NewImageMeta(meta.Version, imageMeta.GetTagReference(), &digest) return ver, nil diff --git a/internal/mirror/platform/push.go b/internal/mirror/platform/push.go new file mode 100644 index 00000000..9f5d646d --- /dev/null +++ b/internal/mirror/platform/push.go @@ -0,0 +1,77 @@ +/* +Copyright 2025 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package platform + +import ( + "context" + + dkplog "github.com/deckhouse/deckhouse/pkg/log" + "github.com/deckhouse/deckhouse/pkg/registry" + + "github.com/deckhouse/deckhouse-cli/internal/mirror/pusher" + "github.com/deckhouse/deckhouse-cli/pkg/libmirror/bundle" + "github.com/deckhouse/deckhouse-cli/pkg/libmirror/util/log" +) + +// PushOptions contains options for pushing platform images +type PushOptions struct { + BundleDir string + WorkingDir string +} + +// PushService handles pushing platform images to registry +type PushService struct { + client registry.Client + pusherService *pusher.Service + options *PushOptions +} + +// NewPushService creates a new platform push service +func NewPushService( + client registry.Client, + options *PushOptions, + logger *dkplog.Logger, + userLogger *log.SLogger, +) *PushService { + if options == nil { + options = &PushOptions{} + } + + return &PushService{ + client: client, + pusherService: pusher.NewService(logger, userLogger), + options: options, + } +} + +// Push pushes the platform package to the registry +func (svc *PushService) Push(ctx context.Context) error { + return svc.pusherService.PushPackage(ctx, pusher.PackagePushConfig{ + PackageName: "platform", + ProcessName: "Push Deckhouse platform", + WorkingDir: svc.options.WorkingDir, + BundleDir: svc.options.BundleDir, + Client: svc.client, + MandatoryLayoutsFunc: bundle.MandatoryLayoutsForPlatform, + Layouts: []pusher.LayoutMapping{ + {LayoutPath: "", Segment: ""}, // Root layout + {LayoutPath: "install", Segment: "install"}, // Installer images + {LayoutPath: "install-standalone", Segment: "install-standalone"}, // Standalone installer + {LayoutPath: "release-channel", Segment: "release-channel"}, // Release channels + }, + }) +} diff --git a/internal/mirror/push.go b/internal/mirror/push.go index 7f4c7430..de9bbae9 100644 --- a/internal/mirror/push.go +++ b/internal/mirror/push.go @@ -17,23 +17,93 @@ limitations under the License. package mirror import ( + "context" + "fmt" + dkplog "github.com/deckhouse/deckhouse/pkg/log" + "github.com/deckhouse/deckhouse/pkg/registry" + "github.com/deckhouse/deckhouse-cli/internal/mirror/modules" + "github.com/deckhouse/deckhouse-cli/internal/mirror/platform" + "github.com/deckhouse/deckhouse-cli/internal/mirror/security" "github.com/deckhouse/deckhouse-cli/pkg/libmirror/util/log" - registryservice "github.com/deckhouse/deckhouse-cli/pkg/registry/service" ) -type PushService struct { - registryService registryservice.Service +// PushServiceOptions contains configuration options for PushService +type PushServiceOptions struct { + // BundleDir is the directory containing the bundle to push + BundleDir string + // WorkingDir is the temporary directory for unpacking bundles + WorkingDir string + // ModulesPathSuffix is the path suffix for modules in registry + ModulesPathSuffix string +} - logger *dkplog.Logger - userLogger *log.SLogger +// PushService orchestrates pushing Deckhouse components to registry +type PushService struct { + platformService *platform.PushService + securityService *security.PushService + modulesService *modules.PushService } -func NewPushService(registryService registryservice.Service, logger *dkplog.Logger, userLogger *log.SLogger) *PushService { +// NewPushService creates a new PushService +func NewPushService( + client registry.Client, + options *PushServiceOptions, + logger *dkplog.Logger, + userLogger *log.SLogger, +) *PushService { + if options == nil { + options = &PushServiceOptions{} + } + return &PushService{ - registryService: registryService, - logger: logger, - userLogger: userLogger, + platformService: platform.NewPushService( + client, + &platform.PushOptions{ + BundleDir: options.BundleDir, + WorkingDir: options.WorkingDir, + }, + logger.Named("platform"), + userLogger, + ), + securityService: security.NewPushService( + client, + &security.PushOptions{ + BundleDir: options.BundleDir, + WorkingDir: options.WorkingDir, + }, + logger.Named("security"), + userLogger, + ), + modulesService: modules.NewPushService( + client.WithSegment(options.ModulesPathSuffix), + &modules.PushOptions{ + BundleDir: options.BundleDir, + WorkingDir: options.WorkingDir, + }, + logger.Named("modules"), + userLogger, + ), + } +} + +// Push uploads Deckhouse components to registry +func (svc *PushService) Push(ctx context.Context) error { + // Push platform package + if err := svc.platformService.Push(ctx); err != nil { + return fmt.Errorf("push platform: %w", err) + } + + // Push security package + if err := svc.securityService.Push(ctx); err != nil { + return fmt.Errorf("push security databases: %w", err) } + + // Push module packages + if err := svc.modulesService.Push(ctx); err != nil { + return fmt.Errorf("push modules: %w", err) + } + + return nil } diff --git a/internal/mirror/pusher/pusher.go b/internal/mirror/pusher/pusher.go index 5a129e13..7e51e7ba 100644 --- a/internal/mirror/pusher/pusher.go +++ b/internal/mirror/pusher/pusher.go @@ -20,99 +20,239 @@ import ( "context" "fmt" "io" + "log/slog" "os" "path/filepath" - "strings" + "time" - "github.com/samber/lo" + "github.com/google/go-containerregistry/pkg/v1/layout" dkplog "github.com/deckhouse/deckhouse/pkg/log" + "github.com/deckhouse/deckhouse/pkg/registry" "github.com/deckhouse/deckhouse-cli/internal/mirror/chunked" + "github.com/deckhouse/deckhouse-cli/pkg/libmirror/bundle" + "github.com/deckhouse/deckhouse-cli/pkg/libmirror/util/errorutil" "github.com/deckhouse/deckhouse-cli/pkg/libmirror/util/log" + "github.com/deckhouse/deckhouse-cli/pkg/libmirror/util/retry" + "github.com/deckhouse/deckhouse-cli/pkg/libmirror/util/retry/task" ) -// PusherService handles the pushing of images to the registry -type PusherService struct { +const ( + defaultDirPermissions = 0755 + pushRetryAttempts = 4 + pushRetryDelay = 3 * time.Second +) + +// LayoutMapping defines the mapping between bundle layout path and registry segment +type LayoutMapping struct { + // LayoutPath is the path within the unpacked package (e.g., "module", "release") + LayoutPath string + // Segment is the registry path segment to push to (e.g., "", "release") + Segment string +} + +// PackagePushConfig defines the configuration for pushing a package +type PackagePushConfig struct { + // PackageName is the name of the package file (without .tar extension) + PackageName string + // ProcessName is the name shown in logs (e.g., "Push Deckhouse platform") + ProcessName string + // WorkingDir is the temp directory for unpacking + WorkingDir string + // BundleDir is the directory containing the bundle + BundleDir string + // Client is the registry client to use for pushing + Client registry.Client + // MandatoryLayouts returns paths that must exist after unpacking (for validation) + // Key is description, value is path. Used if MandatoryLayoutsFunc is nil. + MandatoryLayouts map[string]string + // MandatoryLayoutsFunc dynamically builds the mandatory layouts for validation + // If set, MandatoryLayouts field is ignored. packageDir is the unpacked path. + MandatoryLayoutsFunc func(packageDir string) map[string]string + // Layouts is the static list of layouts to push (used if LayoutsFunc is nil) + Layouts []LayoutMapping + // LayoutsFunc dynamically builds the layouts list after unpacking + // If set, Layouts field is ignored. packageDir is the unpacked path. + LayoutsFunc func(packageDir string) []LayoutMapping +} + +// Service handles the pushing of images to the registry +type Service struct { logger *dkplog.Logger userLogger *log.SLogger } -// NewPusherService creates a new PusherService -func NewPusherService( - logger *dkplog.Logger, - userLogger *log.SLogger, -) *PusherService { - return &PusherService{ +// NewService creates a new pusher service +func NewService(logger *dkplog.Logger, userLogger *log.SLogger) *Service { + return &Service{ logger: logger, userLogger: userLogger, } } -// PushModules pushes module packages from the bundle directory -func (ps *PusherService) PushModules(_ context.Context, bundleDir string, _ interface{}) error { - bundleContents, err := os.ReadDir(bundleDir) +// PackageExists checks if a package exists (tar or chunked) +func (s *Service) PackageExists(bundleDir, pkgName string) bool { + packagePath := filepath.Join(bundleDir, pkgName+".tar") + if _, err := os.Stat(packagePath); err == nil { + return true + } + // Check for chunked package + if _, err := os.Stat(packagePath + ".chunk000"); err == nil { + return true + } + return false +} + +// PushPackage handles the common flow of pushing a package: +// 1. Check if package exists +// 2. Create temp directory +// 3. Unpack package +// 4. Validate structure +// 5. Push all layouts +// 6. Cleanup temp directory +func (s *Service) PushPackage(ctx context.Context, config PackagePushConfig) error { + // Check if package exists + if !s.PackageExists(config.BundleDir, config.PackageName) { + s.userLogger.Infof("%s package is not present, skipping", config.PackageName) + return nil + } + + return s.userLogger.Process(config.ProcessName, func() error { + return s.pushPackageInternal(ctx, config) + }) +} + +func (s *Service) pushPackageInternal(ctx context.Context, config PackagePushConfig) error { + // Create temp directory + packageDir := filepath.Join(config.WorkingDir, config.PackageName) + if err := os.MkdirAll(packageDir, defaultDirPermissions); err != nil { + return fmt.Errorf("create temp directory: %w", err) + } + defer func() { + if err := os.RemoveAll(packageDir); err != nil { + s.logger.Warn("Failed to cleanup temp directory", + slog.String("path", packageDir), + slog.Any("error", err)) + } + }() + + // Open and unpack + pkg, err := s.OpenPackage(config.BundleDir, config.PackageName) if err != nil { - return fmt.Errorf("list bundle directory: %w", err) - } - - modulePackages := lo.Compact(lo.Map(bundleContents, func(item os.DirEntry, _ int) string { - fileExt := filepath.Ext(item.Name()) - pkgName, _, ok := strings.Cut(strings.TrimPrefix(item.Name(), "module-"), ".") - switch { - case !ok: - fallthrough - case fileExt != ".tar" && fileExt != ".chunk": - fallthrough - case !strings.HasPrefix(item.Name(), "module-"): - return "" + return fmt.Errorf("open package: %w", err) + } + defer pkg.Close() + + s.userLogger.InfoLn("Unpacking package") + if err := bundle.Unpack(ctx, pkg, packageDir); err != nil { + return fmt.Errorf("unpack package: %w", err) + } + + // Validate structure (dynamic or static) + mandatoryLayouts := config.MandatoryLayouts + if config.MandatoryLayoutsFunc != nil { + mandatoryLayouts = config.MandatoryLayoutsFunc(packageDir) + } + if len(mandatoryLayouts) > 0 { + s.userLogger.InfoLn("Validating package structure") + if err := bundle.ValidateUnpackedPackage(mandatoryLayouts); err != nil { + return fmt.Errorf("invalid package structure: %w", err) } - return pkgName - })) + } + + // Get layouts to push (dynamic or static) + layouts := config.Layouts + if config.LayoutsFunc != nil { + layouts = config.LayoutsFunc(packageDir) + } + + // Push layouts + for _, layoutMapping := range layouts { + layoutFullPath := filepath.Join(packageDir, layoutMapping.LayoutPath) - successfullyPushedModules := make([]string, 0) - for _, modulePackageName := range modulePackages { - if lo.Contains(successfullyPushedModules, modulePackageName) { + // Check if layout exists + if _, err := os.Stat(filepath.Join(layoutFullPath, "index.json")); os.IsNotExist(err) { + s.logger.Debug("Layout does not exist, skipping", slog.String("layout", layoutMapping.LayoutPath)) continue } - if err = ps.userLogger.Process("Push module: "+modulePackageName, func() error { - pkg, err := ps.openPackage(bundleDir, "module-"+modulePackageName) - if err != nil { - return fmt.Errorf("open package %q: %w", modulePackageName, err) - } - defer pkg.Close() + client := config.Client + if layoutMapping.Segment != "" { + client = client.WithSegment(layoutMapping.Segment) + } + + repoRef := client.GetRegistry() + s.userLogger.InfoLn("Pushing", repoRef) - // Here we would call operations.PushModule, but since we don't have access to it, - // we'll leave this as a placeholder - // if err = operations.PushModule(pushParams, modulePackageName, pkg, client); err != nil { - // return fmt.Errorf("failed to push module %q: %w", modulePackageName, err) - // } + if err := s.PushLayout(ctx, layout.Path(layoutFullPath), client); err != nil { + return fmt.Errorf("push layout %q: %w", layoutMapping.LayoutPath, err) + } + } - ps.userLogger.InfoLn("Module " + modulePackageName + " pushed successfully") + return nil +} - successfullyPushedModules = append(successfullyPushedModules, modulePackageName) +// PushLayout pushes all images from an OCI layout to the registry +func (s *Service) PushLayout(ctx context.Context, layoutPath layout.Path, client registry.Client) error { + index, err := layoutPath.ImageIndex() + if err != nil { + return fmt.Errorf("read OCI image index: %w", err) + } - return nil - }); err != nil { - ps.userLogger.WarnLn(err) - } + indexManifest, err := index.IndexManifest() + if err != nil { + return fmt.Errorf("parse OCI image index manifest: %w", err) } - if len(successfullyPushedModules) > 0 { - ps.userLogger.Infof("Modules pushed: %v", strings.Join(successfullyPushedModules, ", ")) + if len(indexManifest.Manifests) == 0 { + s.userLogger.InfoLn("No images to push") + return nil + } + + s.userLogger.Infof("Pushing %d images", len(indexManifest.Manifests)) + + for i, manifest := range indexManifest.Manifests { + tag := manifest.Annotations["io.deckhouse.image.short_tag"] + if tag == "" { + s.logger.Warn("Skipping image without short_tag annotation", slog.String("digest", manifest.Digest.String())) + continue + } + + s.userLogger.Infof("[%d / %d] Pushing image %s:%s", i+1, len(indexManifest.Manifests), client.GetRegistry(), tag) + + img, err := index.Image(manifest.Digest) + if err != nil { + return fmt.Errorf("read image %s: %w", tag, err) + } + + err = retry.RunTaskWithContext( + ctx, silentLogger{}, "push", + task.WithConstantRetries(pushRetryAttempts, pushRetryDelay, func(ctx context.Context) error { + if err := client.PushImage(ctx, tag, img); err != nil { + if errorutil.IsTrivyMediaTypeNotAllowedError(err) { + return fmt.Errorf(errorutil.CustomTrivyMediaTypesWarning) + } + return fmt.Errorf("write %s:%s to registry: %w", client.GetRegistry(), tag, err) + } + return nil + }), + ) + if err != nil { + return fmt.Errorf("push image %s: %w", tag, err) + } } return nil } -// openPackage opens a package file, trying .tar first, then .chunk -func (ps *PusherService) openPackage(bundleDir, pkgName string) (io.ReadCloser, error) { +// OpenPackage opens a package file, trying .tar first, then chunked +func (s *Service) OpenPackage(bundleDir, pkgName string) (io.ReadCloser, error) { p := filepath.Join(bundleDir, pkgName+".tar") pkg, err := os.Open(p) switch { case os.IsNotExist(err): - return ps.openChunkedPackage(bundleDir, pkgName) + return s.openChunkedPackage(bundleDir, pkgName) case err != nil: return nil, fmt.Errorf("read bundle package %s: %w", pkgName, err) } @@ -120,8 +260,7 @@ func (ps *PusherService) openPackage(bundleDir, pkgName string) (io.ReadCloser, return pkg, nil } -// openChunkedPackage opens a chunked package -func (ps *PusherService) openChunkedPackage(bundleDir, pkgName string) (io.ReadCloser, error) { +func (s *Service) openChunkedPackage(bundleDir, pkgName string) (io.ReadCloser, error) { pkg, err := chunked.Open(bundleDir, pkgName+".tar") if err != nil { return nil, fmt.Errorf("open bundle package %q: %w", pkgName, err) @@ -129,3 +268,14 @@ func (ps *PusherService) openChunkedPackage(bundleDir, pkgName string) (io.ReadC return pkg, nil } + +// silentLogger suppresses retry task logging +type silentLogger struct{} + +func (silentLogger) Debugf(_ string, _ ...interface{}) {} +func (silentLogger) DebugLn(_ ...interface{}) {} +func (silentLogger) Infof(_ string, _ ...interface{}) {} +func (silentLogger) InfoLn(_ ...interface{}) {} +func (silentLogger) Warnf(_ string, _ ...interface{}) {} +func (silentLogger) WarnLn(_ ...interface{}) {} +func (silentLogger) Process(_ string, _ func() error) error { return nil } diff --git a/internal/mirror/security/push.go b/internal/mirror/security/push.go new file mode 100644 index 00000000..e0617af4 --- /dev/null +++ b/internal/mirror/security/push.go @@ -0,0 +1,85 @@ +/* +Copyright 2025 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package security + +import ( + "context" + "path/filepath" + + dkplog "github.com/deckhouse/deckhouse/pkg/log" + "github.com/deckhouse/deckhouse/pkg/registry" + + "github.com/deckhouse/deckhouse-cli/internal/mirror/pusher" + "github.com/deckhouse/deckhouse-cli/pkg/libmirror/util/log" +) + +// PushOptions contains options for pushing security images +type PushOptions struct { + BundleDir string + WorkingDir string +} + +// PushService handles pushing security database images to registry +type PushService struct { + client registry.Client + pusherService *pusher.Service + options *PushOptions +} + +// NewPushService creates a new security push service +func NewPushService( + client registry.Client, + options *PushOptions, + logger *dkplog.Logger, + userLogger *log.SLogger, +) *PushService { + if options == nil { + options = &PushOptions{} + } + + return &PushService{ + client: client, + pusherService: pusher.NewService(logger, userLogger), + options: options, + } +} + +// Push pushes the security package to the registry +func (svc *PushService) Push(ctx context.Context) error { + return svc.pusherService.PushPackage(ctx, pusher.PackagePushConfig{ + PackageName: "security", + ProcessName: "Push security databases", + WorkingDir: svc.options.WorkingDir, + BundleDir: svc.options.BundleDir, + Client: svc.client.WithSegment("security"), + // New pull creates layouts at security/trivy-db, security/trivy-bdu, etc. + MandatoryLayoutsFunc: func(packageDir string) map[string]string { + return map[string]string{ + "trivy database layout": filepath.Join(packageDir, "security", "trivy-db"), + "trivy bdu layout": filepath.Join(packageDir, "security", "trivy-bdu"), + "trivy java database layout": filepath.Join(packageDir, "security", "trivy-java-db"), + "trivy checks layout": filepath.Join(packageDir, "security", "trivy-checks"), + } + }, + Layouts: []pusher.LayoutMapping{ + {LayoutPath: "security/trivy-db", Segment: "trivy-db"}, + {LayoutPath: "security/trivy-java-db", Segment: "trivy-java-db"}, + {LayoutPath: "security/trivy-bdu", Segment: "trivy-bdu"}, + {LayoutPath: "security/trivy-checks", Segment: "trivy-checks"}, + }, + }) +} From 7dc3688287f860ce3ac613ca94356493323781ce Mon Sep 17 00:00:00 2001 From: Timur Tuktamyshev Date: Thu, 18 Dec 2025 16:14:45 +0300 Subject: [PATCH 02/14] fix pull-push Signed-off-by: Timur Tuktamyshev --- internal/layout.go | 4 +- internal/mirror/cmd/pull/flags/flags.go | 11 +- internal/mirror/cmd/pull/pull.go | 4 +- internal/mirror/cmd/push/push.go | 5 +- internal/mirror/modules/modules.go | 8 +- internal/mirror/modules/push.go | 197 ------------- internal/mirror/platform/platform.go | 8 +- internal/mirror/platform/push.go | 77 ----- internal/mirror/pull.go | 17 +- internal/mirror/push.go | 373 +++++++++++++++++++++--- internal/mirror/pusher/pusher.go | 149 +--------- internal/mirror/releases/versions.go | 6 +- internal/mirror/security/push.go | 85 ------ pkg/libmirror/operations/params/pull.go | 1 + 14 files changed, 382 insertions(+), 563 deletions(-) delete mode 100644 internal/mirror/modules/push.go delete mode 100644 internal/mirror/platform/push.go delete mode 100644 internal/mirror/security/push.go diff --git a/internal/layout.go b/internal/layout.go index c3472290..9d8e562d 100644 --- a/internal/layout.go +++ b/internal/layout.go @@ -31,9 +31,9 @@ const ( InstallStandaloneSegment = "install-standalone" ReleaseChannelSegment = "release-channel" - ModulesSegment = "modules" + ModulesSegment = "module" ModulesExtraSegment = "extra" - ModulesReleasesSegment = "releases" + ModulesReleasesSegment = "release" SecuritySegment = "security" diff --git a/internal/mirror/cmd/pull/flags/flags.go b/internal/mirror/cmd/pull/flags/flags.go index e0aacd0b..c5487349 100644 --- a/internal/mirror/cmd/pull/flags/flags.go +++ b/internal/mirror/cmd/pull/flags/flags.go @@ -55,8 +55,9 @@ var ( SourceRegistryPassword string DeckhouseLicenseToken string - DoGOSTDigest bool - NoPullResume bool + DoGOSTDigest bool + NoPullResume bool + IgnoreSuspend bool NoPlatform bool NoSecurityDB bool @@ -161,6 +162,12 @@ module-name@=v1.3.0+stable → exact tag match: include only v1.3.0 and and publ false, "Do not continue last unfinished pull operation and start from scratch.", ) + flagSet.BoolVar( + &IgnoreSuspend, + "ignore-suspend", + false, + "Ignore suspended release channels and continue mirroring. Use with caution.", + ) flagSet.BoolVar( &NoPlatform, "no-platform", diff --git a/internal/mirror/cmd/pull/pull.go b/internal/mirror/cmd/pull/pull.go index 51f1d015..6d40e349 100644 --- a/internal/mirror/cmd/pull/pull.go +++ b/internal/mirror/cmd/pull/pull.go @@ -114,7 +114,7 @@ func pull(cmd *cobra.Command, _ []string) error { if err := puller.Execute(ctx); err != nil { if errors.Is(err, context.Canceled) { - puller.logger.WarnLn("\nOperation cancelled by user") + puller.logger.WarnLn("Operation cancelled by user") return nil } return ErrPullFailed @@ -173,6 +173,7 @@ func buildPullParams(logger params.Logger) *params.PullParams { SkipSecurityDatabases: pullflags.NoSecurityDB, SkipModules: pullflags.NoModules, OnlyExtraImages: pullflags.OnlyExtraImages, + IgnoreSuspend: pullflags.IgnoreSuspend, DeckhouseTag: pullflags.DeckhouseTag, SinceVersion: pullflags.SinceVersion, } @@ -285,6 +286,7 @@ func (p *Puller) Execute(ctx context.Context) error { SkipSecurity: pullflags.NoSecurityDB, SkipModules: pullflags.NoModules, OnlyExtraImages: pullflags.OnlyExtraImages, + IgnoreSuspend: pullflags.IgnoreSuspend, ModuleFilter: filter, BundleDir: pullflags.ImagesBundlePath, BundleChunkSize: pullflags.ImagesBundleChunkSizeGB * 1000 * 1000 * 1000, diff --git a/internal/mirror/cmd/push/push.go b/internal/mirror/cmd/push/push.go index ccf5bcd3..62916683 100644 --- a/internal/mirror/cmd/push/push.go +++ b/internal/mirror/cmd/push/push.go @@ -342,9 +342,8 @@ func (p *Pusher) executeNewPush() error { svc := mirror.NewPushService( client, &mirror.PushServiceOptions{ - BundleDir: p.pushParams.BundleDir, - WorkingDir: p.pushParams.WorkingDir, - ModulesPathSuffix: p.pushParams.ModulesPathSuffix, + BundleDir: p.pushParams.BundleDir, + WorkingDir: p.pushParams.WorkingDir, }, logger.Named("push"), p.logger.(*log.SLogger), diff --git a/internal/mirror/modules/modules.go b/internal/mirror/modules/modules.go index 220870f1..ed052684 100644 --- a/internal/mirror/modules/modules.go +++ b/internal/mirror/modules/modules.go @@ -369,9 +369,13 @@ func (svc *Service) extractVersionsFromReleaseChannels(ctx context.Context, modu svc.logger.Debug(fmt.Sprintf("Failed to extract version.json for %s/%s: %v", moduleName, channel, err)) continue } - if versionJSON.Version != "" { - versions = append(versions, "v"+versionJSON.Version) + version := versionJSON.Version + // Ensure version has "v" prefix (some may already have it) + if !strings.HasPrefix(version, "v") { + version = "v" + version + } + versions = append(versions, version) } } diff --git a/internal/mirror/modules/push.go b/internal/mirror/modules/push.go deleted file mode 100644 index f4705fe3..00000000 --- a/internal/mirror/modules/push.go +++ /dev/null @@ -1,197 +0,0 @@ -/* -Copyright 2025 Flant JSC - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package modules - -import ( - "context" - "fmt" - "log/slog" - "os" - "path" - "path/filepath" - "slices" - "strings" - - dkplog "github.com/deckhouse/deckhouse/pkg/log" - "github.com/deckhouse/deckhouse/pkg/registry" - - "github.com/deckhouse/deckhouse-cli/internal/mirror/pusher" - "github.com/deckhouse/deckhouse-cli/pkg/libmirror/util/log" -) - -// PushOptions contains options for pushing module images -type PushOptions struct { - BundleDir string - WorkingDir string -} - -// PushService handles pushing module images to registry -type PushService struct { - client registry.Client - pusherService *pusher.Service - options *PushOptions - logger *dkplog.Logger - userLogger *log.SLogger -} - -// NewPushService creates a new modules push service -func NewPushService( - client registry.Client, - options *PushOptions, - logger *dkplog.Logger, - userLogger *log.SLogger, -) *PushService { - if options == nil { - options = &PushOptions{} - } - - return &PushService{ - client: client, - pusherService: pusher.NewService(logger, userLogger), - options: options, - logger: logger, - userLogger: userLogger, - } -} - -// Push pushes all module packages to the registry -func (svc *PushService) Push(ctx context.Context) error { - modulePackages, err := svc.findModulePackages() - if err != nil { - return fmt.Errorf("find module packages: %w", err) - } - - if len(modulePackages) == 0 { - svc.userLogger.InfoLn("No module packages found, skipping") - return nil - } - - pushed := make(map[string]struct{}) - for _, moduleName := range modulePackages { - if _, ok := pushed[moduleName]; ok { - continue - } - - if err := svc.pushModule(ctx, moduleName); err != nil { - svc.userLogger.WarnLn(err) - continue - } - pushed[moduleName] = struct{}{} - } - - if len(pushed) > 0 { - names := make([]string, 0, len(pushed)) - for name := range pushed { - names = append(names, name) - } - slices.Sort(names) - svc.userLogger.Infof("Modules pushed: %s", strings.Join(names, ", ")) - } - - return nil -} - -func (svc *PushService) findModulePackages() ([]string, error) { - entries, err := os.ReadDir(svc.options.BundleDir) - if err != nil { - return nil, fmt.Errorf("list bundle directory: %w", err) - } - - modules := make([]string, 0, len(entries)) - for _, entry := range entries { - name := entry.Name() - - // Skip non-module files - if !strings.HasPrefix(name, "module-") { - continue - } - - // Only process .tar and .chunk files - ext := filepath.Ext(name) - if ext != ".tar" && ext != ".chunk" { - continue - } - - // Extract module name: "module-foo.tar" -> "foo" - // Handle chunked files: "module-foo.tar.chunk000" -> "foo" - moduleName := strings.TrimPrefix(name, "module-") - moduleName = strings.TrimSuffix(moduleName, ext) - moduleName = strings.TrimSuffix(moduleName, ".tar") - - modules = append(modules, moduleName) - } - - return modules, nil -} - -func (svc *PushService) pushModule(ctx context.Context, moduleName string) error { - return svc.pusherService.PushPackage(ctx, pusher.PackagePushConfig{ - PackageName: "module-" + moduleName, - ProcessName: "Push module: " + moduleName, - WorkingDir: filepath.Join(svc.options.WorkingDir, "modules"), - BundleDir: svc.options.BundleDir, - Client: svc.client.WithSegment(moduleName), - // New pull creates: module/, release/, extra/ - MandatoryLayoutsFunc: func(packageDir string) map[string]string { - return map[string]string{ - "module root layout": filepath.Join(packageDir, "module"), - "module release channels layout": filepath.Join(packageDir, "release"), - } - }, - // Dynamic layout discovery after unpacking - LayoutsFunc: svc.buildModuleLayouts, - }) -} - -// buildModuleLayouts returns the list of layouts for a module, including dynamic extra discovery -func (svc *PushService) buildModuleLayouts(packageDir string) []pusher.LayoutMapping { - layouts := []pusher.LayoutMapping{ - {LayoutPath: "module", Segment: ""}, // Root module images - {LayoutPath: "release", Segment: "release"}, // Release channels - } - - // Check if extra directory exists - extraDir := filepath.Join(packageDir, "extra") - if _, err := os.Stat(extraDir); os.IsNotExist(err) { - return layouts - } - - // Add root extra layout - layouts = append(layouts, pusher.LayoutMapping{ - LayoutPath: "extra", - Segment: "extra", - }) - - // Discover nested extra layouts - entries, err := os.ReadDir(extraDir) - if err != nil { - svc.logger.Warn("Error reading extra dir", slog.Any("error", err)) - return layouts - } - - for _, entry := range entries { - if entry.IsDir() { - svc.logger.Debug("Found extra layout", slog.String("layout", entry.Name())) - layouts = append(layouts, pusher.LayoutMapping{ - LayoutPath: path.Join("extra", entry.Name()), - Segment: path.Join("extra", entry.Name()), - }) - } - } - - return layouts -} diff --git a/internal/mirror/platform/platform.go b/internal/mirror/platform/platform.go index b8888394..dd4ed42f 100644 --- a/internal/mirror/platform/platform.go +++ b/internal/mirror/platform/platform.go @@ -56,6 +56,8 @@ type Options struct { BundleDir string // BundleChunkSize is the max size of bundle chunks in bytes (0 = no chunking) BundleChunkSize int64 + // IgnoreSuspend allows mirroring even if release channels are suspended + IgnoreSuspend bool } type Service struct { @@ -278,9 +280,9 @@ func (svc *Service) getReleaseChannelVersionFromRegistry(ctx context.Context, re return nil, fmt.Errorf("cannot get %s release channel version.json: %w", releaseChannel, err) } - // if meta.Suspend { - // return nil, fmt.Errorf("source registry contains suspended release channel %q, try again later", releaseChannel) - // } + if meta.Suspend && !svc.options.IgnoreSuspend { + return nil, fmt.Errorf("source registry contains suspended release channel %q, try again later (use --ignore-suspend to override)", releaseChannel) + } ver, err := semver.NewVersion(meta.Version) if err != nil { diff --git a/internal/mirror/platform/push.go b/internal/mirror/platform/push.go deleted file mode 100644 index 9f5d646d..00000000 --- a/internal/mirror/platform/push.go +++ /dev/null @@ -1,77 +0,0 @@ -/* -Copyright 2025 Flant JSC - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package platform - -import ( - "context" - - dkplog "github.com/deckhouse/deckhouse/pkg/log" - "github.com/deckhouse/deckhouse/pkg/registry" - - "github.com/deckhouse/deckhouse-cli/internal/mirror/pusher" - "github.com/deckhouse/deckhouse-cli/pkg/libmirror/bundle" - "github.com/deckhouse/deckhouse-cli/pkg/libmirror/util/log" -) - -// PushOptions contains options for pushing platform images -type PushOptions struct { - BundleDir string - WorkingDir string -} - -// PushService handles pushing platform images to registry -type PushService struct { - client registry.Client - pusherService *pusher.Service - options *PushOptions -} - -// NewPushService creates a new platform push service -func NewPushService( - client registry.Client, - options *PushOptions, - logger *dkplog.Logger, - userLogger *log.SLogger, -) *PushService { - if options == nil { - options = &PushOptions{} - } - - return &PushService{ - client: client, - pusherService: pusher.NewService(logger, userLogger), - options: options, - } -} - -// Push pushes the platform package to the registry -func (svc *PushService) Push(ctx context.Context) error { - return svc.pusherService.PushPackage(ctx, pusher.PackagePushConfig{ - PackageName: "platform", - ProcessName: "Push Deckhouse platform", - WorkingDir: svc.options.WorkingDir, - BundleDir: svc.options.BundleDir, - Client: svc.client, - MandatoryLayoutsFunc: bundle.MandatoryLayoutsForPlatform, - Layouts: []pusher.LayoutMapping{ - {LayoutPath: "", Segment: ""}, // Root layout - {LayoutPath: "install", Segment: "install"}, // Installer images - {LayoutPath: "install-standalone", Segment: "install-standalone"}, // Standalone installer - {LayoutPath: "release-channel", Segment: "release-channel"}, // Release channels - }, - }) -} diff --git a/internal/mirror/pull.go b/internal/mirror/pull.go index 4a1a74ef..6ca5b5c9 100644 --- a/internal/mirror/pull.go +++ b/internal/mirror/pull.go @@ -40,6 +40,8 @@ type PullServiceOptions struct { SkipModules bool // OnlyExtraImages pulls only extra images for modules (without main module images) OnlyExtraImages bool + // IgnoreSuspend allows mirroring even if release channels are suspended + IgnoreSuspend bool // ModuleFilter is the filter for module selection (whitelist/blacklist) ModuleFilter *libmodules.Filter // BundleDir is the directory to store the bundle @@ -88,6 +90,7 @@ func NewPullService( TargetTag: targetTag, BundleDir: options.BundleDir, BundleChunkSize: options.BundleChunkSize, + IgnoreSuspend: options.IgnoreSuspend, }, logger, userLogger, @@ -127,23 +130,23 @@ func NewPullService( // Pull downloads Deckhouse components from registry func (svc *PullService) Pull(ctx context.Context) error { if !svc.options.SkipPlatform { - err := svc.platformService.PullPlatform(ctx) - if err != nil { - return fmt.Errorf("pull platform: %w", err) + err := svc.platformService.PullPlatform(ctx) + if err != nil { + return fmt.Errorf("pull platform: %w", err) } } if !svc.options.SkipSecurity { err := svc.securityService.PullSecurity(ctx) - if err != nil { - return fmt.Errorf("pull security databases: %w", err) + if err != nil { + return fmt.Errorf("pull security databases: %w", err) } } if !svc.options.SkipModules || svc.options.OnlyExtraImages { err := svc.modulesService.PullModules(ctx) - if err != nil { - return fmt.Errorf("pull modules: %w", err) + if err != nil { + return fmt.Errorf("pull modules: %w", err) } } diff --git a/internal/mirror/push.go b/internal/mirror/push.go index de9bbae9..83bd4be5 100644 --- a/internal/mirror/push.go +++ b/internal/mirror/push.go @@ -19,31 +19,46 @@ package mirror import ( "context" "fmt" + "io/fs" + "log/slog" + "os" + "path/filepath" + "slices" + "strings" + + "github.com/google/go-containerregistry/pkg/v1/layout" dkplog "github.com/deckhouse/deckhouse/pkg/log" "github.com/deckhouse/deckhouse/pkg/registry" - "github.com/deckhouse/deckhouse-cli/internal/mirror/modules" - "github.com/deckhouse/deckhouse-cli/internal/mirror/platform" - "github.com/deckhouse/deckhouse-cli/internal/mirror/security" + "github.com/deckhouse/deckhouse-cli/internal/mirror/pusher" + "github.com/deckhouse/deckhouse-cli/pkg/libmirror/bundle" "github.com/deckhouse/deckhouse-cli/pkg/libmirror/util/log" ) +const ( + defaultDirPermissions = 0755 + + platformPackage = "platform" + securityPackage = "security" + modulesPrefix = "module-" +) + // PushServiceOptions contains configuration options for PushService type PushServiceOptions struct { // BundleDir is the directory containing the bundle to push BundleDir string // WorkingDir is the temporary directory for unpacking bundles WorkingDir string - // ModulesPathSuffix is the path suffix for modules in registry - ModulesPathSuffix string } // PushService orchestrates pushing Deckhouse components to registry type PushService struct { - platformService *platform.PushService - securityService *security.PushService - modulesService *modules.PushService + client registry.Client + options *PushServiceOptions + pusher *pusher.Service + logger *dkplog.Logger + userLogger *log.SLogger } // NewPushService creates a new PushService @@ -58,52 +73,322 @@ func NewPushService( } return &PushService{ - platformService: platform.NewPushService( - client, - &platform.PushOptions{ - BundleDir: options.BundleDir, - WorkingDir: options.WorkingDir, - }, - logger.Named("platform"), - userLogger, - ), - securityService: security.NewPushService( - client, - &security.PushOptions{ - BundleDir: options.BundleDir, - WorkingDir: options.WorkingDir, - }, - logger.Named("security"), - userLogger, - ), - modulesService: modules.NewPushService( - client.WithSegment(options.ModulesPathSuffix), - &modules.PushOptions{ - BundleDir: options.BundleDir, - WorkingDir: options.WorkingDir, - }, - logger.Named("modules"), - userLogger, - ), + client: client, + options: options, + pusher: pusher.NewService(logger, userLogger), + logger: logger, + userLogger: userLogger, } } // Push uploads Deckhouse components to registry +// It unpacks all packages into a unified directory structure and pushes +// each OCI layout based on its path (path = registry segment) func (svc *PushService) Push(ctx context.Context) error { - // Push platform package - if err := svc.platformService.Push(ctx); err != nil { - return fmt.Errorf("push platform: %w", err) + // Create unified directory for unpacking + unifiedDir := filepath.Join(svc.options.WorkingDir, "unified") + if err := os.MkdirAll(unifiedDir, defaultDirPermissions); err != nil { + return fmt.Errorf("create unified directory: %w", err) + } + defer func() { + if err := os.RemoveAll(unifiedDir); err != nil { + svc.logger.Warn("Failed to cleanup unified directory", + slog.String("path", unifiedDir), + slog.Any("error", err)) + } + }() + + // Unpack all packages into unified structure + if err := svc.unpackPlatform(ctx, unifiedDir); err != nil { + return fmt.Errorf("unpack platform: %w", err) } - // Push security package - if err := svc.securityService.Push(ctx); err != nil { - return fmt.Errorf("push security databases: %w", err) + if err := svc.unpackSecurity(ctx, unifiedDir); err != nil { + return fmt.Errorf("unpack security: %w", err) } - // Push module packages - if err := svc.modulesService.Push(ctx); err != nil { - return fmt.Errorf("push modules: %w", err) + if err := svc.unpackModules(ctx, unifiedDir); err != nil { + return fmt.Errorf("unpack modules: %w", err) + } + + // Push all layouts recursively + return svc.userLogger.Process("Push to registry", func() error { + return svc.pushDir(ctx, unifiedDir, svc.client) + }) +} + +// pushDir recursively walks directory and pushes each OCI layout +// The relative path from root becomes the registry segment +func (svc *PushService) pushDir(ctx context.Context, rootDir string, client registry.Client) error { + var layouts []string + + // First, collect all layouts + err := filepath.WalkDir(rootDir, func(path string, d fs.DirEntry, err error) error { + if err != nil { + return err + } + if d.IsDir() || d.Name() != "index.json" { + return nil + } + layouts = append(layouts, filepath.Dir(path)) + return nil + }) + if err != nil { + return fmt.Errorf("scan layouts: %w", err) + } + + if len(layouts) == 0 { + svc.userLogger.InfoLn("No layouts to push") + return nil + } + + // Sort for predictable output + slices.Sort(layouts) + + svc.userLogger.Infof("Found %d layouts to push", len(layouts)) + + // Push each layout + for _, layoutDir := range layouts { + // Skip empty layouts (no images) + hasImages, err := svc.layoutHasImages(layoutDir) + if err != nil { + svc.logger.Warn("Failed to check layout", slog.String("path", layoutDir), slog.Any("error", err)) + continue + } + if !hasImages { + continue + } + + relPath, _ := filepath.Rel(rootDir, layoutDir) + + // Build registry segment from relative path + segment := "" + if relPath != "." { + segment = relPath + } + + targetClient := client + if segment != "" { + // WithSegment expects single segment, but we have path like "modules/virtualization" + // Split and apply each segment + for _, seg := range strings.Split(segment, string(os.PathSeparator)) { + targetClient = targetClient.WithSegment(seg) + } + } + + svc.userLogger.Infof("Pushing %s", targetClient.GetRegistry()) + + if err := svc.pusher.PushLayout(ctx, layout.Path(layoutDir), targetClient); err != nil { + return fmt.Errorf("push layout %q: %w", relPath, err) + } } return nil } + +// layoutHasImages checks if an OCI layout has any images to push +func (svc *PushService) layoutHasImages(layoutDir string) (bool, error) { + layoutPath := layout.Path(layoutDir) + index, err := layoutPath.ImageIndex() + if err != nil { + return false, fmt.Errorf("read index: %w", err) + } + + indexManifest, err := index.IndexManifest() + if err != nil { + return false, fmt.Errorf("parse index manifest: %w", err) + } + + return len(indexManifest.Manifests) > 0, nil +} + +// unpackPlatform unpacks platform.tar to root of unified directory +// platform.tar contains: index.json, install/, install-standalone/, release-channel/ +func (svc *PushService) unpackPlatform(ctx context.Context, unifiedDir string) error { + if !svc.pusher.PackageExists(svc.options.BundleDir, platformPackage) { + svc.userLogger.InfoLn("Platform package not found, skipping") + return nil + } + + return svc.userLogger.Process("Unpack platform", func() error { + pkg, err := svc.pusher.OpenPackage(svc.options.BundleDir, platformPackage) + if err != nil { + return fmt.Errorf("open package: %w", err) + } + defer pkg.Close() + + // Platform unpacks directly to root + if err := bundle.Unpack(ctx, pkg, unifiedDir); err != nil { + return fmt.Errorf("unpack: %w", err) + } + return nil + }) +} + +// unpackSecurity unpacks security.tar to unified directory +// security.tar contains: security/trivy-db/, security/trivy-java-db/, etc. +// These paths already include "security/" prefix, so unpack to root +func (svc *PushService) unpackSecurity(ctx context.Context, unifiedDir string) error { + if !svc.pusher.PackageExists(svc.options.BundleDir, securityPackage) { + svc.userLogger.InfoLn("Security package not found, skipping") + return nil + } + + return svc.userLogger.Process("Unpack security", func() error { + pkg, err := svc.pusher.OpenPackage(svc.options.BundleDir, securityPackage) + if err != nil { + return fmt.Errorf("open package: %w", err) + } + defer pkg.Close() + + // Security tar already has security/ prefix inside + if err := bundle.Unpack(ctx, pkg, unifiedDir); err != nil { + return fmt.Errorf("unpack: %w", err) + } + return nil + }) +} + +// unpackModules unpacks all module-*.tar files +// Each module tar contains: module/, release/, extra/ +// We need to transform paths: +// - module/ -> modules/{name}/ +// - release/ -> modules/{name}/release/ +// - extra/ -> modules/{name}/extra/ +func (svc *PushService) unpackModules(ctx context.Context, unifiedDir string) error { + // Find all module packages + modulePackages, err := svc.findModulePackages() + if err != nil { + return fmt.Errorf("find module packages: %w", err) + } + + if len(modulePackages) == 0 { + svc.userLogger.InfoLn("No module packages found, skipping") + return nil + } + + slices.Sort(modulePackages) + svc.userLogger.Infof("Found %d module packages", len(modulePackages)) + + for _, moduleName := range modulePackages { + if err := svc.unpackModule(ctx, unifiedDir, moduleName); err != nil { + return fmt.Errorf("unpack module %s: %w", moduleName, err) + } + } + + return nil +} + +func (svc *PushService) unpackModule(ctx context.Context, unifiedDir, moduleName string) error { + packageName := modulesPrefix + moduleName + + return svc.userLogger.Process(fmt.Sprintf("Unpack module %s", moduleName), func() error { + pkg, err := svc.pusher.OpenPackage(svc.options.BundleDir, packageName) + if err != nil { + return fmt.Errorf("open package: %w", err) + } + defer pkg.Close() + + // Unpack to temp directory first + tempDir := filepath.Join(svc.options.WorkingDir, "temp-"+moduleName) + if err := os.MkdirAll(tempDir, defaultDirPermissions); err != nil { + return fmt.Errorf("create temp dir: %w", err) + } + defer os.RemoveAll(tempDir) + + if err := bundle.Unpack(ctx, pkg, tempDir); err != nil { + return fmt.Errorf("unpack: %w", err) + } + + // Transform paths and move to unified directory + // module/ -> modules/{name}/ + // release/ -> modules/{name}/release/ + // extra/ -> modules/{name}/extra/ + targetDir := filepath.Join(unifiedDir, "modules", moduleName) + if err := os.MkdirAll(targetDir, defaultDirPermissions); err != nil { + return fmt.Errorf("create target dir: %w", err) + } + + // Move module/ contents to modules/{name}/ + moduleDir := filepath.Join(tempDir, "module") + if _, err := os.Stat(moduleDir); err == nil { + if err := copyDir(moduleDir, targetDir); err != nil { + return fmt.Errorf("copy module layout: %w", err) + } + } + + // Move release/ to modules/{name}/release/ + releaseDir := filepath.Join(tempDir, "release") + if _, err := os.Stat(releaseDir); err == nil { + targetRelease := filepath.Join(targetDir, "release") + if err := copyDir(releaseDir, targetRelease); err != nil { + return fmt.Errorf("copy release layout: %w", err) + } + } + + // Move extra/ to modules/{name}/extra/ + extraDir := filepath.Join(tempDir, "extra") + if _, err := os.Stat(extraDir); err == nil { + targetExtra := filepath.Join(targetDir, "extra") + if err := copyDir(extraDir, targetExtra); err != nil { + return fmt.Errorf("copy extra layout: %w", err) + } + } + + return nil + }) +} + +func (svc *PushService) findModulePackages() ([]string, error) { + entries, err := os.ReadDir(svc.options.BundleDir) + if err != nil { + return nil, fmt.Errorf("read bundle dir: %w", err) + } + + modules := make([]string, 0, len(entries)) + for _, entry := range entries { + name := entry.Name() + if !strings.HasPrefix(name, modulesPrefix) { + continue + } + + // Extract module name: "module-virtualization.tar" -> "virtualization" + moduleName := strings.TrimPrefix(name, modulesPrefix) + moduleName = strings.TrimSuffix(moduleName, ".tar") + + // Handle chunked files: "module-virtualization.tar.chunk000" -> "virtualization" + if idx := strings.Index(moduleName, ".tar.chunk"); idx != -1 { + moduleName = moduleName[:idx] + } + + modules = append(modules, moduleName) + } + + // Deduplicate (in case of chunked files) + slices.Sort(modules) + return slices.Compact(modules), nil +} + +// copyDir copies directory contents recursively +func copyDir(src, dst string) error { + return filepath.WalkDir(src, func(path string, d fs.DirEntry, err error) error { + if err != nil { + return err + } + + relPath, _ := filepath.Rel(src, path) + targetPath := filepath.Join(dst, relPath) + + if d.IsDir() { + return os.MkdirAll(targetPath, defaultDirPermissions) + } + + // Copy file + data, err := os.ReadFile(path) + if err != nil { + return fmt.Errorf("read %s: %w", path, err) + } + + return os.WriteFile(targetPath, data, 0644) + }) +} diff --git a/internal/mirror/pusher/pusher.go b/internal/mirror/pusher/pusher.go index 7e51e7ba..01740230 100644 --- a/internal/mirror/pusher/pusher.go +++ b/internal/mirror/pusher/pusher.go @@ -31,7 +31,6 @@ import ( "github.com/deckhouse/deckhouse/pkg/registry" "github.com/deckhouse/deckhouse-cli/internal/mirror/chunked" - "github.com/deckhouse/deckhouse-cli/pkg/libmirror/bundle" "github.com/deckhouse/deckhouse-cli/pkg/libmirror/util/errorutil" "github.com/deckhouse/deckhouse-cli/pkg/libmirror/util/log" "github.com/deckhouse/deckhouse-cli/pkg/libmirror/util/retry" @@ -39,44 +38,10 @@ import ( ) const ( - defaultDirPermissions = 0755 - pushRetryAttempts = 4 - pushRetryDelay = 3 * time.Second + pushRetryAttempts = 4 + pushRetryDelay = 3 * time.Second ) -// LayoutMapping defines the mapping between bundle layout path and registry segment -type LayoutMapping struct { - // LayoutPath is the path within the unpacked package (e.g., "module", "release") - LayoutPath string - // Segment is the registry path segment to push to (e.g., "", "release") - Segment string -} - -// PackagePushConfig defines the configuration for pushing a package -type PackagePushConfig struct { - // PackageName is the name of the package file (without .tar extension) - PackageName string - // ProcessName is the name shown in logs (e.g., "Push Deckhouse platform") - ProcessName string - // WorkingDir is the temp directory for unpacking - WorkingDir string - // BundleDir is the directory containing the bundle - BundleDir string - // Client is the registry client to use for pushing - Client registry.Client - // MandatoryLayouts returns paths that must exist after unpacking (for validation) - // Key is description, value is path. Used if MandatoryLayoutsFunc is nil. - MandatoryLayouts map[string]string - // MandatoryLayoutsFunc dynamically builds the mandatory layouts for validation - // If set, MandatoryLayouts field is ignored. packageDir is the unpacked path. - MandatoryLayoutsFunc func(packageDir string) map[string]string - // Layouts is the static list of layouts to push (used if LayoutsFunc is nil) - Layouts []LayoutMapping - // LayoutsFunc dynamically builds the layouts list after unpacking - // If set, Layouts field is ignored. packageDir is the unpacked path. - LayoutsFunc func(packageDir string) []LayoutMapping -} - // Service handles the pushing of images to the registry type Service struct { logger *dkplog.Logger @@ -104,95 +69,6 @@ func (s *Service) PackageExists(bundleDir, pkgName string) bool { return false } -// PushPackage handles the common flow of pushing a package: -// 1. Check if package exists -// 2. Create temp directory -// 3. Unpack package -// 4. Validate structure -// 5. Push all layouts -// 6. Cleanup temp directory -func (s *Service) PushPackage(ctx context.Context, config PackagePushConfig) error { - // Check if package exists - if !s.PackageExists(config.BundleDir, config.PackageName) { - s.userLogger.Infof("%s package is not present, skipping", config.PackageName) - return nil - } - - return s.userLogger.Process(config.ProcessName, func() error { - return s.pushPackageInternal(ctx, config) - }) -} - -func (s *Service) pushPackageInternal(ctx context.Context, config PackagePushConfig) error { - // Create temp directory - packageDir := filepath.Join(config.WorkingDir, config.PackageName) - if err := os.MkdirAll(packageDir, defaultDirPermissions); err != nil { - return fmt.Errorf("create temp directory: %w", err) - } - defer func() { - if err := os.RemoveAll(packageDir); err != nil { - s.logger.Warn("Failed to cleanup temp directory", - slog.String("path", packageDir), - slog.Any("error", err)) - } - }() - - // Open and unpack - pkg, err := s.OpenPackage(config.BundleDir, config.PackageName) - if err != nil { - return fmt.Errorf("open package: %w", err) - } - defer pkg.Close() - - s.userLogger.InfoLn("Unpacking package") - if err := bundle.Unpack(ctx, pkg, packageDir); err != nil { - return fmt.Errorf("unpack package: %w", err) - } - - // Validate structure (dynamic or static) - mandatoryLayouts := config.MandatoryLayouts - if config.MandatoryLayoutsFunc != nil { - mandatoryLayouts = config.MandatoryLayoutsFunc(packageDir) - } - if len(mandatoryLayouts) > 0 { - s.userLogger.InfoLn("Validating package structure") - if err := bundle.ValidateUnpackedPackage(mandatoryLayouts); err != nil { - return fmt.Errorf("invalid package structure: %w", err) - } - } - - // Get layouts to push (dynamic or static) - layouts := config.Layouts - if config.LayoutsFunc != nil { - layouts = config.LayoutsFunc(packageDir) - } - - // Push layouts - for _, layoutMapping := range layouts { - layoutFullPath := filepath.Join(packageDir, layoutMapping.LayoutPath) - - // Check if layout exists - if _, err := os.Stat(filepath.Join(layoutFullPath, "index.json")); os.IsNotExist(err) { - s.logger.Debug("Layout does not exist, skipping", slog.String("layout", layoutMapping.LayoutPath)) - continue - } - - client := config.Client - if layoutMapping.Segment != "" { - client = client.WithSegment(layoutMapping.Segment) - } - - repoRef := client.GetRegistry() - s.userLogger.InfoLn("Pushing", repoRef) - - if err := s.PushLayout(ctx, layout.Path(layoutFullPath), client); err != nil { - return fmt.Errorf("push layout %q: %w", layoutMapping.LayoutPath, err) - } - } - - return nil -} - // PushLayout pushes all images from an OCI layout to the registry func (s *Service) PushLayout(ctx context.Context, layoutPath layout.Path, client registry.Client) error { index, err := layoutPath.ImageIndex() @@ -206,7 +82,6 @@ func (s *Service) PushLayout(ctx context.Context, layoutPath layout.Path, client } if len(indexManifest.Manifests) == 0 { - s.userLogger.InfoLn("No images to push") return nil } @@ -222,9 +97,9 @@ func (s *Service) PushLayout(ctx context.Context, layoutPath layout.Path, client s.userLogger.Infof("[%d / %d] Pushing image %s:%s", i+1, len(indexManifest.Manifests), client.GetRegistry(), tag) img, err := index.Image(manifest.Digest) - if err != nil { + if err != nil { return fmt.Errorf("read image %s: %w", tag, err) - } + } err = retry.RunTaskWithContext( ctx, silentLogger{}, "push", @@ -235,7 +110,7 @@ func (s *Service) PushLayout(ctx context.Context, layoutPath layout.Path, client } return fmt.Errorf("write %s:%s to registry: %w", client.GetRegistry(), tag, err) } - return nil + return nil }), ) if err != nil { @@ -272,10 +147,10 @@ func (s *Service) openChunkedPackage(bundleDir, pkgName string) (io.ReadCloser, // silentLogger suppresses retry task logging type silentLogger struct{} -func (silentLogger) Debugf(_ string, _ ...interface{}) {} -func (silentLogger) DebugLn(_ ...interface{}) {} -func (silentLogger) Infof(_ string, _ ...interface{}) {} -func (silentLogger) InfoLn(_ ...interface{}) {} -func (silentLogger) Warnf(_ string, _ ...interface{}) {} -func (silentLogger) WarnLn(_ ...interface{}) {} -func (silentLogger) Process(_ string, _ func() error) error { return nil } +func (silentLogger) Debugf(_ string, _ ...interface{}) {} +func (silentLogger) DebugLn(_ ...interface{}) {} +func (silentLogger) Infof(_ string, _ ...interface{}) {} +func (silentLogger) InfoLn(_ ...interface{}) {} +func (silentLogger) Warnf(_ string, _ ...interface{}) {} +func (silentLogger) WarnLn(_ ...interface{}) {} +func (silentLogger) Process(_ string, _ func() error) error { return nil } diff --git a/internal/mirror/releases/versions.go b/internal/mirror/releases/versions.go index 4f6808d7..313a9787 100644 --- a/internal/mirror/releases/versions.go +++ b/internal/mirror/releases/versions.go @@ -165,9 +165,9 @@ func getReleaseChannelVersionFromRegistry(mirrorCtx *params.PullParams, releaseC return nil, fmt.Errorf("cannot find release channel version: %w", err) } - // if releaseInfo.Suspended { - // return nil, fmt.Errorf("cannot mirror Deckhouse: source registry contains suspended release channel %q, try again later", releaseChannel) - // } + if releaseInfo.Suspended && !mirrorCtx.IgnoreSuspend { + return nil, fmt.Errorf("cannot mirror Deckhouse: source registry contains suspended release channel %q, try again later (use --ignore-suspend to override)", releaseChannel) + } ver, err := semver.NewVersion(releaseInfo.Version) if err != nil { diff --git a/internal/mirror/security/push.go b/internal/mirror/security/push.go deleted file mode 100644 index e0617af4..00000000 --- a/internal/mirror/security/push.go +++ /dev/null @@ -1,85 +0,0 @@ -/* -Copyright 2025 Flant JSC - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package security - -import ( - "context" - "path/filepath" - - dkplog "github.com/deckhouse/deckhouse/pkg/log" - "github.com/deckhouse/deckhouse/pkg/registry" - - "github.com/deckhouse/deckhouse-cli/internal/mirror/pusher" - "github.com/deckhouse/deckhouse-cli/pkg/libmirror/util/log" -) - -// PushOptions contains options for pushing security images -type PushOptions struct { - BundleDir string - WorkingDir string -} - -// PushService handles pushing security database images to registry -type PushService struct { - client registry.Client - pusherService *pusher.Service - options *PushOptions -} - -// NewPushService creates a new security push service -func NewPushService( - client registry.Client, - options *PushOptions, - logger *dkplog.Logger, - userLogger *log.SLogger, -) *PushService { - if options == nil { - options = &PushOptions{} - } - - return &PushService{ - client: client, - pusherService: pusher.NewService(logger, userLogger), - options: options, - } -} - -// Push pushes the security package to the registry -func (svc *PushService) Push(ctx context.Context) error { - return svc.pusherService.PushPackage(ctx, pusher.PackagePushConfig{ - PackageName: "security", - ProcessName: "Push security databases", - WorkingDir: svc.options.WorkingDir, - BundleDir: svc.options.BundleDir, - Client: svc.client.WithSegment("security"), - // New pull creates layouts at security/trivy-db, security/trivy-bdu, etc. - MandatoryLayoutsFunc: func(packageDir string) map[string]string { - return map[string]string{ - "trivy database layout": filepath.Join(packageDir, "security", "trivy-db"), - "trivy bdu layout": filepath.Join(packageDir, "security", "trivy-bdu"), - "trivy java database layout": filepath.Join(packageDir, "security", "trivy-java-db"), - "trivy checks layout": filepath.Join(packageDir, "security", "trivy-checks"), - } - }, - Layouts: []pusher.LayoutMapping{ - {LayoutPath: "security/trivy-db", Segment: "trivy-db"}, - {LayoutPath: "security/trivy-java-db", Segment: "trivy-java-db"}, - {LayoutPath: "security/trivy-bdu", Segment: "trivy-bdu"}, - {LayoutPath: "security/trivy-checks", Segment: "trivy-checks"}, - }, - }) -} diff --git a/pkg/libmirror/operations/params/pull.go b/pkg/libmirror/operations/params/pull.go index 0477ce8b..c674840b 100644 --- a/pkg/libmirror/operations/params/pull.go +++ b/pkg/libmirror/operations/params/pull.go @@ -29,6 +29,7 @@ type PullParams struct { SkipSecurityDatabases bool // --no-security-db SkipModules bool // --no-modules OnlyExtraImages bool // --only-extra-images + IgnoreSuspend bool // --ignore-suspend BundleChunkSize int64 // Plain bytes // Only one of those 2 is filled at a single time or none at all. From 0c479d0736dbd02449dd5727f46b602cfe3797d4 Mon Sep 17 00:00:00 2001 From: Timur Tuktamyshev Date: Thu, 18 Dec 2025 16:19:32 +0300 Subject: [PATCH 03/14] fix tests Signed-off-by: Timur Tuktamyshev --- internal/mirror/cmd/push/push.go | 2 +- internal/mirror/pusher/pusher.go | 4 ++-- internal/mirror/releases/versions.go | 4 ++-- internal/mirror/releases/versions_test.go | 8 ++++---- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/internal/mirror/cmd/push/push.go b/internal/mirror/cmd/push/push.go index 62916683..658a4214 100644 --- a/internal/mirror/cmd/push/push.go +++ b/internal/mirror/cmd/push/push.go @@ -191,7 +191,7 @@ func pushStaticPackages(pushParams *params.PushParams, logger params.Logger, cli } if err = pkg.Close(); err != nil { - logger.Warnf("Could not close bundle package %s: %w", pkgName, err) + logger.Warnf("Could not close bundle package %s: %v", pkgName, err) } } return nil diff --git a/internal/mirror/pusher/pusher.go b/internal/mirror/pusher/pusher.go index 01740230..8517c81e 100644 --- a/internal/mirror/pusher/pusher.go +++ b/internal/mirror/pusher/pusher.go @@ -97,9 +97,9 @@ func (s *Service) PushLayout(ctx context.Context, layoutPath layout.Path, client s.userLogger.Infof("[%d / %d] Pushing image %s:%s", i+1, len(indexManifest.Manifests), client.GetRegistry(), tag) img, err := index.Image(manifest.Digest) - if err != nil { + if err != nil { return fmt.Errorf("read image %s: %w", tag, err) - } + } err = retry.RunTaskWithContext( ctx, silentLogger{}, "push", diff --git a/internal/mirror/releases/versions.go b/internal/mirror/releases/versions.go index 313a9787..2edb12d4 100644 --- a/internal/mirror/releases/versions.go +++ b/internal/mirror/releases/versions.go @@ -73,7 +73,7 @@ func VersionsToMirror(pullParams *params.PullParams, client registry.Client) ([] alphaChannelVersion := releaseChannelsVersions[internal.AlphaChannel] - versionsAboveMinimal := parseAndFilterVersionsAboveMinimalAnbBelowAlpha(&mirrorFromVersion, tags, alphaChannelVersion) + versionsAboveMinimal := parseAndFilterVersionsAboveMinimalAndBelowAlpha(&mirrorFromVersion, tags, alphaChannelVersion) versionsAboveMinimal = FilterOnlyLatestPatches(versionsAboveMinimal) vers := make([]*semver.Version, 0, len(releaseChannelsVersions)) @@ -103,7 +103,7 @@ func getReleasedTagsFromRegistry(pullParams *params.PullParams, client registry. return tags, nil } -func parseAndFilterVersionsAboveMinimalAnbBelowAlpha( +func parseAndFilterVersionsAboveMinimalAndBelowAlpha( minVersion *semver.Version, tags []string, alphaChannelVersion *semver.Version, diff --git a/internal/mirror/releases/versions_test.go b/internal/mirror/releases/versions_test.go index 70025af5..cf241a45 100644 --- a/internal/mirror/releases/versions_test.go +++ b/internal/mirror/releases/versions_test.go @@ -23,7 +23,7 @@ import ( "github.com/stretchr/testify/require" ) -func TestParseAndFilterVersionsAboveMinimalAnbBelowAlpha(t *testing.T) { +func TestParseAndFilterVersionsAboveMinimalAndBelowAlpha(t *testing.T) { minVersion := semver.MustParse("v1.50.0") alphaVersion := semver.MustParse("v1.60.0") @@ -66,7 +66,7 @@ func TestParseAndFilterVersionsAboveMinimalAnbBelowAlpha(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - result := parseAndFilterVersionsAboveMinimalAnbBelowAlpha(minVersion, tt.tags, alphaVersion) + result := parseAndFilterVersionsAboveMinimalAndBelowAlpha(minVersion, tt.tags, alphaVersion) resultStrs := make([]string, len(result)) for i, v := range result { @@ -184,14 +184,14 @@ func TestDeduplicateVersions(t *testing.T) { } // Benchmark tests -func BenchmarkParseAndFilterVersionsAboveMinimalAnbBelowAlpha(b *testing.B) { +func BenchmarkParseAndFilterVersionsAboveMinimalAndBelowAlpha(b *testing.B) { minVersion := semver.MustParse("v1.50.0") alphaVersion := semver.MustParse("v1.60.0") tags := []string{"v1.49.0", "v1.50.0", "v1.51.0", "v1.52.0", "v1.59.0", "v1.60.0", "v1.61.0"} b.ResetTimer() for i := 0; i < b.N; i++ { - parseAndFilterVersionsAboveMinimalAnbBelowAlpha(minVersion, tags, alphaVersion) + parseAndFilterVersionsAboveMinimalAndBelowAlpha(minVersion, tags, alphaVersion) } } From 1ae966140779859c97e36a3fd3cbbd2eb821cb49 Mon Sep 17 00:00:00 2001 From: Timur Tuktamyshev Date: Thu, 18 Dec 2025 16:29:16 +0300 Subject: [PATCH 04/14] fix tests Signed-off-by: Timur Tuktamyshev --- internal/mirror/cmd/pull/pull.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/internal/mirror/cmd/pull/pull.go b/internal/mirror/cmd/pull/pull.go index 6d40e349..861ea15f 100644 --- a/internal/mirror/cmd/pull/pull.go +++ b/internal/mirror/cmd/pull/pull.go @@ -105,7 +105,11 @@ func NewCommand() *cobra.Command { func pull(cmd *cobra.Command, _ []string) error { // Set up graceful cancellation on Ctrl+C - ctx, cancel := signal.NotifyContext(cmd.Context(), syscall.SIGINT, syscall.SIGTERM) + parentCtx := cmd.Context() + if parentCtx == nil { + parentCtx = context.Background() + } + ctx, cancel := signal.NotifyContext(parentCtx, syscall.SIGINT, syscall.SIGTERM) defer cancel() puller := NewPuller(cmd) From f16bcf5c94253cee0255671a5824d646cde2facb Mon Sep 17 00:00:00 2001 From: Timur Tuktamyshev Date: Thu, 18 Dec 2025 16:34:15 +0300 Subject: [PATCH 05/14] fix lint Signed-off-by: Timur Tuktamyshev --- internal/mirror/pull.go | 14 +++++++------- internal/mirror/pusher/pusher.go | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/internal/mirror/pull.go b/internal/mirror/pull.go index 6ca5b5c9..0c9ab239 100644 --- a/internal/mirror/pull.go +++ b/internal/mirror/pull.go @@ -130,23 +130,23 @@ func NewPullService( // Pull downloads Deckhouse components from registry func (svc *PullService) Pull(ctx context.Context) error { if !svc.options.SkipPlatform { - err := svc.platformService.PullPlatform(ctx) - if err != nil { - return fmt.Errorf("pull platform: %w", err) + err := svc.platformService.PullPlatform(ctx) + if err != nil { + return fmt.Errorf("pull platform: %w", err) } } if !svc.options.SkipSecurity { err := svc.securityService.PullSecurity(ctx) - if err != nil { - return fmt.Errorf("pull security databases: %w", err) + if err != nil { + return fmt.Errorf("pull security databases: %w", err) } } if !svc.options.SkipModules || svc.options.OnlyExtraImages { err := svc.modulesService.PullModules(ctx) - if err != nil { - return fmt.Errorf("pull modules: %w", err) + if err != nil { + return fmt.Errorf("pull modules: %w", err) } } diff --git a/internal/mirror/pusher/pusher.go b/internal/mirror/pusher/pusher.go index 8517c81e..a1b76067 100644 --- a/internal/mirror/pusher/pusher.go +++ b/internal/mirror/pusher/pusher.go @@ -110,7 +110,7 @@ func (s *Service) PushLayout(ctx context.Context, layoutPath layout.Path, client } return fmt.Errorf("write %s:%s to registry: %w", client.GetRegistry(), tag, err) } - return nil + return nil }), ) if err != nil { From c176681d68319dbe0def85fa7499e0801b53f9a2 Mon Sep 17 00:00:00 2001 From: Timur Tuktamyshev Date: Fri, 19 Dec 2025 13:27:36 +0300 Subject: [PATCH 06/14] fix: contracts, pull fixes, push fixes Signed-off-by: Timur Tuktamyshev --- internal/data/dataexport/util/logger.go | 1 + internal/layout.go | 38 +- internal/mirror.go | 1 - internal/mirror/cmd/push/push.go | 5 +- internal/mirror/modules/layout.go | 40 ++- internal/mirror/modules/modules.go | 201 ++++++++++- internal/mirror/platform/layout.go | 2 + internal/mirror/platform/platform.go | 9 +- internal/mirror/pull.go | 14 +- internal/mirror/puller/puller.go | 20 ++ internal/mirror/puller/types.go | 4 +- internal/mirror/push.go | 443 ++++++++++++------------ internal/mirror/security/security.go | 6 +- pkg/libmirror/bundle/bundle.go | 17 +- pkg/registry/image/layout.go | 6 +- pkg/registry/service/basic_service.go | 9 + 16 files changed, 537 insertions(+), 279 deletions(-) diff --git a/internal/data/dataexport/util/logger.go b/internal/data/dataexport/util/logger.go index bec14fb7..3cedb63c 100644 --- a/internal/data/dataexport/util/logger.go +++ b/internal/data/dataexport/util/logger.go @@ -1,3 +1,4 @@ +// nolint package util import ( diff --git a/internal/layout.go b/internal/layout.go index 9d8e562d..d8b9eee0 100644 --- a/internal/layout.go +++ b/internal/layout.go @@ -18,22 +18,32 @@ package internal import "path" -// deckhouse repo structure -// root-segment: -// root-segment/install: -// root-segment/install-standalone: -// root-segment/release-channel: -// root-segment/modules/: -// root-segment/modules//releases: -// root-segment/modules//extra/: +// deckhouse repo structure (relative to root path like registry.deckhouse.io/deckhouse/fe) +// +// Platform: +// +// : - Deckhouse main image +// /release-channel: - Release channel metadata +// /install: - Installer image +// /install-standalone: - Standalone installer +// +// Security: +// +// /security/: - Security databases (trivy-db, trivy-bdu, etc.) +// +// Modules: +// +// /modules/: - Module main image +// /modules//release: - Module release channel metadata +// /modules//extra/: - Module extra images const ( InstallSegment = "install" InstallStandaloneSegment = "install-standalone" ReleaseChannelSegment = "release-channel" - ModulesSegment = "module" - ModulesExtraSegment = "extra" - ModulesReleasesSegment = "release" + ModulesSegment = "modules" + ModulesReleaseSegment = "release" + ModulesExtraSegment = "extra" SecuritySegment = "security" @@ -49,9 +59,9 @@ var pathByMirrorType = map[MirrorType]string{ MirrorTypeDeckhouseInstallStandalone: InstallStandaloneSegment, MirrorTypeDeckhouseReleaseChannels: ReleaseChannelSegment, - MirrorTypeModules: ModulesSegment, - MirrorTypeModulesReleaseChannels: ModulesReleasesSegment, - MirrorTypeModulesExtra: ModulesExtraSegment, + // Module paths are relative to modules// directory + MirrorTypeModules: "", // Module main image at root of module dir + MirrorTypeModulesReleaseChannels: ModulesReleaseSegment, // modules//release MirrorTypeSecurity: SecuritySegment, MirrorTypeSecurityTrivyDBSegment: path.Join(SecuritySegment, SecurityTrivyDBSegment), diff --git a/internal/mirror.go b/internal/mirror.go index 456f0c92..3e87f9e8 100644 --- a/internal/mirror.go +++ b/internal/mirror.go @@ -25,7 +25,6 @@ const ( MirrorTypeDeckhouseReleaseChannels MirrorTypeModules MirrorTypeModulesReleaseChannels - MirrorTypeModulesExtra MirrorTypeSecurity MirrorTypeSecurityTrivyDBSegment MirrorTypeSecurityTrivyBDUSegment diff --git a/internal/mirror/cmd/push/push.go b/internal/mirror/cmd/push/push.go index 658a4214..af3a0452 100644 --- a/internal/mirror/cmd/push/push.go +++ b/internal/mirror/cmd/push/push.go @@ -308,7 +308,10 @@ func (p *Pusher) Execute() error { return nil } -// executeNewPush runs the push using the new service architecture +// executeNewPush runs the push using the push service. +// This service expects the bundle to have the exact same structure as the registry: +// - Each OCI layout's relative path becomes its registry segment +// - Works with unified bundles where pull saved the structure as-is func (p *Pusher) executeNewPush() error { // Set up graceful cancellation on Ctrl+C ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) diff --git a/internal/mirror/modules/layout.go b/internal/mirror/modules/layout.go index d818cf58..0f30707e 100644 --- a/internal/mirror/modules/layout.go +++ b/internal/mirror/modules/layout.go @@ -117,15 +117,20 @@ type ImageLayouts struct { platform v1.Platform workingDir string - Modules *regimage.ImageLayout + // Modules is the main module image layout (modules//) + Modules *regimage.ImageLayout + // ModulesReleaseChannels is the release channel layout (modules//release/) ModulesReleaseChannels *regimage.ImageLayout - ModulesExtra *regimage.ImageLayout + // ExtraImages holds layouts for each extra image (modules///) + // Key is the extra image name (e.g., "scanner", "enforcer") + ExtraImages map[string]*regimage.ImageLayout } func NewImageLayouts(rootFolder string) *ImageLayouts { l := &ImageLayouts{ - workingDir: rootFolder, - platform: v1.Platform{Architecture: "amd64", OS: "linux"}, + workingDir: rootFolder, + platform: v1.Platform{Architecture: "amd64", OS: "linux"}, + ExtraImages: make(map[string]*regimage.ImageLayout), } return l @@ -144,8 +149,6 @@ func (l *ImageLayouts) setLayoutByMirrorType(rootFolder string, mirrorType inter l.Modules = layout case internal.MirrorTypeModulesReleaseChannels: l.ModulesReleaseChannels = layout - case internal.MirrorTypeModulesExtra: - l.ModulesExtra = layout default: return fmt.Errorf("wrong mirror type in modules image layout: %v", mirrorType) } @@ -153,6 +156,24 @@ func (l *ImageLayouts) setLayoutByMirrorType(rootFolder string, mirrorType inter return nil } +// GetOrCreateExtraLayout returns or creates a layout for a specific extra image. +// Extra images are stored under: modules//extra// +func (l *ImageLayouts) GetOrCreateExtraLayout(extraName string) (*regimage.ImageLayout, error) { + if existing, ok := l.ExtraImages[extraName]; ok { + return existing, nil + } + + // Create layout at modules//extra// + layoutPath := filepath.Join(l.workingDir, "extra", extraName) + layout, err := regimage.NewImageLayout(layoutPath) + if err != nil { + return nil, fmt.Errorf("create extra image layout for %s: %w", extraName, err) + } + + l.ExtraImages[extraName] = layout + return layout, nil +} + // AsList returns a list of layout.Path's in it. Undefined path's are not included in the list. func (l *ImageLayouts) AsList() []layout.Path { paths := make([]layout.Path, 0) @@ -162,8 +183,11 @@ func (l *ImageLayouts) AsList() []layout.Path { if l.ModulesReleaseChannels != nil { paths = append(paths, l.ModulesReleaseChannels.Path()) } - if l.ModulesExtra != nil { - paths = append(paths, l.ModulesExtra.Path()) + // Add all extra image layouts + for _, extraLayout := range l.ExtraImages { + if extraLayout != nil { + paths = append(paths, extraLayout.Path()) + } } return paths } diff --git a/internal/mirror/modules/modules.go b/internal/mirror/modules/modules.go index ed052684..e893ca68 100644 --- a/internal/mirror/modules/modules.go +++ b/internal/mirror/modules/modules.go @@ -25,6 +25,7 @@ import ( "io" "os" "path/filepath" + "regexp" "strings" "time" @@ -323,26 +324,88 @@ func (svc *Service) pullSingleModule(ctx context.Context, module moduleData) err return fmt.Errorf("pull module images: %w", err) } } + + // Also pull release images with version tags (modules//release:v1.x.x) + // These are in addition to channel tags (alpha, beta, etc.) + if len(moduleVersions) > 0 { + releaseVersionSet := make(map[string]*puller.ImageMeta) + for _, version := range moduleVersions { + releaseVersionSet[svc.rootURL+"/modules/"+module.name+"/release:"+version] = nil + downloadList.ModuleReleaseChannels[svc.rootURL+"/modules/"+module.name+"/release:"+version] = nil + } + + config := puller.PullConfig{ + Name: module.name + " release versions", + ImageSet: releaseVersionSet, + Layout: svc.layout.Module(module.name).ModulesReleaseChannels, + AllowMissingTags: true, + GetterService: svc.modulesService.Module(module.name).ReleaseChannels(), + } + + if err := svc.pullerService.PullImages(ctx, config); err != nil { + svc.logger.Debug(fmt.Sprintf("Failed to pull release version images for %s: %v", module.name, err)) + // Don't fail - version release images may not exist for all versions + } + } + + // Extract and pull internal digest images from module versions (images_digests.json) + // These are internal images that module uses at runtime + digestImages := svc.extractInternalDigestImages(ctx, module.name, moduleVersions) + if len(digestImages) > 0 { + // Add digest images to download list + digestImageSet := make(map[string]*puller.ImageMeta) + for _, digestRef := range digestImages { + digestImageSet[digestRef] = nil + downloadList.Module[digestRef] = nil + } + + config := puller.PullConfig{ + Name: module.name + " internal images", + ImageSet: digestImageSet, + Layout: svc.layout.Module(module.name).Modules, + AllowMissingTags: true, + GetterService: svc.modulesService.Module(module.name), + } + + if err := svc.pullerService.PullImages(ctx, config); err != nil { + svc.logger.Debug(fmt.Sprintf("Failed to pull internal digest images for %s: %v", module.name, err)) + // Don't fail on missing internal images, just log warning + } + } } // Extract and pull extra images from module versions - extraImages := svc.findExtraImages(ctx, module.name, moduleVersions) + // Each extra image gets its own layout: modules//extra// + extraImagesByName := svc.findExtraImages(ctx, module.name, moduleVersions) + + for extraName, images := range extraImagesByName { + if len(images) == 0 { + continue + } - if len(extraImages) > 0 { - for img := range extraImages { - downloadList.ModuleExtra[img] = nil + // Get or create layout for this extra image + extraLayout, err := svc.layout.Module(module.name).GetOrCreateExtraLayout(extraName) + if err != nil { + return fmt.Errorf("create layout for extra image %s: %w", extraName, err) + } + + // Build image set for this extra + imageSet := make(map[string]*puller.ImageMeta) + for _, img := range images { + imageSet[img.FullRef] = nil + downloadList.ModuleExtra[img.FullRef] = nil } config := puller.PullConfig{ - Name: module.name + " extra", - ImageSet: downloadList.ModuleExtra, - Layout: svc.layout.Module(module.name).ModulesExtra, + Name: module.name + "/" + extraName, + ImageSet: imageSet, + Layout: extraLayout, AllowMissingTags: true, - GetterService: svc.modulesService.Module(module.name).Extra(), + GetterService: svc.modulesService.Module(module.name).Extra().WithSegment(extraName), } if err := svc.pullerService.PullImages(ctx, config); err != nil { - return fmt.Errorf("pull extra images: %w", err) + return fmt.Errorf("pull extra image %s: %w", extraName, err) } } @@ -411,9 +474,22 @@ func extractVersionJSON(img interface{ Extract() io.ReadCloser }) (*versionJSON, } } -// findExtraImages finds extra images from module images -func (svc *Service) findExtraImages(ctx context.Context, moduleName string, versions []string) map[string]struct{} { - extraImages := make(map[string]struct{}) +// extraImageInfo holds information about an extra image to pull +type extraImageInfo struct { + // Name is the extra image name (e.g., "scanner", "enforcer") + Name string + // Tag is the image tag + Tag string + // FullRef is the full image reference for pulling + FullRef string +} + +// findExtraImages finds extra images from module images. +// Returns a map where key is extra image name, value is list of image refs to pull. +// Extra images are stored under: modules//extra/: +func (svc *Service) findExtraImages(ctx context.Context, moduleName string, versions []string) map[string][]extraImageInfo { + // Map of extra-name -> list of images to pull + extraImages := make(map[string][]extraImageInfo) for _, version := range versions { // Skip digest references @@ -452,8 +528,14 @@ func (svc *Service) findExtraImages(ctx context.Context, moduleName string, vers continue } + // Extra images go under: modules//extra/: fullImagePath := svc.rootURL + "/modules/" + moduleName + "/extra/" + imageName + ":" + imageTag - extraImages[fullImagePath] = struct{}{} + + extraImages[imageName] = append(extraImages[imageName], extraImageInfo{ + Name: imageName, + Tag: imageTag, + FullRef: fullImagePath, + }) } } @@ -485,6 +567,90 @@ func extractExtraImagesJSON(img interface{ Extract() io.ReadCloser }) (map[strin } } +// digestRegex matches sha256 digests in images_digests.json +var digestRegex = regexp.MustCompile(`sha256:[a-f0-9]{64}`) + +// extractImagesDigestsJSON extracts images_digests.json from module image +// and returns list of sha256 digests. These are internal images that module uses at runtime. +func extractImagesDigestsJSON(img interface{ Extract() io.ReadCloser }) ([]string, error) { + rc := img.Extract() + defer rc.Close() + + tr := tar.NewReader(rc) + for { + hdr, err := tr.Next() + if err == io.EOF { + return nil, fmt.Errorf("images_digests.json not found in image") + } + if err != nil { + return nil, err + } + + if hdr.Name == "images_digests.json" { + data, err := io.ReadAll(tr) + if err != nil { + return nil, fmt.Errorf("read images_digests.json: %w", err) + } + // Extract all sha256:... digests from JSON file + digests := digestRegex.FindAllString(string(data), -1) + return digests, nil + } + } +} + +// extractInternalDigestImages extracts internal digest images from module versions. +// It reads images_digests.json from each module version image and returns +// list of image references in format "repo@sha256:..." which will be pulled +// and stored with tag = hex part of digest. +func (svc *Service) extractInternalDigestImages(ctx context.Context, moduleName string, versions []string) []string { + seenDigests := make(map[string]struct{}) + var digestRefs []string + + moduleRepo := svc.rootURL + "/modules/" + moduleName + + for _, version := range versions { + // Skip digest references + if strings.Contains(version, "@sha256:") { + continue + } + + tag := version + if strings.Contains(version, ":") { + parts := strings.SplitN(version, ":", 2) + tag = parts[1] + } + + img, err := svc.modulesService.Module(moduleName).GetImage(ctx, tag) + if err != nil { + svc.logger.Debug(fmt.Sprintf("Failed to get module image %s:%s for digest extraction: %v", moduleName, tag, err)) + continue + } + + // Extract images_digests.json + digests, err := extractImagesDigestsJSON(img) + if err != nil { + svc.logger.Debug(fmt.Sprintf("No images_digests.json in %s:%s: %v", moduleName, tag, err)) + continue + } + + svc.logger.Debug(fmt.Sprintf("Found %d internal digests in %s:%s", len(digests), moduleName, tag)) + + for _, digest := range digests { + if _, seen := seenDigests[digest]; seen { + continue + } + seenDigests[digest] = struct{}{} + + // Create reference in format repo@sha256:... + // When pulled, the tag will be the hex part (after last ":") + digestRef := moduleRepo + "@" + digest + digestRefs = append(digestRefs, digestRef) + } + } + + return digestRefs +} + // pullVexImages finds and pulls VEX attestation images for module images func (svc *Service) pullVexImages(ctx context.Context, moduleName string, downloadList *ImageDownloadList) { allImages := make([]string, 0) @@ -597,9 +763,11 @@ func (svc *Service) packModules(modules []moduleData) error { pkg = f } - // Pack from the module's working directory + // Pack from the module's working directory with prefix to create correct registry structure. + // This ensures the tar contains paths like "modules//index.json" instead of just "index.json". moduleDir := filepath.Join(svc.layout.workingDir, module.name) - if err := bundle.Pack(context.Background(), moduleDir, pkg); err != nil { + tarPrefix := filepath.Join("modules", module.name) + if err := bundle.PackWithPrefix(context.Background(), moduleDir, tarPrefix, pkg); err != nil { return fmt.Errorf("pack module %s: %w", pkgName, err) } @@ -656,10 +824,11 @@ func createOCIImageLayoutsForModule( ) (*ImageLayouts, error) { layouts := NewImageLayouts(rootFolder) + // Only create layouts for main module and release channels. + // Extra image layouts are created dynamically when extra images are discovered. mirrorTypes := []internal.MirrorType{ internal.MirrorTypeModules, internal.MirrorTypeModulesReleaseChannels, - internal.MirrorTypeModulesExtra, } for _, mtype := range mirrorTypes { diff --git a/internal/mirror/platform/layout.go b/internal/mirror/platform/layout.go index 00907045..36f7618c 100644 --- a/internal/mirror/platform/layout.go +++ b/internal/mirror/platform/layout.go @@ -56,6 +56,8 @@ func (l *ImageDownloadList) FillDeckhouseImages(deckhouseVersions []string) { l.Deckhouse[l.rootURL+":"+version] = nil l.DeckhouseInstall[path.Join(l.rootURL, internal.InstallSegment)+":"+version] = nil l.DeckhouseInstallStandalone[path.Join(l.rootURL, internal.InstallStandaloneSegment)+":"+version] = nil + // Also add version tags to release-channel (e.g., release-channel:v1.74.0) + l.DeckhouseReleaseChannel[path.Join(l.rootURL, internal.ReleaseChannelSegment)+":"+version] = nil } } diff --git a/internal/mirror/platform/platform.go b/internal/mirror/platform/platform.go index dd4ed42f..2cfca9b6 100644 --- a/internal/mirror/platform/platform.go +++ b/internal/mirror/platform/platform.go @@ -301,13 +301,8 @@ func (svc *Service) getReleaseChannelVersionFromRegistry(ctx context.Context, re svc.userLogger.Debugf("image reference: %s@%s", imageMeta, digest.String()) - // Use just the channel name (e.g., "alpha") as the tag for the layout, not the full reference - err = svc.layout.DeckhouseReleaseChannel.AddImage(image, releaseChannel) - if err != nil { - return nil, fmt.Errorf("append %s release channel image to layout: %w", releaseChannel, err) - } - - // But use full reference for internal tracking + // Don't add to layout here - pullDeckhouseReleaseChannels will add it + // Just record in downloadList for later pull svc.downloadList.DeckhouseReleaseChannel[imageMeta.GetTagReference()] = puller.NewImageMeta(meta.Version, imageMeta.GetTagReference(), &digest) return ver, nil diff --git a/internal/mirror/pull.go b/internal/mirror/pull.go index 0c9ab239..6ca5b5c9 100644 --- a/internal/mirror/pull.go +++ b/internal/mirror/pull.go @@ -130,23 +130,23 @@ func NewPullService( // Pull downloads Deckhouse components from registry func (svc *PullService) Pull(ctx context.Context) error { if !svc.options.SkipPlatform { - err := svc.platformService.PullPlatform(ctx) - if err != nil { - return fmt.Errorf("pull platform: %w", err) + err := svc.platformService.PullPlatform(ctx) + if err != nil { + return fmt.Errorf("pull platform: %w", err) } } if !svc.options.SkipSecurity { err := svc.securityService.PullSecurity(ctx) - if err != nil { - return fmt.Errorf("pull security databases: %w", err) + if err != nil { + return fmt.Errorf("pull security databases: %w", err) } } if !svc.options.SkipModules || svc.options.OnlyExtraImages { err := svc.modulesService.PullModules(ctx) - if err != nil { - return fmt.Errorf("pull modules: %w", err) + if err != nil { + return fmt.Errorf("pull modules: %w", err) } } diff --git a/internal/mirror/puller/puller.go b/internal/mirror/puller/puller.go index f85b8b00..b0c284fb 100644 --- a/internal/mirror/puller/puller.go +++ b/internal/mirror/puller/puller.go @@ -19,8 +19,11 @@ package puller import ( "context" "fmt" + "strings" "time" + v1 "github.com/google/go-containerregistry/pkg/v1" + dkplog "github.com/deckhouse/deckhouse/pkg/log" "github.com/deckhouse/deckhouse-cli/pkg/libmirror/util/log" @@ -58,6 +61,23 @@ func (ps *PullerService) PullImages(ctx context.Context, config PullConfig) erro _, tag := SplitImageRefByRepoAndTag(image) + // Check if this is a digest reference (repo@sha256:abc...) + // For digest references, we already know the digest - it's in the reference itself + if strings.Contains(image, "@sha256:") { + // Extract digest from reference + digestStr := image[strings.Index(image, "@sha256:")+1:] // "sha256:abc..." + digest, err := v1.NewHash(digestStr) + if err != nil { + ps.userLogger.Debugf("failed to parse digest from %s: %v", image, err) + if config.AllowMissingTags { + continue + } + return fmt.Errorf("parse digest from reference %s: %w", image, err) + } + config.ImageSet[image] = NewImageMeta(tag, image, &digest) + continue + } + digest, err := config.GetterService.GetDigest(ctx, tag) if err != nil { if config.AllowMissingTags { diff --git a/internal/mirror/puller/types.go b/internal/mirror/puller/types.go index f5f4b974..bb6ddf6b 100644 --- a/internal/mirror/puller/types.go +++ b/internal/mirror/puller/types.go @@ -65,6 +65,7 @@ func NewImageMeta(version string, tagReference string, digest *v1.Hash) *ImageMe } // SplitImageRefByRepoAndTag splits an image reference into repository and tag parts +// For digest references (repo@sha256:abc), returns just the hex part as tag func SplitImageRefByRepoAndTag(imageReferenceString string) (string, string) { splitIndex := strings.LastIndex(imageReferenceString, ":") repo := imageReferenceString[:splitIndex] @@ -72,7 +73,8 @@ func SplitImageRefByRepoAndTag(imageReferenceString string) (string, string) { if strings.HasSuffix(repo, "@sha256") { repo = strings.TrimSuffix(repo, "@sha256") - tag = "@sha256:" + tag + // Return just the hex digest without @sha256: prefix + // This makes it a valid registry tag } return repo, tag diff --git a/internal/mirror/push.go b/internal/mirror/push.go index 83bd4be5..37fc1e23 100644 --- a/internal/mirror/push.go +++ b/internal/mirror/push.go @@ -19,6 +19,7 @@ package mirror import ( "context" "fmt" + "io" "io/fs" "log/slog" "os" @@ -27,21 +28,20 @@ import ( "strings" "github.com/google/go-containerregistry/pkg/v1/layout" + "github.com/google/go-containerregistry/pkg/v1/random" dkplog "github.com/deckhouse/deckhouse/pkg/log" "github.com/deckhouse/deckhouse/pkg/registry" + "github.com/deckhouse/deckhouse-cli/internal" + "github.com/deckhouse/deckhouse-cli/internal/mirror/chunked" "github.com/deckhouse/deckhouse-cli/internal/mirror/pusher" "github.com/deckhouse/deckhouse-cli/pkg/libmirror/bundle" "github.com/deckhouse/deckhouse-cli/pkg/libmirror/util/log" ) const ( - defaultDirPermissions = 0755 - - platformPackage = "platform" - securityPackage = "security" - modulesPrefix = "module-" + dirPermissions = 0755 ) // PushServiceOptions contains configuration options for PushService @@ -52,7 +52,30 @@ type PushServiceOptions struct { WorkingDir string } -// PushService orchestrates pushing Deckhouse components to registry +// PushService handles pushing OCI layouts to registry. +// It treats the layout structure as the source of truth - the relative path of each layout +// becomes the registry segment directly. +// +// Expected layout structure (after unpack): +// +// / +// ├── index.json # Deckhouse main images +// ├── blobs/ +// ├── install/ # Deckhouse Install +// │ ├── index.json +// │ └── blobs/ +// ├── install-standalone/ # Deckhouse Standalone Install +// ├── release-channel/ # Deckhouse release channels +// ├── security/ # Security databases +// │ ├── trivy-db/ +// │ ├── trivy-bdu/ +// │ ├── trivy-java-db/ +// │ └── trivy-checks/ +// └── modules/ # Modules +// └── / +// ├── index.json +// ├── release/ +// └── / type PushService struct { client registry.Client options *PushServiceOptions @@ -81,314 +104,296 @@ func NewPushService( } } -// Push uploads Deckhouse components to registry -// It unpacks all packages into a unified directory structure and pushes -// each OCI layout based on its path (path = registry segment) +// Push uploads all OCI layouts from the bundle to the registry. +// It unpacks all packages into a unified directory and pushes each layout +// using its relative path as the registry segment. +// +// The key principle: no path transformations. Whatever path the layout has +// in the unpacked directory becomes its path in the registry. func (svc *PushService) Push(ctx context.Context) error { // Create unified directory for unpacking - unifiedDir := filepath.Join(svc.options.WorkingDir, "unified") - if err := os.MkdirAll(unifiedDir, defaultDirPermissions); err != nil { + dirPath := filepath.Join(svc.options.WorkingDir, "unified") + if err := os.MkdirAll(dirPath, dirPermissions); err != nil { return fmt.Errorf("create unified directory: %w", err) } defer func() { - if err := os.RemoveAll(unifiedDir); err != nil { + if err := os.RemoveAll(dirPath); err != nil { svc.logger.Warn("Failed to cleanup unified directory", - slog.String("path", unifiedDir), + slog.String("path", dirPath), slog.Any("error", err)) } }() // Unpack all packages into unified structure - if err := svc.unpackPlatform(ctx, unifiedDir); err != nil { - return fmt.Errorf("unpack platform: %w", err) - } - - if err := svc.unpackSecurity(ctx, unifiedDir); err != nil { - return fmt.Errorf("unpack security: %w", err) + if err := svc.unpackAllPackages(ctx, dirPath); err != nil { + return fmt.Errorf("unpack packages: %w", err) } - if err := svc.unpackModules(ctx, unifiedDir); err != nil { - return fmt.Errorf("unpack modules: %w", err) + // Push all layouts recursively + if err := svc.userLogger.Process("Push to registry", func() error { + return svc.pushAllLayouts(ctx, dirPath) + }); err != nil { + return err } - // Push all layouts recursively - return svc.userLogger.Process("Push to registry", func() error { - return svc.pushDir(ctx, unifiedDir, svc.client) + // Create modules index (deckhouse/modules: tags for discovery) + return svc.userLogger.Process("Create modules index", func() error { + return svc.createModulesIndex(ctx, dirPath) }) } -// pushDir recursively walks directory and pushes each OCI layout -// The relative path from root becomes the registry segment -func (svc *PushService) pushDir(ctx context.Context, rootDir string, client registry.Client) error { - var layouts []string - - // First, collect all layouts - err := filepath.WalkDir(rootDir, func(path string, d fs.DirEntry, err error) error { - if err != nil { - return err - } - if d.IsDir() || d.Name() != "index.json" { - return nil - } - layouts = append(layouts, filepath.Dir(path)) - return nil - }) +// unpackAllPackages unpacks all tar packages from bundle directory into unified directory. +// All packages are unpacked to the same root - the structure inside each tar +// should already have the correct paths. +func (svc *PushService) unpackAllPackages(ctx context.Context, dirPath string) error { + entries, err := os.ReadDir(svc.options.BundleDir) if err != nil { - return fmt.Errorf("scan layouts: %w", err) + return fmt.Errorf("read bundle dir: %w", err) } - if len(layouts) == 0 { - svc.userLogger.InfoLn("No layouts to push") - return nil + packages := svc.findPackages(entries) + if len(packages) == 0 { + return fmt.Errorf("no packages found in bundle directory") } - // Sort for predictable output - slices.Sort(layouts) + svc.userLogger.Infof("Found %d packages to unpack", len(packages)) - svc.userLogger.Infof("Found %d layouts to push", len(layouts)) - - // Push each layout - for _, layoutDir := range layouts { - // Skip empty layouts (no images) - hasImages, err := svc.layoutHasImages(layoutDir) - if err != nil { - svc.logger.Warn("Failed to check layout", slog.String("path", layoutDir), slog.Any("error", err)) - continue - } - if !hasImages { - continue + for _, pkgName := range packages { + if err := svc.unpackPackage(ctx, dirPath, pkgName); err != nil { + // Log warning but continue with other packages + svc.userLogger.Warnf("Failed to unpack %s: %v", pkgName, err) } + } - relPath, _ := filepath.Rel(rootDir, layoutDir) - - // Build registry segment from relative path - segment := "" - if relPath != "." { - segment = relPath - } + return nil +} - targetClient := client - if segment != "" { - // WithSegment expects single segment, but we have path like "modules/virtualization" - // Split and apply each segment - for _, seg := range strings.Split(segment, string(os.PathSeparator)) { - targetClient = targetClient.WithSegment(seg) - } - } +// findPackages finds all package names (without .tar extension) in the bundle directory. +// It handles both regular .tar files and chunked packages (.tar.chunk000). +func (svc *PushService) findPackages(entries []os.DirEntry) []string { + packagesSet := make(map[string]struct{}) - svc.userLogger.Infof("Pushing %s", targetClient.GetRegistry()) + for _, entry := range entries { + name := entry.Name() - if err := svc.pusher.PushLayout(ctx, layout.Path(layoutDir), targetClient); err != nil { - return fmt.Errorf("push layout %q: %w", relPath, err) + // Handle regular tar files + if strings.HasSuffix(name, ".tar") { + pkgName := strings.TrimSuffix(name, ".tar") + packagesSet[pkgName] = struct{}{} + continue } - } - return nil -} - -// layoutHasImages checks if an OCI layout has any images to push -func (svc *PushService) layoutHasImages(layoutDir string) (bool, error) { - layoutPath := layout.Path(layoutDir) - index, err := layoutPath.ImageIndex() - if err != nil { - return false, fmt.Errorf("read index: %w", err) + // Handle chunked files (e.g., "platform.tar.chunk000") + if idx := strings.Index(name, ".tar.chunk"); idx != -1 { + pkgName := name[:idx] + packagesSet[pkgName] = struct{}{} + } } - indexManifest, err := index.IndexManifest() - if err != nil { - return false, fmt.Errorf("parse index manifest: %w", err) + packages := make([]string, 0, len(packagesSet)) + for pkg := range packagesSet { + packages = append(packages, pkg) } + slices.Sort(packages) - return len(indexManifest.Manifests) > 0, nil + return packages } -// unpackPlatform unpacks platform.tar to root of unified directory -// platform.tar contains: index.json, install/, install-standalone/, release-channel/ -func (svc *PushService) unpackPlatform(ctx context.Context, unifiedDir string) error { - if !svc.pusher.PackageExists(svc.options.BundleDir, platformPackage) { - svc.userLogger.InfoLn("Platform package not found, skipping") - return nil - } - - return svc.userLogger.Process("Unpack platform", func() error { - pkg, err := svc.pusher.OpenPackage(svc.options.BundleDir, platformPackage) +// unpackPackage unpacks a single package to the unified directory. +func (svc *PushService) unpackPackage(ctx context.Context, dirPath, pkgName string) error { + return svc.userLogger.Process(fmt.Sprintf("Unpack %s", pkgName), func() error { + pkg, err := svc.openPackage(pkgName) if err != nil { return fmt.Errorf("open package: %w", err) } defer pkg.Close() - // Platform unpacks directly to root - if err := bundle.Unpack(ctx, pkg, unifiedDir); err != nil { + // Unpack directly to unified directory - no path transformations + if err := bundle.Unpack(ctx, pkg, dirPath); err != nil { return fmt.Errorf("unpack: %w", err) } + return nil }) } -// unpackSecurity unpacks security.tar to unified directory -// security.tar contains: security/trivy-db/, security/trivy-java-db/, etc. -// These paths already include "security/" prefix, so unpack to root -func (svc *PushService) unpackSecurity(ctx context.Context, unifiedDir string) error { - if !svc.pusher.PackageExists(svc.options.BundleDir, securityPackage) { - svc.userLogger.InfoLn("Security package not found, skipping") - return nil +// openPackage opens a package file, trying .tar first, then chunked format. +func (svc *PushService) openPackage(pkgName string) (io.ReadCloser, error) { + tarPath := filepath.Join(svc.options.BundleDir, pkgName+".tar") + + pkg, err := os.Open(tarPath) + if err == nil { + return pkg, nil } - return svc.userLogger.Process("Unpack security", func() error { - pkg, err := svc.pusher.OpenPackage(svc.options.BundleDir, securityPackage) - if err != nil { - return fmt.Errorf("open package: %w", err) - } - defer pkg.Close() + if !os.IsNotExist(err) { + return nil, fmt.Errorf("open %s: %w", tarPath, err) + } - // Security tar already has security/ prefix inside - if err := bundle.Unpack(ctx, pkg, unifiedDir); err != nil { - return fmt.Errorf("unpack: %w", err) - } - return nil - }) + // Try chunked format + return chunked.Open(svc.options.BundleDir, pkgName+".tar") } -// unpackModules unpacks all module-*.tar files -// Each module tar contains: module/, release/, extra/ -// We need to transform paths: -// - module/ -> modules/{name}/ -// - release/ -> modules/{name}/release/ -// - extra/ -> modules/{name}/extra/ -func (svc *PushService) unpackModules(ctx context.Context, unifiedDir string) error { - // Find all module packages - modulePackages, err := svc.findModulePackages() +// pushAllLayouts recursively walks the directory and pushes each OCI layout found. +// The relative path from root becomes the registry segment. +func (svc *PushService) pushAllLayouts(ctx context.Context, rootDir string) error { + layouts, err := svc.findLayouts(rootDir) if err != nil { - return fmt.Errorf("find module packages: %w", err) + return fmt.Errorf("scan layouts: %w", err) } - if len(modulePackages) == 0 { - svc.userLogger.InfoLn("No module packages found, skipping") + if len(layouts) == 0 { + svc.userLogger.InfoLn("No layouts to push") return nil } - slices.Sort(modulePackages) - svc.userLogger.Infof("Found %d module packages", len(modulePackages)) + svc.userLogger.Infof("Found %d layouts to push", len(layouts)) - for _, moduleName := range modulePackages { - if err := svc.unpackModule(ctx, unifiedDir, moduleName); err != nil { - return fmt.Errorf("unpack module %s: %w", moduleName, err) + for _, layoutDir := range layouts { + if err := ctx.Err(); err != nil { + return err + } + + if err := svc.pushSingleLayout(ctx, rootDir, layoutDir); err != nil { + return err } } return nil } -func (svc *PushService) unpackModule(ctx context.Context, unifiedDir, moduleName string) error { - packageName := modulesPrefix + moduleName +// findLayouts finds all OCI layouts in the directory by looking for index.json files. +func (svc *PushService) findLayouts(rootDir string) ([]string, error) { + var layouts []string - return svc.userLogger.Process(fmt.Sprintf("Unpack module %s", moduleName), func() error { - pkg, err := svc.pusher.OpenPackage(svc.options.BundleDir, packageName) + err := filepath.WalkDir(rootDir, func(path string, d fs.DirEntry, err error) error { if err != nil { - return fmt.Errorf("open package: %w", err) + return err } - defer pkg.Close() - - // Unpack to temp directory first - tempDir := filepath.Join(svc.options.WorkingDir, "temp-"+moduleName) - if err := os.MkdirAll(tempDir, defaultDirPermissions); err != nil { - return fmt.Errorf("create temp dir: %w", err) + if d.IsDir() || d.Name() != "index.json" { + return nil } - defer os.RemoveAll(tempDir) + layouts = append(layouts, filepath.Dir(path)) + return nil + }) + if err != nil { + return nil, err + } - if err := bundle.Unpack(ctx, pkg, tempDir); err != nil { - return fmt.Errorf("unpack: %w", err) - } + slices.Sort(layouts) + return layouts, nil +} - // Transform paths and move to unified directory - // module/ -> modules/{name}/ - // release/ -> modules/{name}/release/ - // extra/ -> modules/{name}/extra/ - targetDir := filepath.Join(unifiedDir, "modules", moduleName) - if err := os.MkdirAll(targetDir, defaultDirPermissions); err != nil { - return fmt.Errorf("create target dir: %w", err) - } +// pushSingleLayout pushes a single OCI layout to the registry. +func (svc *PushService) pushSingleLayout(ctx context.Context, rootDir, layoutDir string) error { + // Check if layout has any images + hasImages, err := svc.layoutHasImages(layoutDir) + if err != nil { + svc.logger.Warn("Failed to check layout", + slog.String("path", layoutDir), + slog.Any("error", err)) + return nil + } + if !hasImages { + return nil + } - // Move module/ contents to modules/{name}/ - moduleDir := filepath.Join(tempDir, "module") - if _, err := os.Stat(moduleDir); err == nil { - if err := copyDir(moduleDir, targetDir); err != nil { - return fmt.Errorf("copy module layout: %w", err) - } - } + // Build registry segment from relative path + relPath, _ := filepath.Rel(rootDir, layoutDir) + segment := "" + if relPath != "." { + segment = relPath + } - // Move release/ to modules/{name}/release/ - releaseDir := filepath.Join(tempDir, "release") - if _, err := os.Stat(releaseDir); err == nil { - targetRelease := filepath.Join(targetDir, "release") - if err := copyDir(releaseDir, targetRelease); err != nil { - return fmt.Errorf("copy release layout: %w", err) - } + // Create client with appropriate segments + targetClient := svc.client + if segment != "" { + // Apply each path component as a segment + for _, seg := range strings.Split(segment, string(os.PathSeparator)) { + targetClient = targetClient.WithSegment(seg) } + } - // Move extra/ to modules/{name}/extra/ - extraDir := filepath.Join(tempDir, "extra") - if _, err := os.Stat(extraDir); err == nil { - targetExtra := filepath.Join(targetDir, "extra") - if err := copyDir(extraDir, targetExtra); err != nil { - return fmt.Errorf("copy extra layout: %w", err) - } - } + svc.userLogger.Infof("Pushing %s", targetClient.GetRegistry()) - return nil - }) + if err := svc.pusher.PushLayout(ctx, layout.Path(layoutDir), targetClient); err != nil { + return fmt.Errorf("push layout %q: %w", relPath, err) + } + + return nil } -func (svc *PushService) findModulePackages() ([]string, error) { - entries, err := os.ReadDir(svc.options.BundleDir) +// layoutHasImages checks if an OCI layout has any images to push. +func (svc *PushService) layoutHasImages(layoutDir string) (bool, error) { + layoutPath := layout.Path(layoutDir) + index, err := layoutPath.ImageIndex() if err != nil { - return nil, fmt.Errorf("read bundle dir: %w", err) + return false, fmt.Errorf("read index: %w", err) } - modules := make([]string, 0, len(entries)) - for _, entry := range entries { - name := entry.Name() - if !strings.HasPrefix(name, modulesPrefix) { - continue - } + indexManifest, err := index.IndexManifest() + if err != nil { + return false, fmt.Errorf("parse index manifest: %w", err) + } - // Extract module name: "module-virtualization.tar" -> "virtualization" - moduleName := strings.TrimPrefix(name, modulesPrefix) - moduleName = strings.TrimSuffix(moduleName, ".tar") + return len(indexManifest.Manifests) > 0, nil +} - // Handle chunked files: "module-virtualization.tar.chunk000" -> "virtualization" - if idx := strings.Index(moduleName, ".tar.chunk"); idx != -1 { - moduleName = moduleName[:idx] +// createModulesIndex creates the modules index in the registry. +// This pushes a small random image for each module with tag = module name +// to deckhouse/modules repo, enabling module discovery via ListTags. +func (svc *PushService) createModulesIndex(ctx context.Context, rootDir string) error { + modulesDir := filepath.Join(rootDir, internal.ModulesSegment) + + // Check if modules directory exists + entries, err := os.ReadDir(modulesDir) + if err != nil { + if os.IsNotExist(err) { + svc.userLogger.InfoLn("No modules directory found, skipping modules index") + return nil } + return fmt.Errorf("read modules directory: %w", err) + } - modules = append(modules, moduleName) + // Find all module directories + var moduleNames []string + for _, entry := range entries { + if entry.IsDir() { + moduleNames = append(moduleNames, entry.Name()) + } } - // Deduplicate (in case of chunked files) - slices.Sort(modules) - return slices.Compact(modules), nil -} + if len(moduleNames) == 0 { + svc.userLogger.InfoLn("No modules found, skipping modules index") + return nil + } -// copyDir copies directory contents recursively -func copyDir(src, dst string) error { - return filepath.WalkDir(src, func(path string, d fs.DirEntry, err error) error { - if err != nil { + slices.Sort(moduleNames) + svc.userLogger.Infof("Creating modules index with %d modules", len(moduleNames)) + + // Get client scoped to modules repo + modulesClient := svc.client.WithSegment(internal.ModulesSegment) + + // Push a small random image for each module with tag = module name + for _, moduleName := range moduleNames { + if err := ctx.Err(); err != nil { return err } - relPath, _ := filepath.Rel(src, path) - targetPath := filepath.Join(dst, relPath) + svc.userLogger.Infof("Creating index tag: %s:%s", modulesClient.GetRegistry(), moduleName) - if d.IsDir() { - return os.MkdirAll(targetPath, defaultDirPermissions) + // Create minimal random image (32 bytes, 1 layer) + img, err := random.Image(32, 1) + if err != nil { + return fmt.Errorf("create random image for module %s: %w", moduleName, err) } - // Copy file - data, err := os.ReadFile(path) - if err != nil { - return fmt.Errorf("read %s: %w", path, err) + // Push with module name as tag + if err := modulesClient.PushImage(ctx, moduleName, img); err != nil { + return fmt.Errorf("push module index tag %s: %w", moduleName, err) } + } - return os.WriteFile(targetPath, data, 0644) - }) + svc.userLogger.Infof("Modules index created successfully") + return nil } diff --git a/internal/mirror/security/security.go b/internal/mirror/security/security.go index 6defd517..ab174dff 100644 --- a/internal/mirror/security/security.go +++ b/internal/mirror/security/security.go @@ -77,9 +77,9 @@ func NewService( options = &Options{} } - tmpDir := filepath.Join(workingDir, "security") - - layout, err := createOCIImageLayoutsForSecurity(tmpDir) + // workingDir is the root where we create layouts + // Layouts will be created at workingDir/security/trivy-db, etc. + layout, err := createOCIImageLayoutsForSecurity(workingDir) if err != nil { //TODO: handle error userLogger.Warnf("Create OCI Image Layouts: %v", err) diff --git a/pkg/libmirror/bundle/bundle.go b/pkg/libmirror/bundle/bundle.go index 8b409f03..f0ed3225 100644 --- a/pkg/libmirror/bundle/bundle.go +++ b/pkg/libmirror/bundle/bundle.go @@ -71,8 +71,15 @@ func Unpack(ctx context.Context, source io.Reader, targetPath string) error { } func Pack(ctx context.Context, sourcePath string, sink io.Writer) error { + return PackWithPrefix(ctx, sourcePath, "", sink) +} + +// PackWithPrefix packs directory contents into tar with an optional prefix for all paths. +// For example, PackWithPrefix(ctx, "/tmp/module", "modules/stronghold", sink) will create +// tar entries like "modules/stronghold/index.json" instead of just "index.json". +func PackWithPrefix(ctx context.Context, sourcePath string, prefix string, sink io.Writer) error { tarWriter := tar.NewWriter(sink) - if err := filepath.Walk(sourcePath, packFunc(ctx, sourcePath, tarWriter)); err != nil { + if err := filepath.Walk(sourcePath, packFuncWithPrefix(ctx, sourcePath, prefix, tarWriter)); err != nil { return fmt.Errorf("pack mirrored images into tar: %w", err) } @@ -84,6 +91,10 @@ func Pack(ctx context.Context, sourcePath string, sink io.Writer) error { } func packFunc(ctx context.Context, pathPrefix string, writer *tar.Writer) filepath.WalkFunc { + return packFuncWithPrefix(ctx, pathPrefix, "", writer) +} + +func packFuncWithPrefix(ctx context.Context, pathPrefix string, tarPrefix string, writer *tar.Writer) filepath.WalkFunc { unixEpochStart := time.Unix(0, 0) return func(path string, info fs.FileInfo, err error) error { if ctx.Err() != nil { @@ -102,6 +113,10 @@ func packFunc(ctx context.Context, pathPrefix string, writer *tar.Writer) filepa } pathInTar := strings.TrimPrefix(path, pathPrefix+string(os.PathSeparator)) + // Add prefix if specified + if tarPrefix != "" { + pathInTar = tarPrefix + "/" + pathInTar + } err = writer.WriteHeader(&tar.Header{ Typeflag: tar.TypeReg, Format: tar.FormatGNU, diff --git a/pkg/registry/image/layout.go b/pkg/registry/image/layout.go index cb3203b6..0838b837 100644 --- a/pkg/registry/image/layout.go +++ b/pkg/registry/image/layout.go @@ -98,12 +98,16 @@ func (l *ImageLayout) Path() layout.Path { } func (l *ImageLayout) AddImage(img pkg.RegistryImage, tag string) error { + // Skip if tag already exists to prevent duplicates + if _, exists := l.metaByTag[tag]; exists { + return nil + } + meta, err := img.GetMetadata() if err != nil { return fmt.Errorf("get image tag reference: %w", err) } - // TODO: support nesting tags in image l.metaByTag[tag] = meta.(*ImageMeta) err = l.wrapped.AppendImage(img, diff --git a/pkg/registry/service/basic_service.go b/pkg/registry/service/basic_service.go index d03b1ce0..8fdc3a8f 100644 --- a/pkg/registry/service/basic_service.go +++ b/pkg/registry/service/basic_service.go @@ -132,3 +132,12 @@ func (s *BasicService) ListTags(ctx context.Context) ([]string, error) { return tags, nil } + +// WithSegment returns a new BasicService scoped to the given segment +func (s *BasicService) WithSegment(segment string) *BasicService { + return NewBasicService( + s.name+"/"+segment, + s.client.WithSegment(segment), + s.logger, + ) +} From 3072d13d47f60265017e49624522dccdccb91ecd Mon Sep 17 00:00:00 2001 From: Timur Tuktamyshev Date: Fri, 19 Dec 2025 15:12:11 +0300 Subject: [PATCH 07/14] fix: move WithSegment from service layer Signed-off-by: Timur Tuktamyshev --- internal/mirror/modules/layout.go | 2 +- internal/mirror/modules/modules.go | 21 ++++++++++++++++++++- internal/mirror/puller/types.go | 6 +----- pkg/registry/service/basic_service.go | 9 --------- pkg/registry/service/module_service.go | 16 ++++++++++++++++ 5 files changed, 38 insertions(+), 16 deletions(-) diff --git a/internal/mirror/modules/layout.go b/internal/mirror/modules/layout.go index 0f30707e..2fc853a4 100644 --- a/internal/mirror/modules/layout.go +++ b/internal/mirror/modules/layout.go @@ -121,7 +121,7 @@ type ImageLayouts struct { Modules *regimage.ImageLayout // ModulesReleaseChannels is the release channel layout (modules//release/) ModulesReleaseChannels *regimage.ImageLayout - // ExtraImages holds layouts for each extra image (modules///) + // ExtraImages holds layouts for each extra image (modules//extra//) // Key is the extra image name (e.g., "scanner", "enforcer") ExtraImages map[string]*regimage.ImageLayout } diff --git a/internal/mirror/modules/modules.go b/internal/mirror/modules/modules.go index e893ca68..f6dfc5f0 100644 --- a/internal/mirror/modules/modules.go +++ b/internal/mirror/modules/modules.go @@ -401,7 +401,7 @@ func (svc *Service) pullSingleModule(ctx context.Context, module moduleData) err ImageSet: imageSet, Layout: extraLayout, AllowMissingTags: true, - GetterService: svc.modulesService.Module(module.name).Extra().WithSegment(extraName), + GetterService: svc.modulesService.Module(module.name).ExtraImage(extraName), } if err := svc.pullerService.PullImages(ctx, config); err != nil { @@ -662,6 +662,8 @@ func (svc *Service) pullVexImages(ctx context.Context, moduleName string, downlo allImages = append(allImages, img) } + // Find VEX images and add to a separate set for pulling + vexImageSet := make(map[string]*puller.ImageMeta) for _, img := range allImages { vexImageName, err := svc.findVexImage(ctx, moduleName, img) if err != nil { @@ -670,9 +672,26 @@ func (svc *Service) pullVexImages(ctx context.Context, moduleName string, downlo } if vexImageName != "" { svc.logger.Debug(fmt.Sprintf("Found VEX image: %s", vexImageName)) + vexImageSet[vexImageName] = nil downloadList.Module[vexImageName] = nil } } + + // Pull VEX images if any found + if len(vexImageSet) > 0 { + config := puller.PullConfig{ + Name: moduleName + " VEX images", + ImageSet: vexImageSet, + Layout: svc.layout.Module(moduleName).Modules, + AllowMissingTags: true, // VEX images may not exist + GetterService: svc.modulesService.Module(moduleName), + } + + if err := svc.pullerService.PullImages(ctx, config); err != nil { + svc.logger.Debug(fmt.Sprintf("Failed to pull VEX images for %s: %v", moduleName, err)) + // Don't fail on VEX image pull errors + } + } } // findVexImage checks if a VEX attestation image exists for the given image diff --git a/internal/mirror/puller/types.go b/internal/mirror/puller/types.go index bb6ddf6b..42eaf880 100644 --- a/internal/mirror/puller/types.go +++ b/internal/mirror/puller/types.go @@ -71,11 +71,7 @@ func SplitImageRefByRepoAndTag(imageReferenceString string) (string, string) { repo := imageReferenceString[:splitIndex] tag := imageReferenceString[splitIndex+1:] - if strings.HasSuffix(repo, "@sha256") { - repo = strings.TrimSuffix(repo, "@sha256") - // Return just the hex digest without @sha256: prefix - // This makes it a valid registry tag - } + repo = strings.TrimSuffix(repo, "@sha256") return repo, tag } diff --git a/pkg/registry/service/basic_service.go b/pkg/registry/service/basic_service.go index 8fdc3a8f..d03b1ce0 100644 --- a/pkg/registry/service/basic_service.go +++ b/pkg/registry/service/basic_service.go @@ -132,12 +132,3 @@ func (s *BasicService) ListTags(ctx context.Context) ([]string, error) { return tags, nil } - -// WithSegment returns a new BasicService scoped to the given segment -func (s *BasicService) WithSegment(segment string) *BasicService { - return NewBasicService( - s.name+"/"+segment, - s.client.WithSegment(segment), - s.logger, - ) -} diff --git a/pkg/registry/service/module_service.go b/pkg/registry/service/module_service.go index 4238c950..57470b67 100644 --- a/pkg/registry/service/module_service.go +++ b/pkg/registry/service/module_service.go @@ -38,6 +38,7 @@ type ModuleService struct { *BasicService moduleReleaseChannels *ModuleReleaseService extra *BasicService + extraImages map[string]*BasicService logger *log.Logger } @@ -50,6 +51,7 @@ func NewModuleService(client registry.Client, logger *log.Logger) *ModuleService BasicService: NewBasicService(moduleServiceName, client, logger), moduleReleaseChannels: NewModuleReleaseService(NewBasicService(moduleReleaseChannelsServiceName, client.WithSegment(moduleReleaseChannelsSegment), logger)), extra: NewBasicService(moduleExtraServiceName, client.WithSegment(moduleExtraSegment), logger), + extraImages: make(map[string]*BasicService), logger: logger, } @@ -63,6 +65,20 @@ func (s *ModuleService) Extra() *BasicService { return s.extra } +// ExtraImage returns a BasicService scoped to a specific extra image (e.g., modules//extra/) +func (s *ModuleService) ExtraImage(extraName string) *BasicService { + if s.extraImages == nil { + s.extraImages = make(map[string]*BasicService) + } + + if _, exists := s.extraImages[extraName]; !exists { + extraClient := s.client.WithSegment(moduleExtraSegment).WithSegment(extraName) + s.extraImages[extraName] = NewBasicService(moduleExtraServiceName+"/"+extraName, extraClient, s.logger) + } + + return s.extraImages[extraName] +} + type ModulesService struct { client registry.Client From 964f6d7b63514dbb59ccb3b5c13559f3c33cd510 Mon Sep 17 00:00:00 2001 From: Timur Tuktamyshev Date: Fri, 19 Dec 2025 15:18:05 +0300 Subject: [PATCH 08/14] fix lint Signed-off-by: Timur Tuktamyshev --- internal/data/dataexport/util/logger.go | 2 +- internal/mirror/pull.go | 14 +++++++------- pkg/libmirror/bundle/bundle.go | 4 ---- 3 files changed, 8 insertions(+), 12 deletions(-) diff --git a/internal/data/dataexport/util/logger.go b/internal/data/dataexport/util/logger.go index 3cedb63c..b1afb775 100644 --- a/internal/data/dataexport/util/logger.go +++ b/internal/data/dataexport/util/logger.go @@ -1,4 +1,4 @@ -// nolint +//nolint package util import ( diff --git a/internal/mirror/pull.go b/internal/mirror/pull.go index 6ca5b5c9..0c9ab239 100644 --- a/internal/mirror/pull.go +++ b/internal/mirror/pull.go @@ -130,23 +130,23 @@ func NewPullService( // Pull downloads Deckhouse components from registry func (svc *PullService) Pull(ctx context.Context) error { if !svc.options.SkipPlatform { - err := svc.platformService.PullPlatform(ctx) - if err != nil { - return fmt.Errorf("pull platform: %w", err) + err := svc.platformService.PullPlatform(ctx) + if err != nil { + return fmt.Errorf("pull platform: %w", err) } } if !svc.options.SkipSecurity { err := svc.securityService.PullSecurity(ctx) - if err != nil { - return fmt.Errorf("pull security databases: %w", err) + if err != nil { + return fmt.Errorf("pull security databases: %w", err) } } if !svc.options.SkipModules || svc.options.OnlyExtraImages { err := svc.modulesService.PullModules(ctx) - if err != nil { - return fmt.Errorf("pull modules: %w", err) + if err != nil { + return fmt.Errorf("pull modules: %w", err) } } diff --git a/pkg/libmirror/bundle/bundle.go b/pkg/libmirror/bundle/bundle.go index f0ed3225..b3b96faa 100644 --- a/pkg/libmirror/bundle/bundle.go +++ b/pkg/libmirror/bundle/bundle.go @@ -90,10 +90,6 @@ func PackWithPrefix(ctx context.Context, sourcePath string, prefix string, sink return nil } -func packFunc(ctx context.Context, pathPrefix string, writer *tar.Writer) filepath.WalkFunc { - return packFuncWithPrefix(ctx, pathPrefix, "", writer) -} - func packFuncWithPrefix(ctx context.Context, pathPrefix string, tarPrefix string, writer *tar.Writer) filepath.WalkFunc { unixEpochStart := time.Unix(0, 0) return func(path string, info fs.FileInfo, err error) error { From f781319c78f6cc5806ec5058ed9e28657178d30e Mon Sep 17 00:00:00 2001 From: Pavel Okhlopkov Date: Fri, 19 Dec 2025 16:28:45 +0300 Subject: [PATCH 09/14] merge Signed-off-by: Pavel Okhlopkov --- internal/mirror/releases/versions.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/mirror/releases/versions.go b/internal/mirror/releases/versions.go index 570ab860..15989a17 100644 --- a/internal/mirror/releases/versions.go +++ b/internal/mirror/releases/versions.go @@ -118,7 +118,7 @@ func VersionsToMirror(pullParams *params.PullParams, client registry.Client, tag alphaChannelVersion, found := releaseChannelsVersions[internal.AlphaChannel] if found { - versionsAboveMinimal := parseAndFilterVersionsAboveMinimalAnbBelowAlpha(mirrorFromVersion, tags, alphaChannelVersion) + versionsAboveMinimal := parseAndFilterVersionsAboveMinimalAndBelowAlpha(mirrorFromVersion, tags, alphaChannelVersion) versionsAboveMinimal = FilterOnlyLatestPatches(versionsAboveMinimal) return deduplicateVersions(append(vers, versionsAboveMinimal...)), channels, nil From ff321d5351c869c0a9138b33629c3e2d795d6f4d Mon Sep 17 00:00:00 2001 From: Pavel Okhlopkov Date: Fri, 19 Dec 2025 16:29:10 +0300 Subject: [PATCH 10/14] lint Signed-off-by: Pavel Okhlopkov --- internal/data/dataexport/util/logger.go | 1 - my-app | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) create mode 160000 my-app diff --git a/internal/data/dataexport/util/logger.go b/internal/data/dataexport/util/logger.go index b1afb775..bec14fb7 100644 --- a/internal/data/dataexport/util/logger.go +++ b/internal/data/dataexport/util/logger.go @@ -1,4 +1,3 @@ -//nolint package util import ( diff --git a/my-app b/my-app new file mode 160000 index 00000000..7cbea2c3 --- /dev/null +++ b/my-app @@ -0,0 +1 @@ +Subproject commit 7cbea2c320b18fe090d605db333fa637474a5991 From 9715e2998bfaf1dd0caf8247f738ce2df053d96e Mon Sep 17 00:00:00 2001 From: Pavel Okhlopkov Date: Tue, 13 Jan 2026 10:50:32 +0300 Subject: [PATCH 11/14] fix Signed-off-by: Pavel Okhlopkov --- my-app | 1 - 1 file changed, 1 deletion(-) delete mode 160000 my-app diff --git a/my-app b/my-app deleted file mode 160000 index 7cbea2c3..00000000 --- a/my-app +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 7cbea2c320b18fe090d605db333fa637474a5991 From b5c11a76fa48d8e7ee1b6a41a94b3f4e0c6bd599 Mon Sep 17 00:00:00 2001 From: Pavel Okhlopkov Date: Tue, 13 Jan 2026 10:51:31 +0300 Subject: [PATCH 12/14] remove tag check Signed-off-by: Pavel Okhlopkov --- pkg/registry/image/layout.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/pkg/registry/image/layout.go b/pkg/registry/image/layout.go index 0838b837..cb3203b6 100644 --- a/pkg/registry/image/layout.go +++ b/pkg/registry/image/layout.go @@ -98,16 +98,12 @@ func (l *ImageLayout) Path() layout.Path { } func (l *ImageLayout) AddImage(img pkg.RegistryImage, tag string) error { - // Skip if tag already exists to prevent duplicates - if _, exists := l.metaByTag[tag]; exists { - return nil - } - meta, err := img.GetMetadata() if err != nil { return fmt.Errorf("get image tag reference: %w", err) } + // TODO: support nesting tags in image l.metaByTag[tag] = meta.(*ImageMeta) err = l.wrapped.AppendImage(img, From 068243cab73af30824a8ebc466a459e271a0f673 Mon Sep 17 00:00:00 2001 From: Pavel Okhlopkov Date: Tue, 13 Jan 2026 10:56:19 +0300 Subject: [PATCH 13/14] refactor Signed-off-by: Pavel Okhlopkov --- internal/mirror/puller/puller.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/internal/mirror/puller/puller.go b/internal/mirror/puller/puller.go index b0c284fb..e940db91 100644 --- a/internal/mirror/puller/puller.go +++ b/internal/mirror/puller/puller.go @@ -66,15 +66,20 @@ func (ps *PullerService) PullImages(ctx context.Context, config PullConfig) erro if strings.Contains(image, "@sha256:") { // Extract digest from reference digestStr := image[strings.Index(image, "@sha256:")+1:] // "sha256:abc..." + digest, err := v1.NewHash(digestStr) if err != nil { ps.userLogger.Debugf("failed to parse digest from %s: %v", image, err) + if config.AllowMissingTags { continue } + return fmt.Errorf("parse digest from reference %s: %w", image, err) } + config.ImageSet[image] = NewImageMeta(tag, image, &digest) + continue } @@ -89,6 +94,7 @@ func (ps *PullerService) PullImages(ctx context.Context, config PullConfig) erro config.ImageSet[image] = NewImageMeta(tag, image, digest) } + ps.userLogger.InfoLn("All required " + config.Name + " meta are pulled!") if err := ps.PullImageSet(ctx, config.ImageSet, config.Layout, config.GetterService.GetImage); err != nil { From 89a83d4f0a0b4c6846027e12bd03f7c35152586b Mon Sep 17 00:00:00 2001 From: Timur Tuktamyshev Date: Tue, 13 Jan 2026 12:43:22 +0300 Subject: [PATCH 14/14] fix: remove unnecessary trim and fix retry logging Signed-off-by: Timur Tuktamyshev --- internal/mirror/puller/types.go | 3 - internal/mirror/pusher/pusher.go | 23 ++------ pkg/libmirror/layouts/push.go | 92 ++++++++++++++++--------------- pkg/libmirror/util/retry/retry.go | 4 +- 4 files changed, 56 insertions(+), 66 deletions(-) diff --git a/internal/mirror/puller/types.go b/internal/mirror/puller/types.go index 42eaf880..2e85c7da 100644 --- a/internal/mirror/puller/types.go +++ b/internal/mirror/puller/types.go @@ -65,13 +65,10 @@ func NewImageMeta(version string, tagReference string, digest *v1.Hash) *ImageMe } // SplitImageRefByRepoAndTag splits an image reference into repository and tag parts -// For digest references (repo@sha256:abc), returns just the hex part as tag func SplitImageRefByRepoAndTag(imageReferenceString string) (string, string) { splitIndex := strings.LastIndex(imageReferenceString, ":") repo := imageReferenceString[:splitIndex] tag := imageReferenceString[splitIndex+1:] - repo = strings.TrimSuffix(repo, "@sha256") - return repo, tag } diff --git a/internal/mirror/pusher/pusher.go b/internal/mirror/pusher/pusher.go index a1b76067..e093abff 100644 --- a/internal/mirror/pusher/pusher.go +++ b/internal/mirror/pusher/pusher.go @@ -94,15 +94,16 @@ func (s *Service) PushLayout(ctx context.Context, layoutPath layout.Path, client continue } - s.userLogger.Infof("[%d / %d] Pushing image %s:%s", i+1, len(indexManifest.Manifests), client.GetRegistry(), tag) - img, err := index.Image(manifest.Digest) if err != nil { return fmt.Errorf("read image %s: %w", tag, err) } - err = retry.RunTaskWithContext( - ctx, silentLogger{}, "push", + imageReferenceString := fmt.Sprintf("%s:%s", client.GetRegistry(), tag) + err = retry.RunTask( + ctx, + s.userLogger, + fmt.Sprintf("[%d / %d] Pushing %s", i+1, len(indexManifest.Manifests), imageReferenceString), task.WithConstantRetries(pushRetryAttempts, pushRetryDelay, func(ctx context.Context) error { if err := client.PushImage(ctx, tag, img); err != nil { if errorutil.IsTrivyMediaTypeNotAllowedError(err) { @@ -111,8 +112,7 @@ func (s *Service) PushLayout(ctx context.Context, layoutPath layout.Path, client return fmt.Errorf("write %s:%s to registry: %w", client.GetRegistry(), tag, err) } return nil - }), - ) + })) if err != nil { return fmt.Errorf("push image %s: %w", tag, err) } @@ -143,14 +143,3 @@ func (s *Service) openChunkedPackage(bundleDir, pkgName string) (io.ReadCloser, return pkg, nil } - -// silentLogger suppresses retry task logging -type silentLogger struct{} - -func (silentLogger) Debugf(_ string, _ ...interface{}) {} -func (silentLogger) DebugLn(_ ...interface{}) {} -func (silentLogger) Infof(_ string, _ ...interface{}) {} -func (silentLogger) InfoLn(_ ...interface{}) {} -func (silentLogger) Warnf(_ string, _ ...interface{}) {} -func (silentLogger) WarnLn(_ ...interface{}) {} -func (silentLogger) Process(_ string, _ func() error) error { return nil } diff --git a/pkg/libmirror/layouts/push.go b/pkg/libmirror/layouts/push.go index ab44507f..2563c901 100644 --- a/pkg/libmirror/layouts/push.go +++ b/pkg/libmirror/layouts/push.go @@ -26,7 +26,6 @@ import ( "github.com/google/go-containerregistry/pkg/name" v1 "github.com/google/go-containerregistry/pkg/v1" "github.com/google/go-containerregistry/pkg/v1/layout" - "github.com/google/go-containerregistry/pkg/v1/remote" "github.com/hashicorp/go-multierror" "github.com/samber/lo" "github.com/samber/lo/parallel" @@ -72,10 +71,7 @@ func PushLayoutToRepoContext( parallelismConfig params.ParallelismConfig, insecure, skipVerifyTLS bool, ) error { - refOpts, remoteOpts := auth.MakeRemoteRegistryRequestOptions(authProvider, insecure, skipVerifyTLS) - if parallelismConfig.Blobs != 0 { - remoteOpts = append(remoteOpts, remote.WithJobs(parallelismConfig.Blobs)) - } + refOpts, _ := auth.MakeRemoteRegistryRequestOptions(authProvider, insecure, skipVerifyTLS) index, err := imagesLayout.ImageIndex() if err != nil { @@ -96,12 +92,17 @@ func PushLayoutToRepoContext( for _, manifestSet := range batches { if parallelismConfig.Images == 1 { - tag := manifestSet[0].Annotations["io.deckhouse.image.short_tag"] - - imageRef := registryRepo + ":" + tag - - logger.Infof("[%d / %d] Pushing image %s", imagesCount, len(indexManifest.Manifests), imageRef) - if err = pushImage(ctx, client, registryRepo, index, manifestSet[0], refOpts, remoteOpts, logger); err != nil { + cfg := &pushImageConfig{ + client: client, + registryRepo: registryRepo, + index: index, + manifest: manifestSet[0], + refOpts: refOpts, + logger: logger, + imageNum: imagesCount, + totalImages: len(indexManifest.Manifests), + } + if err = pushImage(ctx, cfg); err != nil { return fmt.Errorf("Push Image: %w", err) } imagesCount++ @@ -117,8 +118,20 @@ func PushLayoutToRepoContext( errMu := &sync.Mutex{} merr := &multierror.Error{} - parallel.ForEach(manifestSet, func(item v1.Descriptor, _ int) { - if err = pushImage(ctx, client, registryRepo, index, item, refOpts, remoteOpts, logger); err != nil { + currentImagesCount := imagesCount + parallel.ForEach(manifestSet, func(item v1.Descriptor, idx int) { + imageNum := currentImagesCount + idx + cfg := &pushImageConfig{ + client: client, + registryRepo: registryRepo, + index: index, + manifest: item, + refOpts: refOpts, + logger: logger, + imageNum: imageNum, + totalImages: len(indexManifest.Manifests), + } + if err = pushImage(ctx, cfg); err != nil { errMu.Lock() defer errMu.Unlock() merr = multierror.Append(merr, err) @@ -137,53 +150,44 @@ func PushLayoutToRepoContext( return nil } -func pushImage( - ctx context.Context, - client registry.Client, - registryRepo string, - index v1.ImageIndex, - manifest v1.Descriptor, - refOpts []name.Option, - _ []remote.Option, - _ params.Logger, -) error { - tag := manifest.Annotations["io.deckhouse.image.short_tag"] - imageRef := registryRepo + ":" + tag - img, err := index.Image(manifest.Digest) +type pushImageConfig struct { + client registry.Client + registryRepo string + index v1.ImageIndex + manifest v1.Descriptor + refOpts []name.Option + logger params.Logger + imageNum int + totalImages int +} + +func pushImage(ctx context.Context, cfg *pushImageConfig) error { + tag := cfg.manifest.Annotations["io.deckhouse.image.short_tag"] + imageRef := cfg.registryRepo + ":" + tag + img, err := cfg.index.Image(cfg.manifest.Digest) if err != nil { return fmt.Errorf("Read image: %v", err) } - ref, err := name.ParseReference(imageRef, refOpts...) + ref, err := name.ParseReference(imageRef, cfg.refOpts...) if err != nil { return fmt.Errorf("Parse image reference: %v", err) } - err = retry.RunTaskWithContext( - ctx, silentLogger{}, "push", + err = retry.RunTask( + ctx, + cfg.logger, + fmt.Sprintf("[%d / %d] Pushing %s", cfg.imageNum, cfg.totalImages, imageRef), task.WithConstantRetries(4, 3*time.Second, func(ctx context.Context) error { - if err = client.PushImage(ctx, tag, img); err != nil { + if err = cfg.client.PushImage(ctx, tag, img); err != nil { if errorutil.IsTrivyMediaTypeNotAllowedError(err) { return fmt.Errorf(errorutil.CustomTrivyMediaTypesWarning) } return fmt.Errorf("Write %s to registry: %w", ref.String(), err) } return nil - }), - ) + })) if err != nil { return fmt.Errorf("Run push task: %v", err) } return nil } - -type silentLogger struct{} - -var _ params.Logger = silentLogger{} - -func (silentLogger) Debugf(_ string, _ ...interface{}) {} -func (silentLogger) DebugLn(_ ...interface{}) {} -func (silentLogger) Infof(_ string, _ ...interface{}) {} -func (silentLogger) InfoLn(_ ...interface{}) {} -func (silentLogger) Warnf(_ string, _ ...interface{}) {} -func (silentLogger) WarnLn(_ ...interface{}) {} -func (silentLogger) Process(_ string, _ func() error) error { return nil } diff --git a/pkg/libmirror/util/retry/retry.go b/pkg/libmirror/util/retry/retry.go index 0521a0b7..cde1f680 100644 --- a/pkg/libmirror/util/retry/retry.go +++ b/pkg/libmirror/util/retry/retry.go @@ -31,10 +31,10 @@ type Task interface { } func RunTask(ctx context.Context, logger params.Logger, name string, task Task) error { - return RunTaskWithContext(ctx, logger, name, task) + return runTaskWithContext(ctx, logger, name, task) } -func RunTaskWithContext(ctx context.Context, logger params.Logger, name string, task Task) error { +func runTaskWithContext(ctx context.Context, logger params.Logger, name string, task Task) error { restarts := uint(0) var lastErr error for restarts < task.MaxRetries() {