From 967c20e7e95328a4444c283abce516a84ff61ecd Mon Sep 17 00:00:00 2001 From: Matios102 Date: Tue, 9 Dec 2025 14:14:56 +0100 Subject: [PATCH 1/5] feat: add integration tests for Docker client and update CI workflow --- .github/workflows/publish-coverage.yaml | 27 +- .../docker/docker_client_integration_test.go | 708 ++++++++++++++++++ run_integration_tests.sh | 75 ++ 3 files changed, 808 insertions(+), 2 deletions(-) create mode 100644 internal/docker/docker_client_integration_test.go create mode 100755 run_integration_tests.sh diff --git a/.github/workflows/publish-coverage.yaml b/.github/workflows/publish-coverage.yaml index 94dd1d6..bd77573 100644 --- a/.github/workflows/publish-coverage.yaml +++ b/.github/workflows/publish-coverage.yaml @@ -10,6 +10,17 @@ jobs: test: name: Run tests and collect coverage runs-on: ubuntu-latest + services: + docker: + image: docker:dind + options: >- + --privileged + --health-cmd="docker ps" + --health-interval=10s + --health-timeout=5s + --health-retries=5 + volumes: + - /var/run/docker.sock:/var/run/docker.sock steps: - name: Checkout uses: actions/checkout@v4 @@ -24,10 +35,22 @@ jobs: - name: Install dependencies run: go mod download - - name: Run tests - run: go test -coverprofile=coverage.txt ./... + - name: Run unit tests with coverage + run: go test -coverprofile=coverage-unit.txt -covermode=atomic ./... + + - name: Run integration tests with coverage + env: + DOCKER_HOST: unix:///var/run/docker.sock + run: go test -tags=integration -coverprofile=coverage-integration.txt -covermode=atomic ./... + + - name: Merge coverage reports + run: | + go install github.com/wadey/gocovmerge@latest + gocovmerge coverage-unit.txt coverage-integration.txt > coverage.txt - name: Upload results to Codecov uses: codecov/codecov-action@v5 with: token: ${{ secrets.CODECOV_TOKEN }} + files: ./coverage.txt + fail_ci_if_error: true diff --git a/internal/docker/docker_client_integration_test.go b/internal/docker/docker_client_integration_test.go new file mode 100644 index 0000000..295002b --- /dev/null +++ b/internal/docker/docker_client_integration_test.go @@ -0,0 +1,708 @@ +package docker_test + +import ( + "context" + "errors" + "fmt" + "io" + "testing" + "time" + + "github.com/docker/docker/api/types/container" + "github.com/docker/docker/api/types/image" + "github.com/docker/docker/api/types/mount" + "github.com/docker/docker/api/types/volume" + "github.com/docker/docker/client" + "github.com/mini-maxit/worker/internal/docker" + "github.com/mini-maxit/worker/pkg/constants" + pkgerrors "github.com/mini-maxit/worker/pkg/errors" + "github.com/mini-maxit/worker/pkg/languages" +) + +const ( + testVolumePrefix = "docker_client_test_vol_" + testContainerName = "docker_client_test_container_" +) + +var ( + // testLanguages contains all supported languages to test. + testLanguages []languages.LanguageType + // testImageName is set from the first supported language. + testImageName string +) + +func setupTestLanguages() { + // Get all supported languages + supportedLangs := languages.GetSupportedLanguages() + for _, langStr := range supportedLangs { + if lang, err := languages.ParseLanguageType(langStr); err == nil { + testLanguages = append(testLanguages, lang) + } + } + + // Use the first language's image for basic tests + if len(testLanguages) > 0 { + if image, err := testLanguages[0].GetDockerImage(""); err == nil { + testImageName = image + } + } +} + +// TestNewDockerClient_Success tests successful creation of a docker client. +func TestNewDockerClient_Success(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + // Setup test languages on first test run + if len(testLanguages) == 0 { + setupTestLanguages() + } + + dc, err := docker.NewDockerClient("") + if err != nil { + t.Fatalf("failed to create docker client: %v", err) + } + if dc == nil { + t.Fatal("docker client is nil") + } +} + +// TestNewDockerClient_WithVolume tests creating a client with a volume check. +func TestNewDockerClient_WithVolume(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + volumeName := createTestVolume(t) + defer cleanupVolume(t, volumeName) + + containerID := createContainerWithVolume(t, volumeName) + defer cleanupContainer(t, containerID) + + dc, err := docker.NewDockerClient(volumeName) + if err != nil { + t.Fatalf("failed to create docker client with volume: %v", err) + } + if dc.DataVolumeName() != volumeName { + t.Errorf("expected volume name %s, got %s", volumeName, dc.DataVolumeName()) + } +} + +// TestDataVolumeName tests the DataVolumeName getter. +func TestDataVolumeName(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + dc, err := docker.NewDockerClient("") + if err != nil { + t.Fatalf("failed to create docker client: %v", err) + } + + if dc.DataVolumeName() != "" { + t.Errorf("expected empty volume name, got %s", dc.DataVolumeName()) + } +} + +// TestCheckDataVolume_NotMounted tests error when volume is not mounted. +func TestCheckDataVolume_NotMounted(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + dc, err := docker.NewDockerClient("") + if err != nil { + t.Fatalf("failed to create docker client: %v", err) + } + + err = dc.CheckDataVolume("non_existent_volume_test") + if !errors.Is(err, pkgerrors.ErrVolumeNotMounted) { + t.Errorf("expected ErrVolumeNotMounted, got %v", err) + } +} + +// TestCheckDataVolume_Mounted tests successful volume mount check. +func TestCheckDataVolume_Mounted(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + volumeName := createTestVolume(t) + defer cleanupVolume(t, volumeName) + + containerID := createContainerWithVolume(t, volumeName) + defer cleanupContainer(t, containerID) + + dc, err := docker.NewDockerClient("") + if err != nil { + t.Fatalf("failed to create docker client: %v", err) + } + + err = dc.CheckDataVolume(volumeName) + if err != nil { + t.Errorf("expected no error when checking mounted volume, got %v", err) + } +} + +// TestEnsureImage_PullImage tests pulling an image if it doesn't exist. +func TestEnsureImage_PullImage(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + removeImageIfExists(t, testImageName) + + dc, err := docker.NewDockerClient("") + if err != nil { + t.Fatalf("failed to create docker client: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + + err = dc.EnsureImage(ctx, testImageName) + if err != nil { + t.Fatalf("failed to ensure image: %v", err) + } + + verifyImageExists(t, testImageName) +} + +// TestEnsureImage_AllSupportedLanguages tests pulling all supported language runtime images. +func TestEnsureImage_AllSupportedLanguages(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + if len(testLanguages) == 0 { + t.Skip("No supported languages configured") + } + + dc, err := docker.NewDockerClient("") + if err != nil { + t.Fatalf("failed to create docker client: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + + for _, lang := range testLanguages { + langName := lang.String() + imageName, err := lang.GetDockerImage("") + if err != nil { + t.Fatalf("failed to get docker image for language %s: %v", langName, err) + } + + t.Logf("Testing language: %s (image: %s)", langName, imageName) + + // Ensure the image can be pulled + err = dc.EnsureImage(ctx, imageName) + if err != nil { + t.Errorf("failed to ensure image for language %s: %v", langName, err) + continue + } + + // Verify the image exists + verifyImageExists(t, imageName) + } +} + +// TestEnsureImage_ImageAlreadyExists tests when image is already present. +func TestEnsureImage_ImageAlreadyExists(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + dc, err := docker.NewDockerClient("") + if err != nil { + t.Fatalf("failed to create docker client: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + + err = dc.EnsureImage(ctx, testImageName) + if err != nil { + t.Fatalf("failed to ensure image first time: %v", err) + } + + err = dc.EnsureImage(ctx, testImageName) + if err != nil { + t.Errorf("expected no error when image already exists, got %v", err) + } +} + +// TestCreateAndStartContainer_Success tests creating and starting a container. +func TestCreateAndStartContainer_Success(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + dc, err := docker.NewDockerClient("") + if err != nil { + t.Fatalf("failed to create docker client: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + + err = dc.EnsureImage(ctx, testImageName) + if err != nil { + t.Fatalf("failed to ensure image: %v", err) + } + + containerCfg := &container.Config{ + Image: testImageName, + Cmd: []string{"echo", "hello world"}, + Entrypoint: nil, + } + + hostCfg := &container.HostConfig{} + containerName := fmt.Sprintf("%s%d", testContainerName, time.Now().UnixNano()) + + containerID, err := dc.CreateAndStartContainer(ctx, containerCfg, hostCfg, containerName) + if err != nil { + t.Fatalf("failed to create and start container: %v", err) + } + defer cleanupContainer(t, containerID) + + if containerID == "" { + t.Fatal("container ID is empty") + } + + verifyContainerExists(t, containerID) +} + +// TestWaitContainer_Success tests waiting for a container to finish. +func TestWaitContainer_Success(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + dc, err := docker.NewDockerClient("") + if err != nil { + t.Fatalf("failed to create docker client: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + + err = dc.EnsureImage(ctx, testImageName) + if err != nil { + t.Fatalf("failed to ensure image: %v", err) + } + + containerCfg := &container.Config{ + Image: testImageName, + Cmd: []string{"sh", "-c", "echo hello"}, + } + + hostCfg := &container.HostConfig{} + containerName := fmt.Sprintf("%s%d", testContainerName, time.Now().UnixNano()) + + containerID, err := dc.CreateAndStartContainer(ctx, containerCfg, hostCfg, containerName) + if err != nil { + t.Fatalf("failed to create and start container: %v", err) + } + defer cleanupContainer(t, containerID) + + exitCode, err := dc.WaitContainer(ctx, containerID) + if err != nil { + t.Errorf("failed to wait for container: %v", err) + } + + if exitCode != 0 { + t.Errorf("expected exit code 0, got %d", exitCode) + } +} + +// TestWaitContainer_WithTimeout tests waiting for a container with timeout. +func TestWaitContainer_WithTimeout(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + dc, err := docker.NewDockerClient("") + if err != nil { + t.Fatalf("failed to create docker client: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + + err = dc.EnsureImage(ctx, testImageName) + if err != nil { + t.Fatalf("failed to ensure image: %v", err) + } + + containerCfg := &container.Config{ + Image: testImageName, + Cmd: []string{"sh", "-c", "sleep 2"}, + } + + hostCfg := &container.HostConfig{} + containerName := fmt.Sprintf("%s%d", testContainerName, time.Now().UnixNano()) + + containerID, err := dc.CreateAndStartContainer(ctx, containerCfg, hostCfg, containerName) + if err != nil { + t.Fatalf("failed to create and start container: %v", err) + } + defer cleanupContainer(t, containerID) + + shortCtx, shortCancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer shortCancel() + + _, err = dc.WaitContainer(shortCtx, containerID) + if err == nil { + t.Error("expected timeout error, got nil") + } +} + +// TestContainerWait_Success tests the ContainerWait method. +func TestContainerWait_Success(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + dc, err := docker.NewDockerClient("") + if err != nil { + t.Fatalf("failed to create docker client: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + + err = dc.EnsureImage(ctx, testImageName) + if err != nil { + t.Fatalf("failed to ensure image: %v", err) + } + + containerCfg := &container.Config{ + Image: testImageName, + Cmd: []string{"sh", "-c", "exit 5"}, + } + + hostCfg := &container.HostConfig{} + containerName := fmt.Sprintf("%s%d", testContainerName, time.Now().UnixNano()) + + containerID, err := dc.CreateAndStartContainer(ctx, containerCfg, hostCfg, containerName) + if err != nil { + t.Fatalf("failed to create and start container: %v", err) + } + defer cleanupContainer(t, containerID) + + statusCh, errCh := dc.ContainerWait(ctx, containerID, container.WaitConditionNotRunning) + + select { + case err := <-errCh: + t.Errorf("unexpected error from ContainerWait: %v", err) + case status := <-statusCh: + if status.StatusCode != 5 { + t.Errorf("expected exit code 5, got %d", status.StatusCode) + } + case <-time.After(10 * time.Second): + t.Error("ContainerWait timed out") + } +} + +// TestContainerKill tests killing a running container. +func TestContainerKill(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + dc, err := docker.NewDockerClient("") + if err != nil { + t.Fatalf("failed to create docker client: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + + err = dc.EnsureImage(ctx, testImageName) + if err != nil { + t.Fatalf("failed to ensure image: %v", err) + } + + containerCfg := &container.Config{ + Image: testImageName, + Cmd: []string{"sh", "-c", "sleep 100"}, + } + + hostCfg := &container.HostConfig{} + containerName := fmt.Sprintf("%s%d", testContainerName, time.Now().UnixNano()) + + containerID, err := dc.CreateAndStartContainer(ctx, containerCfg, hostCfg, containerName) + if err != nil { + t.Fatalf("failed to create and start container: %v", err) + } + defer cleanupContainer(t, containerID) + + err = dc.ContainerKill(ctx, containerID, "KILL") + if err != nil { + t.Errorf("failed to kill container: %v", err) + } + + time.Sleep(500 * time.Millisecond) + verifyContainerStopped(t, containerID) +} + +// TestCreateContainerWithVolume tests creating a container with a mounted volume. +func TestCreateContainerWithVolume(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + dc, err := docker.NewDockerClient("") + if err != nil { + t.Fatalf("failed to create docker client: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + + err = dc.EnsureImage(ctx, testImageName) + if err != nil { + t.Fatalf("failed to ensure image: %v", err) + } + + volumeName := createTestVolume(t) + defer cleanupVolume(t, volumeName) + + containerCfg := &container.Config{ + Image: testImageName, + Cmd: []string{"sh", "-c", "echo hello > " + constants.TmpDirPath + "/test.txt && sleep 1"}, + } + + hostCfg := &container.HostConfig{ + Mounts: []mount.Mount{ + { + Type: mount.TypeVolume, + Source: volumeName, + Target: constants.TmpDirPath, + }, + }, + } + + containerName := fmt.Sprintf("%s%d", testContainerName, time.Now().UnixNano()) + + containerID, err := dc.CreateAndStartContainer(ctx, containerCfg, hostCfg, containerName) + if err != nil { + t.Fatalf("failed to create and start container with volume: %v", err) + } + defer cleanupContainer(t, containerID) + + exitCode, err := dc.WaitContainer(ctx, containerID) + if err != nil { + t.Errorf("failed to wait for container: %v", err) + } + + if exitCode != 0 { + t.Errorf("expected exit code 0, got %d", exitCode) + } +} + +// Helper functions + +func createTestVolume(t *testing.T) string { + cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) + if err != nil { + t.Fatalf("failed to create docker client for volume creation: %v", err) + } + defer func() { + _ = cli.Close() + }() + + volumeName := fmt.Sprintf("%s%d", testVolumePrefix, time.Now().UnixNano()) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + _, err = cli.VolumeCreate(ctx, volume.CreateOptions{Name: volumeName}) + if err != nil { + t.Fatalf("failed to create test volume: %v", err) + } + + return volumeName +} + +func cleanupVolume(t *testing.T, volumeName string) { + cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) + if err != nil { + t.Logf("failed to create docker client for volume cleanup: %v", err) + return + } + defer func() { + _ = cli.Close() + }() + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + if err := cli.VolumeRemove(ctx, volumeName, false); err != nil { + t.Logf("failed to remove test volume %s: %v", volumeName, err) + } +} + +func createContainerWithVolume(t *testing.T, volumeName string) string { + cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) + if err != nil { + t.Fatalf("failed to create docker client for container creation: %v", err) + } + defer func() { + _ = cli.Close() + }() + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + _, err = cli.ImageInspect(ctx, testImageName) + if err != nil { + reader, err := cli.ImagePull(ctx, testImageName, image.PullOptions{}) + if err != nil { + t.Fatalf("failed to pull image: %v", err) + } + defer reader.Close() + _, _ = io.Copy(io.Discard, reader) + } + + containerCfg := &container.Config{ + Image: testImageName, + Cmd: []string{"sleep", "1000"}, + } + + hostCfg := &container.HostConfig{ + Mounts: []mount.Mount{ + { + Type: mount.TypeVolume, + Source: volumeName, + Target: constants.TmpDirPath, + }, + }, + } + + resp, err := cli.ContainerCreate(ctx, containerCfg, hostCfg, nil, nil, "") + if err != nil { + t.Fatalf("failed to create container: %v", err) + } + + if err := cli.ContainerStart(ctx, resp.ID, container.StartOptions{}); err != nil { + t.Fatalf("failed to start container: %v", err) + } + + return resp.ID +} + +func cleanupContainer(t *testing.T, containerID string) { + cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) + if err != nil { + t.Logf("failed to create docker client for container cleanup: %v", err) + return + } + defer func() { + _ = cli.Close() + }() + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + _ = cli.ContainerKill(ctx, containerID, "KILL") + + if err := cli.ContainerRemove(ctx, containerID, container.RemoveOptions{Force: true}); err != nil { + t.Logf("failed to remove test container %s: %v", containerID, err) + } +} + +func removeImageIfExists(t *testing.T, imageName string) { + cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) + if err != nil { + t.Logf("failed to create docker client for image removal: %v", err) + return + } + defer func() { + _ = cli.Close() + }() + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + images, err := cli.ImageList(ctx, image.ListOptions{}) + if err != nil { + t.Logf("failed to list images: %v", err) + return + } + + for _, img := range images { + for _, repoTag := range img.RepoTags { + if repoTag == imageName { + _, err := cli.ImageRemove(ctx, imageName, image.RemoveOptions{}) + if err != nil { + t.Logf("failed to remove image %s: %v", imageName, err) + } + return + } + } + } +} + +func verifyImageExists(t *testing.T, imageName string) { + cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) + if err != nil { + t.Logf("failed to create docker client for image verification: %v", err) + return + } + defer func() { + _ = cli.Close() + }() + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + _, err = cli.ImageInspect(ctx, imageName) + if err != nil { + t.Errorf("failed to verify image exists: %v", err) + } +} + +func verifyContainerExists(t *testing.T, containerID string) { + cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) + if err != nil { + t.Logf("failed to create docker client for container verification: %v", err) + return + } + defer func() { + _ = cli.Close() + }() + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + _, err = cli.ContainerInspect(ctx, containerID) + if err != nil { + t.Errorf("failed to verify container exists: %v", err) + } +} + +func verifyContainerStopped(t *testing.T, containerID string) { + cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) + if err != nil { + t.Logf("failed to create docker client for container status verification: %v", err) + return + } + defer func() { + _ = cli.Close() + }() + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + inspect, err := cli.ContainerInspect(ctx, containerID) + if err != nil { + t.Errorf("failed to inspect container: %v", err) + return + } + + if inspect.State.Running { + t.Errorf("expected container to be stopped, but it's still running") + } +} diff --git a/run_integration_tests.sh b/run_integration_tests.sh new file mode 100755 index 0000000..cff8607 --- /dev/null +++ b/run_integration_tests.sh @@ -0,0 +1,75 @@ +#!/bin/bash + +set -e + +TEST_SUITES=( + "./internal/docker/..." +) + +SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" + +# Detect platform and configure DOCKER_HOST if needed +if [[ "$OSTYPE" == "darwin"* ]]; then + # macOS + if [ -z "$DOCKER_HOST" ]; then + DOCKER_HOST="unix://$HOME/.docker/run/docker.sock" + export DOCKER_HOST + echo "[*] Detected macOS. Setting DOCKER_HOST=$DOCKER_HOST" + fi +elif [[ "$OSTYPE" == "linux-gnu"* ]]; then + # Linux + if [ -z "$DOCKER_HOST" ]; then + DOCKER_HOST="unix:///var/run/docker.sock" + export DOCKER_HOST + echo "[*] Detected Linux. Setting DOCKER_HOST=$DOCKER_HOST" + fi +elif [[ "$OSTYPE" == "msys" || "$OSTYPE" == "cygwin" ]]; then + # Windows + echo "[*] Detected Windows. Using Docker Desktop configuration." +fi + +# Check if Docker is accessible +if ! docker ps &>/dev/null; then + echo "[ERROR] Docker daemon is not accessible at $DOCKER_HOST" + exit 1 +fi + +echo "[OK] Docker daemon is accessible" +echo "" + +# Run the tests +echo "[*] Running integration tests..." +cd "$SCRIPT_DIR" + +TEST_FLAGS="-v -tags=integration -timeout 5m" + +# Print test suites to be executed +echo "Test suites to be executed (${#TEST_SUITES[@]} total):" +for suite in "${TEST_SUITES[@]}"; do + echo " - $suite" +done +echo "" + +EXIT_CODE=0 + +# Run each test suite +for suite in "${TEST_SUITES[@]}"; do + echo "Running: go test $TEST_FLAGS $suite" + echo "" + + if ! go test $TEST_FLAGS "$suite"; then + EXIT_CODE=1 + fi + + echo "" +done + +if [ $EXIT_CODE -eq 0 ]; then + echo "" + echo "[OK] All tests passed!" +else + echo "" + echo "[ERROR] Tests failed with exit code $EXIT_CODE" +fi + +exit $EXIT_CODE From d13cb1cd82c1f26e11217eacc054925e1b1d1146 Mon Sep 17 00:00:00 2001 From: Matios102 Date: Tue, 9 Dec 2025 14:26:59 +0100 Subject: [PATCH 2/5] fix: add build constraint for integration tests in docker client --- internal/docker/docker_client_integration_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/docker/docker_client_integration_test.go b/internal/docker/docker_client_integration_test.go index 295002b..ce19d55 100644 --- a/internal/docker/docker_client_integration_test.go +++ b/internal/docker/docker_client_integration_test.go @@ -1,3 +1,5 @@ +//go:build integration + package docker_test import ( From 047f27ed1d33ca318e12dbed3a04f0eb8f05cdea Mon Sep 17 00:00:00 2001 From: Matios102 Date: Sat, 20 Dec 2025 22:36:27 +0100 Subject: [PATCH 3/5] feat: ensure test languages are initialized exactly once in integration tests --- .../docker/docker_client_integration_test.go | 34 +++++++++++++++++-- 1 file changed, 31 insertions(+), 3 deletions(-) diff --git a/internal/docker/docker_client_integration_test.go b/internal/docker/docker_client_integration_test.go index ce19d55..5407105 100644 --- a/internal/docker/docker_client_integration_test.go +++ b/internal/docker/docker_client_integration_test.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "io" + "sync" "testing" "time" @@ -31,6 +32,8 @@ var ( testLanguages []languages.LanguageType // testImageName is set from the first supported language. testImageName string + // setupOnce ensures setupTestLanguages is called exactly once, preventing race conditions. + setupOnce sync.Once ) func setupTestLanguages() { @@ -50,6 +53,16 @@ func setupTestLanguages() { } } +// ensureTestLanguagesSetup ensures test languages are initialized exactly once. +func ensureTestLanguagesSetup(t *testing.T) { + setupOnce.Do(func() { + setupTestLanguages() + if testImageName == "" { + t.Fatal("no supported languages configured or docker image not available") + } + }) +} + // TestNewDockerClient_Success tests successful creation of a docker client. func TestNewDockerClient_Success(t *testing.T) { if testing.Short() { @@ -153,6 +166,7 @@ func TestEnsureImage_PullImage(t *testing.T) { t.Skip("Skipping integration test in short mode") } + ensureTestLanguagesSetup(t) removeImageIfExists(t, testImageName) dc, err := docker.NewDockerClient("") @@ -177,9 +191,7 @@ func TestEnsureImage_AllSupportedLanguages(t *testing.T) { t.Skip("Skipping integration test in short mode") } - if len(testLanguages) == 0 { - t.Skip("No supported languages configured") - } + ensureTestLanguagesSetup(t) dc, err := docker.NewDockerClient("") if err != nil { @@ -216,6 +228,8 @@ func TestEnsureImage_ImageAlreadyExists(t *testing.T) { t.Skip("Skipping integration test in short mode") } + ensureTestLanguagesSetup(t) + dc, err := docker.NewDockerClient("") if err != nil { t.Fatalf("failed to create docker client: %v", err) @@ -241,6 +255,8 @@ func TestCreateAndStartContainer_Success(t *testing.T) { t.Skip("Skipping integration test in short mode") } + ensureTestLanguagesSetup(t) + dc, err := docker.NewDockerClient("") if err != nil { t.Fatalf("failed to create docker client: %v", err) @@ -282,6 +298,8 @@ func TestWaitContainer_Success(t *testing.T) { t.Skip("Skipping integration test in short mode") } + ensureTestLanguagesSetup(t) + dc, err := docker.NewDockerClient("") if err != nil { t.Fatalf("failed to create docker client: %v", err) @@ -325,6 +343,8 @@ func TestWaitContainer_WithTimeout(t *testing.T) { t.Skip("Skipping integration test in short mode") } + ensureTestLanguagesSetup(t) + dc, err := docker.NewDockerClient("") if err != nil { t.Fatalf("failed to create docker client: %v", err) @@ -367,6 +387,8 @@ func TestContainerWait_Success(t *testing.T) { t.Skip("Skipping integration test in short mode") } + ensureTestLanguagesSetup(t) + dc, err := docker.NewDockerClient("") if err != nil { t.Fatalf("failed to create docker client: %v", err) @@ -414,6 +436,8 @@ func TestContainerKill(t *testing.T) { t.Skip("Skipping integration test in short mode") } + ensureTestLanguagesSetup(t) + dc, err := docker.NewDockerClient("") if err != nil { t.Fatalf("failed to create docker client: %v", err) @@ -456,6 +480,8 @@ func TestCreateContainerWithVolume(t *testing.T) { t.Skip("Skipping integration test in short mode") } + ensureTestLanguagesSetup(t) + dc, err := docker.NewDockerClient("") if err != nil { t.Fatalf("failed to create docker client: %v", err) @@ -547,6 +573,8 @@ func cleanupVolume(t *testing.T, volumeName string) { } func createContainerWithVolume(t *testing.T, volumeName string) string { + ensureTestLanguagesSetup(t) + cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) if err != nil { t.Fatalf("failed to create docker client for container creation: %v", err) From d2b40916fb8be82bde6938b2098de40ca6bd3aed Mon Sep 17 00:00:00 2001 From: Matios102 Date: Sun, 11 Jan 2026 11:10:33 +0100 Subject: [PATCH 4/5] feat: add RabbitMQ integration tests and setup in Docker environment --- .../rabbitmq/connection_integration_test.go | 155 +++++ .../consumer/consumer_integration_test.go | 117 ++++ .../responder/responder_integration_test.go | 620 ++++++++++++++++++ run_integration_tests.sh | 31 +- 4 files changed, 922 insertions(+), 1 deletion(-) create mode 100644 internal/rabbitmq/connection_integration_test.go create mode 100644 internal/rabbitmq/consumer/consumer_integration_test.go create mode 100644 internal/rabbitmq/responder/responder_integration_test.go diff --git a/internal/rabbitmq/connection_integration_test.go b/internal/rabbitmq/connection_integration_test.go new file mode 100644 index 0000000..255d246 --- /dev/null +++ b/internal/rabbitmq/connection_integration_test.go @@ -0,0 +1,155 @@ +//go:build integration + +package rabbitmq_test + +import ( + "testing" + "time" + + "github.com/mini-maxit/worker/internal/config" + "github.com/mini-maxit/worker/internal/rabbitmq" + "github.com/mini-maxit/worker/pkg/constants" + amqp "github.com/rabbitmq/amqp091-go" +) + +const ( + testRabbitMQURL = "amqp://guest:guest@localhost:5672/" +) + +// TestNewRabbitMqConnection_Success tests successful connection to RabbitMQ. +func TestNewRabbitMqConnection_Success(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + cfg := &config.Config{ + RabbitMQURL: testRabbitMQURL, + } + + conn := rabbitmq.NewRabbitMqConnection(cfg) + if conn == nil { + t.Fatal("expected non-nil connection") + } + defer func() { + if err := conn.Close(); err != nil { + t.Logf("failed to close connection: %v", err) + } + }() + + if conn.IsClosed() { + t.Fatal("connection should not be closed") + } +} + +// TestNewRabbitMQChannel_Success tests creating a channel from connection. +func TestNewRabbitMQChannel_Success(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + cfg := &config.Config{ + RabbitMQURL: testRabbitMQURL, + } + + conn := rabbitmq.NewRabbitMqConnection(cfg) + if conn == nil { + t.Fatal("expected non-nil connection") + } + defer func() { + if err := conn.Close(); err != nil { + t.Logf("failed to close connection: %v", err) + } + }() + + channel := rabbitmq.NewRabbitMQChannel(conn) + if channel == nil { + t.Fatal("expected non-nil channel") + } +} + +// TestChannelOperations_QueueDeclare tests basic queue declaration. +func TestChannelOperations_QueueDeclare(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + cfg := &config.Config{ + RabbitMQURL: testRabbitMQURL, + } + + conn := rabbitmq.NewRabbitMqConnection(cfg) + defer func() { + if err := conn.Close(); err != nil { + t.Logf("failed to close connection: %v", err) + } + }() + + channel := rabbitmq.NewRabbitMQChannel(conn) + + queueName := "test_queue_" + time.Now().Format("20060102150405") + args := make(amqp.Table) + args["x-max-priority"] = constants.RabbitMQMaxPriority + + queue, err := channel.QueueDeclare(queueName, false, true, false, false, args) + if err != nil { + t.Fatalf("failed to declare queue: %v", err) + } + + if queue.Name != queueName { + t.Fatalf("expected queue name %s, got %s", queueName, queue.Name) + } +} + +// TestChannelOperations_PublishAndConsume tests publishing and consuming messages. +func TestChannelOperations_PublishAndConsume(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + cfg := &config.Config{ + RabbitMQURL: testRabbitMQURL, + } + + conn := rabbitmq.NewRabbitMqConnection(cfg) + defer func() { + if err := conn.Close(); err != nil { + t.Logf("failed to close connection: %v", err) + } + }() + + channel := rabbitmq.NewRabbitMQChannel(conn) + + queueName := "test_pubsub_" + time.Now().Format("20060102150405") + + // Declare queue + queue, err := channel.QueueDeclare(queueName, false, true, false, false, nil) + if err != nil { + t.Fatalf("failed to declare queue: %v", err) + } + + // Publish a message + testMessage := "Hello, RabbitMQ!" + err = channel.Publish("", queue.Name, false, false, amqp.Publishing{ + ContentType: "text/plain", + Body: []byte(testMessage), + }) + if err != nil { + t.Fatalf("failed to publish message: %v", err) + } + + // Consume messages + msgs, err := channel.Consume(queue.Name, "", true, false, false, false, nil) + if err != nil { + t.Fatalf("failed to consume messages: %v", err) + } + + // Wait for message + select { + case msg := <-msgs: + if string(msg.Body) != testMessage { + t.Fatalf("expected message %s, got %s", testMessage, string(msg.Body)) + } + case <-time.After(5 * time.Second): + t.Fatal("timeout waiting for message") + } +} diff --git a/internal/rabbitmq/consumer/consumer_integration_test.go b/internal/rabbitmq/consumer/consumer_integration_test.go new file mode 100644 index 0000000..2a9fe5e --- /dev/null +++ b/internal/rabbitmq/consumer/consumer_integration_test.go @@ -0,0 +1,117 @@ +//go:build integration + +package consumer_test + +import ( + "encoding/json" + "testing" + "time" + + "go.uber.org/mock/gomock" + + "github.com/mini-maxit/worker/internal/config" + "github.com/mini-maxit/worker/internal/rabbitmq" + "github.com/mini-maxit/worker/internal/rabbitmq/consumer" + "github.com/mini-maxit/worker/internal/rabbitmq/responder" + "github.com/mini-maxit/worker/pkg/constants" + "github.com/mini-maxit/worker/pkg/languages" + "github.com/mini-maxit/worker/pkg/messages" + "github.com/mini-maxit/worker/tests/mocks" + amqp "github.com/rabbitmq/amqp091-go" +) + +const ( + testRabbitMQURL = "amqp://guest:guest@localhost:5672/" +) + +// TestConsumer_ProcessHandshakeMessage tests processing handshake messages. +func TestConsumer_ProcessHandshakeMessage(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + cfg := &config.Config{ + RabbitMQURL: testRabbitMQURL, + } + + conn := rabbitmq.NewRabbitMqConnection(cfg) + defer conn.Close() + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + channel := rabbitmq.NewRabbitMQChannel(conn) + resp := responder.NewResponder(channel, constants.DefaultRabbitmqPublishChanSize) + defer resp.Close() + + mockScheduler := mocks.NewMockScheduler(ctrl) + workerQueueName := "test_consumer_handshake_" + time.Now().Format("20060102150405") + + cons := consumer.NewConsumer(channel, workerQueueName, mockScheduler, resp) + + // Create a response queue + responseQueueName := "test_response_handshake_" + time.Now().Format("20060102150405") + responseQueue, err := channel.QueueDeclare(responseQueueName, false, true, false, false, nil) + if err != nil { + t.Fatalf("failed to declare response queue: %v", err) + } + + // Create a handshake message + handshakeMessage := messages.QueueMessage{ + Type: constants.QueueMessageTypeHandshake, + MessageID: "handshake-msg-id", + Payload: json.RawMessage("{}"), + } + + messageJSON, err := json.Marshal(handshakeMessage) + if err != nil { + t.Fatalf("failed to marshal message: %v", err) + } + + // Create a delivery to simulate message consumption + delivery := amqp.Delivery{ + Body: messageJSON, + ReplyTo: responseQueue.Name, + } + + // Process the message + cons.ProcessMessage(delivery) + + // Consume the response + msgs, err := channel.Consume(responseQueue.Name, "", true, false, false, false, nil) + if err != nil { + t.Fatalf("failed to consume messages: %v", err) + } + + // Wait for response + select { + case msg := <-msgs: + var responseMsg messages.ResponseQueueMessage + if err := json.Unmarshal(msg.Body, &responseMsg); err != nil { + t.Fatalf("failed to unmarshal response: %v", err) + } + + if responseMsg.Type != constants.QueueMessageTypeHandshake { + t.Fatalf("expected type %s, got %s", constants.QueueMessageTypeHandshake, responseMsg.Type) + } + + if !responseMsg.Ok { + t.Fatal("expected Ok to be true for handshake response") + } + + // Verify the payload contains language information + var handshakePayload struct { + Languages []languages.LanguageSpec `json:"languages"` + } + if err := json.Unmarshal(responseMsg.Payload, &handshakePayload); err != nil { + t.Fatalf("failed to unmarshal handshake payload: %v", err) + } + + if len(handshakePayload.Languages) == 0 { + t.Fatal("expected at least one language in handshake response") + } + + case <-time.After(5 * time.Second): + t.Fatal("timeout waiting for handshake response") + } +} diff --git a/internal/rabbitmq/responder/responder_integration_test.go b/internal/rabbitmq/responder/responder_integration_test.go new file mode 100644 index 0000000..40419b4 --- /dev/null +++ b/internal/rabbitmq/responder/responder_integration_test.go @@ -0,0 +1,620 @@ +//go:build integration + +package responder_test + +import ( + "encoding/json" + "testing" + "time" + + "github.com/mini-maxit/worker/internal/config" + "github.com/mini-maxit/worker/internal/rabbitmq" + "github.com/mini-maxit/worker/internal/rabbitmq/responder" + "github.com/mini-maxit/worker/pkg/constants" + "github.com/mini-maxit/worker/pkg/errors" + "github.com/mini-maxit/worker/pkg/languages" + "github.com/mini-maxit/worker/pkg/messages" + "github.com/mini-maxit/worker/pkg/solution" + amqp "github.com/rabbitmq/amqp091-go" +) + +const ( + testRabbitMQURL = "amqp://guest:guest@localhost:5672/" +) + +// setupResponderTest creates a connection, channel, and responder for testing. +func setupResponderTest(t *testing.T) (*amqp.Connection, responder.Responder) { + cfg := &config.Config{ + RabbitMQURL: testRabbitMQURL, + } + + conn := rabbitmq.NewRabbitMqConnection(cfg) + if conn == nil { + t.Fatal("expected non-nil connection") + } + + channel := rabbitmq.NewRabbitMQChannel(conn) + if channel == nil { + t.Fatal("expected non-nil channel") + } + + resp := responder.NewResponder(channel, constants.DefaultRabbitmqPublishChanSize) + if resp == nil { + t.Fatal("expected non-nil responder") + } + + return conn, resp +} + +// TestNewResponder_Success tests creating a new responder. +func TestNewResponder_Success(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + conn, resp := setupResponderTest(t) + defer func() { + if err := resp.Close(); err != nil { + t.Logf("failed to close responder: %v", err) + } + if err := conn.Close(); err != nil { + t.Logf("failed to close connection: %v", err) + } + }() + + // Test that responder was created successfully + if resp == nil { + t.Fatal("responder should not be nil") + } +} + +// TestResponder_Publish tests publishing a message to a queue. +func TestResponder_Publish(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + conn, resp := setupResponderTest(t) + defer func() { + if err := resp.Close(); err != nil { + t.Logf("failed to close responder: %v", err) + } + if err := conn.Close(); err != nil { + t.Logf("failed to close connection: %v", err) + } + }() + + // Create a test queue + channel := rabbitmq.NewRabbitMQChannel(conn) + queueName := "test_publish_" + time.Now().Format("20060102150405") + queue, err := channel.QueueDeclare(queueName, false, true, false, false, nil) + if err != nil { + t.Fatalf("failed to declare queue: %v", err) + } + + // Publish a message + testMessage := "test message" + err = resp.Publish(queue.Name, amqp.Publishing{ + ContentType: "text/plain", + Body: []byte(testMessage), + }) + if err != nil { + t.Fatalf("failed to publish message: %v", err) + } + + // Verify message was published by consuming it + msgs, err := channel.Consume(queue.Name, "", true, false, false, false, nil) + if err != nil { + t.Fatalf("failed to consume messages: %v", err) + } + + select { + case msg := <-msgs: + if string(msg.Body) != testMessage { + t.Fatalf("expected message %s, got %s", testMessage, string(msg.Body)) + } + case <-time.After(5 * time.Second): + t.Fatal("timeout waiting for message") + } +} + +// TestResponder_PublishMultiple tests publishing multiple messages. +func TestResponder_PublishMultiple(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + conn, resp := setupResponderTest(t) + defer func() { + if err := resp.Close(); err != nil { + t.Logf("failed to close responder: %v", err) + } + if err := conn.Close(); err != nil { + t.Logf("failed to close connection: %v", err) + } + }() + + // Create a test queue + channel := rabbitmq.NewRabbitMQChannel(conn) + queueName := "test_publish_multi_" + time.Now().Format("20060102150405") + queue, err := channel.QueueDeclare(queueName, false, true, false, false, nil) + if err != nil { + t.Fatalf("failed to declare queue: %v", err) + } + + // Publish multiple messages + messageCount := 10 + for i := 0; i < messageCount; i++ { + err = resp.Publish(queue.Name, amqp.Publishing{ + ContentType: "text/plain", + Body: []byte("Message " + string(rune(i+'0'))), + }) + if err != nil { + t.Fatalf("failed to publish message %d: %v", i, err) + } + } + + // Verify all messages were published + msgs, err := channel.Consume(queue.Name, "", true, false, false, false, nil) + if err != nil { + t.Fatalf("failed to consume messages: %v", err) + } + + received := 0 + timeout := time.After(10 * time.Second) + + for received < messageCount { + select { + case <-msgs: + received++ + case <-timeout: + t.Fatalf("timeout waiting for messages, received %d out of %d", received, messageCount) + } + } +} + +// TestResponder_PublishErrorToResponseQueue tests publishing error messages. +func TestResponder_PublishErrorToResponseQueue(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + conn, resp := setupResponderTest(t) + defer func() { + if err := resp.Close(); err != nil { + t.Logf("failed to close responder: %v", err) + } + if err := conn.Close(); err != nil { + t.Logf("failed to close connection: %v", err) + } + }() + + // Create a test response queue + channel := rabbitmq.NewRabbitMQChannel(conn) + queueName := "test_error_response_" + time.Now().Format("20060102150405") + queue, err := channel.QueueDeclare(queueName, false, true, false, false, nil) + if err != nil { + t.Fatalf("failed to declare queue: %v", err) + } + + // Publish an error message + messageType := constants.QueueMessageTypeTask + messageID := "test-message-id" + testError := errors.ErrInvalidLanguageType + + resp.PublishErrorToResponseQueue(messageType, messageID, queue.Name, testError) + + // Consume and verify the error message + msgs, err := channel.Consume(queue.Name, "", true, false, false, false, nil) + if err != nil { + t.Fatalf("failed to consume messages: %v", err) + } + + select { + case msg := <-msgs: + var responseMsg messages.ResponseQueueMessage + if err := json.Unmarshal(msg.Body, &responseMsg); err != nil { + t.Fatalf("failed to unmarshal response message: %v", err) + } + + if responseMsg.Type != messageType { + t.Fatalf("expected message type %s, got %s", messageType, responseMsg.Type) + } + if responseMsg.MessageID != messageID { + t.Fatalf("expected message id %s, got %s", messageID, responseMsg.MessageID) + } + if responseMsg.Ok { + t.Fatal("expected Ok to be false for error message") + } + + var errorPayload map[string]string + if err := json.Unmarshal(responseMsg.Payload, &errorPayload); err != nil { + t.Fatalf("failed to unmarshal error payload: %v", err) + } + + if errorPayload["error"] != testError.Error() { + t.Fatalf("expected error %s, got %s", testError.Error(), errorPayload["error"]) + } + + case <-time.After(5 * time.Second): + t.Fatal("timeout waiting for error message") + } +} + +// TestResponder_PublishSuccessHandshakeRespond tests publishing handshake response. +func TestResponder_PublishSuccessHandshakeRespond(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + conn, resp := setupResponderTest(t) + defer func() { + if err := resp.Close(); err != nil { + t.Logf("failed to close responder: %v", err) + } + if err := conn.Close(); err != nil { + t.Logf("failed to close connection: %v", err) + } + }() + + // Create a test response queue + channel := rabbitmq.NewRabbitMQChannel(conn) + queueName := "test_handshake_" + time.Now().Format("20060102150405") + queue, err := channel.QueueDeclare(queueName, false, true, false, false, nil) + if err != nil { + t.Fatalf("failed to declare queue: %v", err) + } + + // Create test language specs + languageSpecs := []languages.LanguageSpec{ + {LanguageName: "cpp", Versions: []string{"17"}, Extension: "cpp"}, + {LanguageName: "python", Versions: []string{"3.11"}, Extension: "py"}, + } + + messageType := constants.QueueMessageTypeHandshake + messageID := "handshake-message-id" + + // Publish handshake response + err = resp.PublishSuccessHandshakeRespond(messageType, messageID, queue.Name, languageSpecs) + if err != nil { + t.Fatalf("failed to publish handshake response: %v", err) + } + + // Consume and verify the response + msgs, err := channel.Consume(queue.Name, "", true, false, false, false, nil) + if err != nil { + t.Fatalf("failed to consume messages: %v", err) + } + + select { + case msg := <-msgs: + var responseMsg messages.ResponseQueueMessage + if err := json.Unmarshal(msg.Body, &responseMsg); err != nil { + t.Fatalf("failed to unmarshal response message: %v", err) + } + + if responseMsg.Type != messageType { + t.Fatalf("expected message type %s, got %s", messageType, responseMsg.Type) + } + if responseMsg.MessageID != messageID { + t.Fatalf("expected message id %s, got %s", messageID, responseMsg.MessageID) + } + if !responseMsg.Ok { + t.Fatal("expected Ok to be true for success message") + } + + var handshakePayload struct { + Languages []languages.LanguageSpec `json:"languages"` + } + if err := json.Unmarshal(responseMsg.Payload, &handshakePayload); err != nil { + t.Fatalf("failed to unmarshal handshake payload: %v", err) + } + + if len(handshakePayload.Languages) != len(languageSpecs) { + t.Fatalf("expected %d languages, got %d", len(languageSpecs), len(handshakePayload.Languages)) + } + + case <-time.After(5 * time.Second): + t.Fatal("timeout waiting for handshake response") + } +} + +// TestResponder_PublishSuccessStatusRespond tests publishing status response. +func TestResponder_PublishSuccessStatusRespond(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + conn, resp := setupResponderTest(t) + defer func() { + if err := resp.Close(); err != nil { + t.Logf("failed to close responder: %v", err) + } + if err := conn.Close(); err != nil { + t.Logf("failed to close connection: %v", err) + } + }() + + // Create a test response queue + channel := rabbitmq.NewRabbitMQChannel(conn) + queueName := "test_status_" + time.Now().Format("20060102150405") + queue, err := channel.QueueDeclare(queueName, false, true, false, false, nil) + if err != nil { + t.Fatalf("failed to declare queue: %v", err) + } + + // Create test status + status := messages.ResponseWorkerStatusPayload{ + BusyWorkers: 2, + TotalWorkers: 5, + WorkerStatus: []messages.WorkerStatus{ + {WorkerID: 1, Status: constants.WorkerStatusBusy, ProcessingMessageID: "msg-1"}, + {WorkerID: 2, Status: constants.WorkerStatusIdle, ProcessingMessageID: ""}, + }, + } + + messageType := constants.QueueMessageTypeStatus + messageID := "status-message-id" + + // Publish status response + err = resp.PublishSuccessStatusRespond(messageType, messageID, queue.Name, status) + if err != nil { + t.Fatalf("failed to publish status response: %v", err) + } + + // Consume and verify the response + msgs, err := channel.Consume(queue.Name, "", true, false, false, false, nil) + if err != nil { + t.Fatalf("failed to consume messages: %v", err) + } + + select { + case msg := <-msgs: + var responseMsg messages.ResponseQueueMessage + if err := json.Unmarshal(msg.Body, &responseMsg); err != nil { + t.Fatalf("failed to unmarshal response message: %v", err) + } + + if responseMsg.Type != messageType { + t.Fatalf("expected message type %s, got %s", messageType, responseMsg.Type) + } + if responseMsg.MessageID != messageID { + t.Fatalf("expected message id %s, got %s", messageID, responseMsg.MessageID) + } + if !responseMsg.Ok { + t.Fatal("expected Ok to be true for success message") + } + + var statusPayload messages.ResponseWorkerStatusPayload + if err := json.Unmarshal(responseMsg.Payload, &statusPayload); err != nil { + t.Fatalf("failed to unmarshal status payload: %v", err) + } + + if statusPayload.BusyWorkers != status.BusyWorkers { + t.Fatalf("expected busy workers %d, got %d", status.BusyWorkers, statusPayload.BusyWorkers) + } + if statusPayload.TotalWorkers != status.TotalWorkers { + t.Fatalf("expected total workers %d, got %d", status.TotalWorkers, statusPayload.TotalWorkers) + } + + case <-time.After(5 * time.Second): + t.Fatal("timeout waiting for status response") + } +} + +// TestResponder_PublishPayloadTaskRespond tests publishing task result response. +func TestResponder_PublishPayloadTaskRespond(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + conn, resp := setupResponderTest(t) + defer func() { + if err := resp.Close(); err != nil { + t.Logf("failed to close responder: %v", err) + } + if err := conn.Close(); err != nil { + t.Logf("failed to close connection: %v", err) + } + }() + + // Create a test response queue + channel := rabbitmq.NewRabbitMQChannel(conn) + queueName := "test_task_result_" + time.Now().Format("20060102150405") + queue, err := channel.QueueDeclare(queueName, false, true, false, false, nil) + if err != nil { + t.Fatalf("failed to declare queue: %v", err) + } + + // Create test result + taskResult := solution.Result{ + StatusCode: solution.Success, + Message: constants.SolutionMessageSuccess, + TestResults: []solution.TestResult{ + { + Order: 1, + StatusCode: solution.TestCasePassed, + Passed: true, + ExecutionTime: 100.5, + ErrorMessage: "", + }, + }, + } + + messageType := constants.QueueMessageTypeTask + messageID := "task-result-message-id" + + // Publish task result response + err = resp.PublishPayloadTaskRespond(messageType, messageID, queue.Name, taskResult) + if err != nil { + t.Fatalf("failed to publish task result response: %v", err) + } + + // Consume and verify the response + msgs, err := channel.Consume(queue.Name, "", true, false, false, false, nil) + if err != nil { + t.Fatalf("failed to consume messages: %v", err) + } + + select { + case msg := <-msgs: + var responseMsg messages.ResponseQueueMessage + if err := json.Unmarshal(msg.Body, &responseMsg); err != nil { + t.Fatalf("failed to unmarshal response message: %v", err) + } + + if responseMsg.Type != messageType { + t.Fatalf("expected message type %s, got %s", messageType, responseMsg.Type) + } + if responseMsg.MessageID != messageID { + t.Fatalf("expected message id %s, got %s", messageID, responseMsg.MessageID) + } + if !responseMsg.Ok { + t.Fatal("expected Ok to be true for success message") + } + + var resultPayload solution.Result + if err := json.Unmarshal(responseMsg.Payload, &resultPayload); err != nil { + t.Fatalf("failed to unmarshal result payload: %v", err) + } + + if resultPayload.Message != taskResult.Message { + t.Fatalf("expected message %s, got %s", taskResult.Message, resultPayload.Message) + } + if resultPayload.StatusCode != taskResult.StatusCode { + t.Fatalf("expected status code %d, got %d", taskResult.StatusCode, resultPayload.StatusCode) + } + + case <-time.After(5 * time.Second): + t.Fatal("timeout waiting for task result response") + } +} + +// TestResponder_Close tests closing the responder. +func TestResponder_Close(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + conn, resp := setupResponderTest(t) + defer func() { + if err := conn.Close(); err != nil { + t.Logf("failed to close connection: %v", err) + } + }() + + // Close the responder + err := resp.Close() + if err != nil { + t.Fatalf("failed to close responder: %v", err) + } + + // Try to close again - should return error + err = resp.Close() + if err != errors.ErrResponderClosed { + t.Fatalf("expected ErrResponderClosed, got %v", err) + } +} + +// TestResponder_PublishAfterClose tests publishing after closing responder. +func TestResponder_PublishAfterClose(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + conn, resp := setupResponderTest(t) + defer func() { + if err := conn.Close(); err != nil { + t.Logf("failed to close connection: %v", err) + } + }() + + // Close the responder + err := resp.Close() + if err != nil { + t.Fatalf("failed to close responder: %v", err) + } + + // Try to publish after closing + err = resp.Publish("test_queue", amqp.Publishing{ + ContentType: "text/plain", + Body: []byte("test"), + }) + + if err != errors.ErrResponderClosed { + t.Fatalf("expected ErrResponderClosed, got %v", err) + } +} + +// TestResponder_ConcurrentPublish tests concurrent publishing. +func TestResponder_ConcurrentPublish(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + conn, resp := setupResponderTest(t) + defer func() { + if err := resp.Close(); err != nil { + t.Logf("failed to close responder: %v", err) + } + if err := conn.Close(); err != nil { + t.Logf("failed to close connection: %v", err) + } + }() + + // Create a test queue + channel := rabbitmq.NewRabbitMQChannel(conn) + queueName := "test_concurrent_" + time.Now().Format("20060102150405") + queue, err := channel.QueueDeclare(queueName, false, true, false, false, nil) + if err != nil { + t.Fatalf("failed to declare queue: %v", err) + } + + // Publish messages concurrently + goroutineCount := 10 + messagesPerGoroutine := 5 + totalMessages := goroutineCount * messagesPerGoroutine + + done := make(chan bool, goroutineCount) + for i := 0; i < goroutineCount; i++ { + go func(id int) { + for j := 0; j < messagesPerGoroutine; j++ { + err := resp.Publish(queue.Name, amqp.Publishing{ + ContentType: "text/plain", + Body: []byte("Message from goroutine " + string(rune(id+'0'))), + }) + if err != nil { + t.Errorf("failed to publish message: %v", err) + } + } + done <- true + }(i) + } + + // Wait for all goroutines to finish + for i := 0; i < goroutineCount; i++ { + <-done + } + + // Verify all messages were published + msgs, err := channel.Consume(queue.Name, "", true, false, false, false, nil) + if err != nil { + t.Fatalf("failed to consume messages: %v", err) + } + + received := 0 + timeout := time.After(15 * time.Second) + + for received < totalMessages { + select { + case <-msgs: + received++ + case <-timeout: + t.Fatalf("timeout waiting for messages, received %d out of %d", received, totalMessages) + } + } + + if received != totalMessages { + t.Fatalf("expected %d messages, got %d", totalMessages, received) + } +} diff --git a/run_integration_tests.sh b/run_integration_tests.sh index cff8607..5737e6b 100755 --- a/run_integration_tests.sh +++ b/run_integration_tests.sh @@ -4,6 +4,7 @@ set -e TEST_SUITES=( "./internal/docker/..." + "./internal/rabbitmq/..." ) SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" @@ -37,9 +38,32 @@ fi echo "[OK] Docker daemon is accessible" echo "" +# Start RabbitMQ container +echo "[*] Starting RabbitMQ container..." +cd "$SCRIPT_DIR" + +docker-compose -f docker-compose.test.yml up -d rabbitmq + +# Wait for RabbitMQ to be ready +echo "[*] Waiting for RabbitMQ to be ready..." +max_attempts=30 +attempt=0 +until docker exec rabbitmq rabbitmqctl status &>/dev/null; do + attempt=$((attempt + 1)) + if [ $attempt -ge $max_attempts ]; then + echo "[ERROR] RabbitMQ failed to start after $max_attempts attempts" + docker-compose -f docker-compose.test.yml down + exit 1 + fi + echo "Waiting for RabbitMQ... (attempt $attempt/$max_attempts)" + sleep 1 +done + +echo "[OK] RabbitMQ is ready" +echo "" + # Run the tests echo "[*] Running integration tests..." -cd "$SCRIPT_DIR" TEST_FLAGS="-v -tags=integration -timeout 5m" @@ -72,4 +96,9 @@ else echo "[ERROR] Tests failed with exit code $EXIT_CODE" fi +# Cleanup: Stop RabbitMQ container +echo "" +echo "[*] Stopping RabbitMQ container..." +docker-compose -f docker-compose.test.yml down + exit $EXIT_CODE From 1db39e0258313018c3afc88ad4b19e1ead5947c5 Mon Sep 17 00:00:00 2001 From: Matios102 Date: Sun, 11 Jan 2026 11:12:52 +0100 Subject: [PATCH 5/5] feat: add RabbitMQ service configuration to publish coverage workflow --- .github/workflows/publish-coverage.yaml | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/.github/workflows/publish-coverage.yaml b/.github/workflows/publish-coverage.yaml index bd77573..1e77a19 100644 --- a/.github/workflows/publish-coverage.yaml +++ b/.github/workflows/publish-coverage.yaml @@ -21,6 +21,16 @@ jobs: --health-retries=5 volumes: - /var/run/docker.sock:/var/run/docker.sock + rabbitmq: + image: rabbitmq:3.13-management + ports: + - 5672:5672 + - 15672:15672 + options: >- + --health-cmd="rabbitmq-diagnostics -q ping" + --health-interval=10s + --health-timeout=5s + --health-retries=5 steps: - name: Checkout uses: actions/checkout@v4