From 16cfb77088b2af41c97bfc8568e4dcb10d9952e0 Mon Sep 17 00:00:00 2001 From: Akihiro Suda Date: Wed, 1 Oct 2025 18:45:57 +0900 Subject: [PATCH] guestagent: k8s: use kubectl instead of client-go Part of issue 3237 TODO: drop dependency on k8s.io/api Signed-off-by: Akihiro Suda --- go.mod | 14 -- go.sum | 32 --- .../kubernetesservice/kubernetesservice.go | 210 ++++++++++++------ .../kubernetesservice_test.go | 61 ++--- 4 files changed, 175 insertions(+), 142 deletions(-) diff --git a/go.mod b/go.mod index 07c4ba3f569..d4bd59b9844 100644 --- a/go.mod +++ b/go.mod @@ -55,7 +55,6 @@ require ( gotest.tools/v3 v3.5.2 k8s.io/api v0.34.2 k8s.io/apimachinery v0.34.2 - k8s.io/client-go v0.34.2 ) require ( @@ -72,26 +71,19 @@ require ( github.com/dimchansky/utfbom v1.1.1 // indirect github.com/djherbis/times v1.6.0 // indirect github.com/elliotchance/orderedmap v1.8.0 // indirect - github.com/emicklei/go-restful/v3 v3.12.2 // indirect github.com/fatih/color v1.18.0 // indirect // gomodjail:unconfined github.com/fsnotify/fsnotify v1.8.0 // indirect github.com/fxamacker/cbor/v2 v2.9.0 // indirect github.com/go-logr/logr v1.4.3 // indirect - github.com/go-openapi/jsonpointer v0.21.0 // indirect - github.com/go-openapi/jsonreference v0.21.0 // indirect - github.com/go-openapi/swag v0.23.0 // indirect github.com/goccy/go-json v0.10.5 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/google/btree v1.1.3 // indirect - github.com/google/gnostic-models v0.7.0 // indirect github.com/google/gopacket v1.1.19 // indirect - github.com/google/uuid v1.6.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect // gomodjail:unconfined github.com/insomniacslk/dhcp v0.0.0-20240710054256-ddd8a41251c9 // indirect github.com/jinzhu/copier v0.4.0 // indirect - github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect github.com/kr/fs v0.1.0 // indirect @@ -105,7 +97,6 @@ require ( github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee // indirect - github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/pelletier/go-toml/v2 v2.2.4 // indirect github.com/pierrec/lz4/v4 v4.1.22 // indirect github.com/pkg/errors v0.9.1 // indirect @@ -124,25 +115,20 @@ require ( golang.org/x/time v0.9.0 // indirect golang.org/x/tools v0.38.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20251022142026-3a174f9686a8 // indirect - gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect // gomodjail:unconfined gvisor.dev/gvisor v0.0.0-20240916094835-a174eb65023f // indirect k8s.io/klog/v2 v2.130.1 // indirect - k8s.io/kube-openapi v0.0.0-20250710124328-f3f2b991d03b // indirect k8s.io/utils v0.0.0-20250604170112-4c0f3b243397 // indirect sigs.k8s.io/json v0.0.0-20241014173422-cfa47c3a1cc8 // indirect - sigs.k8s.io/yaml v1.6.0 // indirect ) require ( github.com/go-ini/ini v1.67.0 // indirect github.com/google/jsonschema-go v0.3.0 // indirect - github.com/pmezard/go-difflib v1.0.0 // indirect go.yaml.in/yaml/v2 v2.4.2 // indirect - go.yaml.in/yaml/v3 v3.0.4 // indirect go.yaml.in/yaml/v4 v4.0.0-rc.3 // indirect sigs.k8s.io/randfill v1.0.0 // indirect sigs.k8s.io/structured-merge-diff/v6 v6.3.0 // indirect diff --git a/go.sum b/go.sum index 4ece814744e..d08e977b258 100644 --- a/go.sum +++ b/go.sum @@ -73,8 +73,6 @@ github.com/elliotchance/orderedmap v1.8.0 h1:TrOREecvh3JbS+NCgwposXG5ZTFHtEsQiCG github.com/elliotchance/orderedmap v1.8.0/go.mod h1:wsDwEaX5jEoyhbs7x93zk2H/qv0zwuhg4inXhDkYqys= github.com/elliotwutingfeng/asciiset v0.0.0-20230602022725-51bbb787efab h1:h1UgjJdAAhj+uPL68n7XASS6bU+07ZX1WJvVS2eyoeY= github.com/elliotwutingfeng/asciiset v0.0.0-20230602022725-51bbb787efab/go.mod h1:GLo/8fDswSAniFG+BFIaiSPcK610jyzgEhWYPQwuQdw= -github.com/emicklei/go-restful/v3 v3.12.2 h1:DhwDP0vY3k8ZzE0RunuJy8GhNpPL6zqLkDf9B/a0/xU= -github.com/emicklei/go-restful/v3 v3.12.2/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM= github.com/fatih/color v1.18.0/go.mod h1:4FelSpRwEGDpQ12mAdzqdOukCy4u8WUtOY6lkT/6HfU= github.com/foxcpp/go-mockdns v1.1.0 h1:jI0rD8M0wuYAxL7r/ynTrCQQq0BVqfB99Vgk7DlmewI= @@ -90,16 +88,8 @@ github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= -github.com/go-openapi/jsonpointer v0.21.0 h1:YgdVicSA9vH5RiHs9TZW5oyafXZFc6+2Vc1rr/O9oNQ= -github.com/go-openapi/jsonpointer v0.21.0/go.mod h1:IUyH9l/+uyhIYQ/PXVA41Rexl+kOkAPDdXEYns6fzUY= -github.com/go-openapi/jsonreference v0.21.0 h1:Rs+Y7hSXT83Jacb7kFyjn4ijOuVGSvOdF2+tg1TRrwQ= -github.com/go-openapi/jsonreference v0.21.0/go.mod h1:LmZmgsrTkVg9LG4EaHeY8cBDslNPMo06cago5JNLkm4= -github.com/go-openapi/swag v0.23.0 h1:vsEVJDUo2hPJ2tu0/Xc+4noaxyEffXNIs3cOULZ+GrE= -github.com/go-openapi/swag v0.23.0/go.mod h1:esZ8ITTYEsH1V2trKHjAN8Ai7xHb8RV+YSZ577vPjgQ= github.com/go-quicktest/qt v1.101.1-0.20240301121107-c6c8733fa1e6 h1:teYtXy9B7y5lHTp8V9KPxpYRAVA7dozigQcMiBust1s= github.com/go-quicktest/qt v1.101.1-0.20240301121107-c6c8733fa1e6/go.mod h1:p4lGIVX+8Wa6ZPNDvqcxq36XpUDLh42FLetFU7odllI= -github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= -github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= github.com/go-test/deep v1.0.8 h1:TDsG77qcSprGbC6vTN8OuXp5g+J+b5Pcguhf7Zt61VM= github.com/go-test/deep v1.0.8/go.mod h1:5C2ZWiW0ErCdrYzpqxLbTX7MG14M9iiw8DgHncVwcsE= github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4= @@ -112,8 +102,6 @@ github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/google/btree v1.1.3 h1:CVpQJjYgC4VbzxeGVHfvZrv1ctoYCAI8vbl07Fcxlyg= github.com/google/btree v1.1.3/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4= -github.com/google/gnostic-models v0.7.0 h1:qwTtogB15McXDaNqTZdzPJRHvaVJlAl+HVQnLmJEJxo= -github.com/google/gnostic-models v0.7.0/go.mod h1:whL5G0m6dmc5cPxKc5bdKdEN3UjI7OUGxBlw57miDrQ= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= @@ -121,8 +109,6 @@ github.com/google/gopacket v1.1.19 h1:ves8RnFZPGiFnTS0uPQStjwru6uO6h+nlr9j6fL7kF github.com/google/gopacket v1.1.19/go.mod h1:iJ8V8n6KS+z2U1A8pUwu8bW5SyEMkXJB8Yo/Vo+TKTo= github.com/google/jsonschema-go v0.3.0 h1:6AH2TxVNtk3IlvkkhjrtbUc4S8AvO0Xii0DxIygDg+Q= github.com/google/jsonschema-go v0.3.0/go.mod h1:r5quNTdLOYEz95Ru18zA0ydNbBuYoo9tgaYcxEYhJVE= -github.com/google/pprof v0.0.0-20241029153458-d1b30febd7db h1:097atOisP2aRj7vFgYQBbFN4U4JNXUNYpxael3UzMyo= -github.com/google/pprof v0.0.0-20241029153458-d1b30febd7db/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144= github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaUGG7oYTSPP8MxqL4YI3kZKwcP4= github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= @@ -141,8 +127,6 @@ github.com/invopop/jsonschema v0.13.0 h1:KvpoAJWEjR3uD9Kbm2HWJmqsEaHt8lBUpd0qHcI github.com/invopop/jsonschema v0.13.0/go.mod h1:ffZ5Km5SWWRAIN6wbDXItl95euhFz2uON45H2qjYt+0= github.com/jinzhu/copier v0.4.0 h1:w3ciUoD19shMCRargcpm0cm91ytaBhDvuRpz1ODO/U8= github.com/jinzhu/copier v0.4.0/go.mod h1:DfbEm0FYsaqBcKcFuvmOZb218JkPGtvSHsKg8S8hyyg= -github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= -github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/josharian/native v1.1.0 h1:uuaP0hAbW7Y4l0ZRQ6C9zfb7Mg1mbFKry/xzDAfmtLA= github.com/josharian/native v1.1.0/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w= github.com/jsimonetti/rtnetlink v1.3.5 h1:hVlNQNRlLDGZz31gBPicsG7Q53rnlsz1l1Ix/9XlpVA= @@ -207,14 +191,10 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJ github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee h1:W5t00kpgFdJifH4BDsTlE89Zl93FEloxaWZfGcifgq8= github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= -github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= -github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/nxadm/tail v1.4.11 h1:8feyoE3OzPrcshW5/MJ4sGESc5cqmGkGCWlco4l0bqY= github.com/nxadm/tail v1.4.11/go.mod h1:OTaG3NK980DZzxbRq6lEuzgU+mug70nY11sMd4JXXHc= github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= -github.com/onsi/ginkgo/v2 v2.21.0 h1:7rg/4f3rB88pb5obDgNZrNHrQ4e6WpjonchcpuBRnZM= -github.com/onsi/ginkgo/v2 v2.21.0/go.mod h1:7Du3c42kxCUegi0IImZ1wUQzMBVecgIHjR1C+NkhLQo= github.com/onsi/gomega v1.38.0 h1:c/WX+w8SLAinvuKKQFh77WEucCnPk4j2OTUr7lt7BeY= github.com/onsi/gomega v1.38.0/go.mod h1:OcXcwId0b9QsE7Y49u+BTrL4IdKOBOKnD6VQNTJEB6o= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= @@ -258,8 +238,6 @@ github.com/spf13/pflag v1.0.9/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An github.com/spf13/pflag v1.0.10 h1:4EBh2KAYBwaONj6b2Ye1GiHfwjqyROoF4RwYO+vPwFk= github.com/spf13/pflag v1.0.10/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= -github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= @@ -293,12 +271,8 @@ go.opentelemetry.io/otel/sdk/metric v1.38.0 h1:aSH66iL0aZqo//xXzQLYozmWrXxyFkBJ6 go.opentelemetry.io/otel/sdk/metric v1.38.0/go.mod h1:dg9PBnW9XdQ1Hd6ZnRz689CbtrUp0wMMs9iPcgT9EZA= go.opentelemetry.io/otel/trace v1.38.0 h1:Fxk5bKrDZJUH+AMyyIXGcFAPah0oRcT+LuNtJrmcNLE= go.opentelemetry.io/otel/trace v1.38.0/go.mod h1:j1P9ivuFsTceSWe1oY+EeW3sc+Pp42sO++GHkg4wwhs= -go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= -go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI= go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU= -go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc= -go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg= go.yaml.in/yaml/v4 v4.0.0-rc.3 h1:3h1fjsh1CTAPjW7q/EMe+C8shx5d8ctzZTrLcs/j8Go= go.yaml.in/yaml/v4 v4.0.0-rc.3/go.mod h1:aZqd9kCMsGL7AuUv/m/PvWLdg5sjJsZ4oHDEnfPPfY0= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= @@ -418,8 +392,6 @@ google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= -gopkg.in/evanphx/json-patch.v4 v4.12.0 h1:n6jtcsulIzXPJaxegRbvFNNrZDjbij7ny3gmSPG+6V4= -gopkg.in/evanphx/json-patch.v4 v4.12.0/go.mod h1:p8EYWUEYMpynmqDbY58zCKCFZw8pRWMG4EsWvDvM72M= gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/op/go-logging.v1 v1.0.0-20160211212156-b2cb9fa56473 h1:6D+BvnJ/j6e222UW8s2qTSe3wGBtvo0MbVQG/c5k8RE= @@ -437,12 +409,8 @@ k8s.io/api v0.34.2 h1:fsSUNZhV+bnL6Aqrp6O7lMTy6o5x2C4XLjnh//8SLYY= k8s.io/api v0.34.2/go.mod h1:MMBPaWlED2a8w4RSeanD76f7opUoypY8TFYkSM+3XHw= k8s.io/apimachinery v0.34.2 h1:zQ12Uk3eMHPxrsbUJgNF8bTauTVR2WgqJsTmwTE/NW4= k8s.io/apimachinery v0.34.2/go.mod h1:/GwIlEcWuTX9zKIg2mbw0LRFIsXwrfoVxn+ef0X13lw= -k8s.io/client-go v0.34.2 h1:Co6XiknN+uUZqiddlfAjT68184/37PS4QAzYvQvDR8M= -k8s.io/client-go v0.34.2/go.mod h1:2VYDl1XXJsdcAxw7BenFslRQX28Dxz91U9MWKjX97fE= k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= -k8s.io/kube-openapi v0.0.0-20250710124328-f3f2b991d03b h1:MloQ9/bdJyIu9lb1PzujOPolHyvO06MXG5TUIj2mNAA= -k8s.io/kube-openapi v0.0.0-20250710124328-f3f2b991d03b/go.mod h1:UZ2yyWbFTpuhSbFhv24aGNOdoRdJZgsIObGBUaYVsts= k8s.io/utils v0.0.0-20250604170112-4c0f3b243397 h1:hwvWFiBzdWw1FhfY1FooPn3kzWuJ8tmbZBHi4zVsl1Y= k8s.io/utils v0.0.0-20250604170112-4c0f3b243397/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= sigs.k8s.io/json v0.0.0-20241014173422-cfa47c3a1cc8 h1:gBQPwqORJ8d8/YNZWEjoZs7npUVDpVXUUOFfW6CgAqE= diff --git a/pkg/guestagent/kubernetesservice/kubernetesservice.go b/pkg/guestagent/kubernetesservice/kubernetesservice.go index 133174af634..9f2e322d23c 100644 --- a/pkg/guestagent/kubernetesservice/kubernetesservice.go +++ b/pkg/guestagent/kubernetesservice/kubernetesservice.go @@ -4,23 +4,24 @@ package kubernetesservice import ( + "bufio" + "bytes" "context" + "encoding/json" "errors" "fmt" + "io" "net" - "net/url" "os" + "os/exec" "strings" "sync" "time" + "github.com/docker/go-units" "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/informers" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/cache" - "k8s.io/client-go/tools/clientcmd" + "k8s.io/apimachinery/pkg/watch" ) type Protocol string @@ -37,114 +38,185 @@ type Entry struct { } type ServiceWatcher struct { - rwMutex sync.RWMutex - serviceInformer cache.SharedIndexInformer + rwMutex sync.RWMutex + // key: namespace/name + serviceSpecs map[string]*corev1.ServiceSpec } func NewServiceWatcher() *ServiceWatcher { - return &ServiceWatcher{} -} - -func (s *ServiceWatcher) setServiceInformer(serviceInformer cache.SharedIndexInformer) { - s.rwMutex.Lock() - defer s.rwMutex.Unlock() - s.serviceInformer = serviceInformer -} - -func (s *ServiceWatcher) getServiceInformer() cache.SharedIndexInformer { - s.rwMutex.RLock() - defer s.rwMutex.RUnlock() - return s.serviceInformer + return &ServiceWatcher{serviceSpecs: make(map[string]*corev1.ServiceSpec)} } func (s *ServiceWatcher) Start(ctx context.Context) { logrus.Info("Monitoring kubernetes services") + go s.loopAttemptToStartKubectl(ctx) +} + +func (s *ServiceWatcher) loopAttemptToStartKubectl(ctx context.Context) { const retryInterval = 10 * time.Second - const pollImmediately = false - _ = wait.PollUntilContextCancel(ctx, retryInterval, pollImmediately, func(ctx context.Context) (done bool, err error) { - kubeClient, err := tryGetKubeClient() - if err != nil { - logrus.Tracef("failed to get kube client: %v, will retry in %v", err, retryInterval) - return false, nil + + for i := 0; ; i++ { + // The first iteration does not need to wait for retryInterval. + if i > 0 { + select { + case <-ctx.Done(): + return + case <-time.After(retryInterval): + } } + s.attemptToStartKubectl(ctx) + } +} - informerFactory := informers.NewSharedInformerFactory(kubeClient, time.Hour) - serviceInformer := informerFactory.Core().V1().Services().Informer() - informerFactory.Start(ctx.Done()) - cache.WaitForCacheSync(ctx.Done(), serviceInformer.HasSynced) +func (s *ServiceWatcher) attemptToStartKubectl(ctx context.Context) { + kubectl, err := exec.LookPath("kubectl") + if err != nil { + logrus.WithError(err).Debugf("kubectl not available; will retry") + return + } + kubeconfig := chooseKubeconfig() + // TODO: ensure that kubeconfig points to a local cluster + if err := canGetServices(ctx, kubectl, kubeconfig); err != nil { + logrus.WithError(err).Debugf("kubectl auth can-i ... failed; will retry") + return + } - s.setServiceInformer(serviceInformer) - return true, nil - }) + cmd := exec.CommandContext(ctx, kubectl, + "get", "--all-namespaces", "service", "--watch", "--output-watch-events", "--output", "json") + if kubeconfig != "" { + cmd.Env = append(os.Environ(), "KUBECONFIG="+kubeconfig) + } + if err := s.startAndStreamKubectl(cmd); err != nil { + logrus.WithError(err).Warn("kubectl watch failed; will retry") + } } -func tryGetKubeClient() (kubernetes.Interface, error) { +func chooseKubeconfig() string { + if kubeconfig := os.Getenv("KUBECONFIG"); kubeconfig != "" { + return kubeconfig + } candidateKubeConfigs := []string{ "/etc/rancher/k3s/k3s.yaml", "/root/.kube/config.localhost", // Created by template:k8s "/root/.kube/config", } + for _, kc := range candidateKubeConfigs { + if _, err := os.Stat(kc); !errors.Is(err, os.ErrNotExist) { + return kc + } + } + return "" +} - for _, kubeconfig := range candidateKubeConfigs { - _, err := os.Stat(kubeconfig) - if err != nil { - if os.IsNotExist(err) { - continue - } +func canGetServices(ctx context.Context, kubectl, kubeconfig string) error { + cmd := exec.CommandContext(ctx, kubectl, "auth", "can-i", "get", "service") + if kubeconfig != "" { + cmd.Env = append(os.Environ(), "KUBECONFIG="+kubeconfig) + } + var stdout, stderr bytes.Buffer + cmd.Stdout = &stdout + cmd.Stderr = &stderr + if err := cmd.Run(); err != nil { + return fmt.Errorf("failed to run %v: %w; stdout=%q, stderr=%q", cmd.Args, err, stdout.String(), stderr.String()) + } + if strings.TrimSpace(stdout.String()) != "yes" { + return fmt.Errorf("failed to run %v: expected \"yes\", got %q", cmd.Args, stdout.String()) + } + return nil +} - return nil, fmt.Errorf("stat kubeconfig %s failed: %w", kubeconfig, err) - } +func (s *ServiceWatcher) startAndStreamKubectl(cmd *exec.Cmd) error { + stdout, err := cmd.StdoutPipe() + if err != nil { + return err + } + + var stderr bytes.Buffer + cmd.Stderr = &stderr - restConfig, err := clientcmd.BuildConfigFromFlags("", kubeconfig) - if err != nil { - return nil, fmt.Errorf("build kubeconfig from %s failed: %w", kubeconfig, err) + if err := cmd.Start(); err != nil { + return fmt.Errorf("failed to run %v: %w; stderr=%q", cmd.Args, err, stderr.String()) + } + + readErr := s.readKubectlStream(stdout) + waitErr := cmd.Wait() + if waitErr != nil { + waitErr = fmt.Errorf("failed to run %v: %w; stderr=%q", cmd.Args, waitErr, stderr.String()) + } + return errors.Join(readErr, waitErr) +} + +// readKubectlStream reads kubectl JSON watch events from r and updates the internal +// services map. The stream is newline-delimited JSON objects representing "WatchEvent". +func (s *ServiceWatcher) readKubectlStream(r io.Reader) error { + scanner := bufio.NewScanner(r) + // increase buffer in case of large JSON objects + const maxBuf = 10 * units.MiB + buf := make([]byte, 0, 64*units.KiB) + scanner.Buffer(buf, maxBuf) + + for scanner.Scan() { + line := scanner.Bytes() + line = bytes.TrimSpace(line) + if len(line) == 0 { + continue } - u, err := url.Parse(restConfig.Host) - if err != nil { - return nil, fmt.Errorf("parse kubeconfig host %s failed: %w", restConfig.Host, err) + + var ev struct { + Type watch.EventType `json:"type"` + Object json.RawMessage `json:"object"` } - if u.Hostname() != "127.0.0.1" { // might need to support IPv6 - // ensures the kubeconfig points to local k8s - continue + if err := json.Unmarshal(line, &ev); err != nil { + return fmt.Errorf("failed to unmarshal line %q: %w", string(line), err) } - kubeClient, err := kubernetes.NewForConfig(restConfig) - if err != nil { - return nil, err + var svc corev1.Service + if err := json.Unmarshal(ev.Object, &svc); err != nil { + return fmt.Errorf("failed to unmarshal service object: %w (line=%q)", err, line) } - return kubeClient, nil + key := svc.Namespace + "/" + svc.Name + s.rwMutex.Lock() + switch ev.Type { + case watch.Added, watch.Modified: + s.serviceSpecs[key] = &svc.Spec + case watch.Deleted: + delete(s.serviceSpecs, key) + default: + // NOP + } + s.rwMutex.Unlock() } - return nil, errors.New("no valid kubeconfig found") + if err := scanner.Err(); err != nil { + return fmt.Errorf("failed to scan kubectl event stream: %w", err) + } + return nil } func (s *ServiceWatcher) GetPorts() []Entry { - serviceInformer := s.getServiceInformer() - if serviceInformer == nil { - return nil - } + s.rwMutex.RLock() + defer s.rwMutex.RUnlock() var entries []Entry - for _, obj := range serviceInformer.GetStore().List() { - service := obj.(*corev1.Service) - if service.Spec.Type != corev1.ServiceTypeNodePort && - service.Spec.Type != corev1.ServiceTypeLoadBalancer { + for key, spec := range s.serviceSpecs { + if spec.Type != corev1.ServiceTypeNodePort && + spec.Type != corev1.ServiceTypeLoadBalancer { continue } - for _, portEntry := range service.Spec.Ports { + for _, portEntry := range spec.Ports { switch portEntry.Protocol { case corev1.ProtocolTCP, corev1.ProtocolUDP: // NOP default: - logrus.Debugf("unsupported protocol %s for service %s/%s, skipping", - portEntry.Protocol, service.Namespace, service.Name) + logrus.Debugf("unsupported protocol %s for service %q, skipping", + portEntry.Protocol, key) continue } var port int32 - switch service.Spec.Type { + switch spec.Type { case corev1.ServiceTypeNodePort: port = portEntry.NodePort case corev1.ServiceTypeLoadBalancer: diff --git a/pkg/guestagent/kubernetesservice/kubernetesservice_test.go b/pkg/guestagent/kubernetesservice/kubernetesservice_test.go index e8e970a576f..2ad3439fff5 100644 --- a/pkg/guestagent/kubernetesservice/kubernetesservice_test.go +++ b/pkg/guestagent/kubernetesservice/kubernetesservice_test.go @@ -4,39 +4,18 @@ package kubernetesservice import ( - "context" "net" + "strings" "testing" "gotest.tools/v3/assert" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" - "k8s.io/client-go/informers" - clientSet "k8s.io/client-go/kubernetes" - "k8s.io/client-go/kubernetes/fake" - "k8s.io/client-go/tools/cache" ) -func newFakeKubeClient() (clientSet.Interface, informers.SharedInformerFactory) { - kubeClient := fake.NewSimpleClientset() - informerFactory := informers.NewSharedInformerFactory(kubeClient, 0) - return kubeClient, informerFactory -} - func TestGetPorts(t *testing.T) { - ctx, cancel := context.WithCancel(t.Context()) - defer cancel() - serviceCreatedCh := make(chan struct{}, 1) - kubeClient, informerFactory := newFakeKubeClient() - serviceInformer := informerFactory.Core().V1().Services().Informer() - _, err := serviceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(any) { serviceCreatedCh <- struct{}{} }, - }) - assert.NilError(t, err) - informerFactory.Start(ctx.Done()) serviceWatcher := NewServiceWatcher() - serviceWatcher.setServiceInformer(serviceInformer) type testCase struct { name string @@ -121,15 +100,43 @@ func TestGetPorts(t *testing.T) { } for _, c := range cases { t.Run(c.name, func(t *testing.T) { - _, err := kubeClient.CoreV1().Services("default").Create(ctx, &c.service, metav1.CreateOptions{}) - assert.NilError(t, err, "failed to create service [%s]", c.service.Name) + const ns = "default" + service := c.service + service.Namespace = ns + key := ns + "/" + c.service.Name - <-serviceCreatedCh + serviceWatcher.rwMutex.Lock() + serviceWatcher.serviceSpecs[key] = &service.Spec + serviceWatcher.rwMutex.Unlock() got := serviceWatcher.GetPorts() assert.DeepEqual(t, got, c.want) - err = kubeClient.CoreV1().Services("default").Delete(ctx, c.service.Name, metav1.DeleteOptions{}) - assert.NilError(t, err, "failed to delete service [%s]", c.service.Name) + + serviceWatcher.rwMutex.Lock() + delete(serviceWatcher.serviceSpecs, key) + serviceWatcher.rwMutex.Unlock() }) } } + +func TestReadKubectlStream(t *testing.T) { + stream := `{"type":"ADDED","object":{"apiVersion":"v1","kind":"Service","metadata":{"creationTimestamp":"2025-09-30T13:39:19Z","labels":{"component":"apiserver","provider":"kubernetes"},"managedFields":[{"apiVersion":"v1","fieldsType":"FieldsV1","fieldsV1":{"f:metadata":{"f:labels":{".":{},"f:component":{},"f:provider":{}}},"f:spec":{"f:clusterIP":{},"f:internalTrafficPolicy":{},"f:ipFamilyPolicy":{},"f:ports":{".":{},"k:{\"port\":443,\"protocol\":\"TCP\"}":{".":{},"f:name":{},"f:port":{},"f:protocol":{},"f:targetPort":{}}},"f:sessionAffinity":{},"f:type":{}}},"manager":"kube-apiserver","operation":"Update","time":"2025-09-30T13:39:19Z"}],"name":"kubernetes","namespace":"default","resourceVersion":"201","uid":"de36e2fb-3883-47f2-860a-c28dfd896bac"},"spec":{"clusterIP":"10.96.0.1","clusterIPs":["10.96.0.1"],"internalTrafficPolicy":"Cluster","ipFamilies":["IPv4"],"ipFamilyPolicy":"SingleStack","ports":[{"name":"https","port":443,"protocol":"TCP","targetPort":6443}],"sessionAffinity":"None","type":"ClusterIP"},"status":{"loadBalancer":{}}}} +{"type":"ADDED","object":{"apiVersion":"v1","kind":"Service","metadata":{"annotations":{"prometheus.io/port":"9153","prometheus.io/scrape":"true"},"creationTimestamp":"2025-09-30T13:39:19Z","labels":{"k8s-app":"kube-dns","kubernetes.io/cluster-service":"true","kubernetes.io/name":"CoreDNS"},"managedFields":[{"apiVersion":"v1","fieldsType":"FieldsV1","fieldsV1":{"f:metadata":{"f:annotations":{".":{},"f:prometheus.io/port":{},"f:prometheus.io/scrape":{}},"f:labels":{".":{},"f:k8s-app":{},"f:kubernetes.io/cluster-service":{},"f:kubernetes.io/name":{}}},"f:spec":{"f:clusterIP":{},"f:internalTrafficPolicy":{},"f:ports":{".":{},"k:{\"port\":53,\"protocol\":\"TCP\"}":{".":{},"f:name":{},"f:port":{},"f:protocol":{},"f:targetPort":{}},"k:{\"port\":53,\"protocol\":\"UDP\"}":{".":{},"f:name":{},"f:port":{},"f:protocol":{},"f:targetPort":{}},"k:{\"port\":9153,\"protocol\":\"TCP\"}":{".":{},"f:name":{},"f:port":{},"f:protocol":{},"f:targetPort":{}}},"f:selector":{},"f:sessionAffinity":{},"f:type":{}}},"manager":"kubeadm","operation":"Update","time":"2025-09-30T13:39:19Z"}],"name":"kube-dns","namespace":"kube-system","resourceVersion":"240","uid":"73817952-5f95-4a15-a6f9-64ba220a6933"},"spec":{"clusterIP":"10.96.0.10","clusterIPs":["10.96.0.10"],"internalTrafficPolicy":"Cluster","ipFamilies":["IPv4"],"ipFamilyPolicy":"SingleStack","ports":[{"name":"dns","port":53,"protocol":"UDP","targetPort":53},{"name":"dns-tcp","port":53,"protocol":"TCP","targetPort":53},{"name":"metrics","port":9153,"protocol":"TCP","targetPort":9153}],"selector":{"k8s-app":"kube-dns"},"sessionAffinity":"None","type":"ClusterIP"},"status":{"loadBalancer":{}}}} +{"type":"ADDED","object":{"apiVersion":"v1","kind":"Service","metadata":{"creationTimestamp":"2025-10-01T09:20:14Z","labels":{"app":"nginx"},"managedFields":[{"apiVersion":"v1","fieldsType":"FieldsV1","fieldsV1":{"f:metadata":{"f:labels":{".":{},"f:app":{}}},"f:spec":{"f:externalTrafficPolicy":{},"f:internalTrafficPolicy":{},"f:ports":{".":{},"k:{\"port\":80,\"protocol\":\"TCP\"}":{".":{},"f:port":{},"f:protocol":{},"f:targetPort":{}}},"f:selector":{},"f:sessionAffinity":{},"f:type":{}}},"manager":"kubectl-expose","operation":"Update","time":"2025-10-01T09:20:14Z"}],"name":"nginx","namespace":"default","resourceVersion":"7178","uid":"d994b815-78e4-4f42-887c-abd00ff78425"},"spec":{"clusterIP":"10.104.89.81","clusterIPs":["10.104.89.81"],"externalTrafficPolicy":"Cluster","internalTrafficPolicy":"Cluster","ipFamilies":["IPv4"],"ipFamilyPolicy":"SingleStack","ports":[{"nodePort":30369,"port":80,"protocol":"TCP","targetPort":80}],"selector":{"app":"nginx"},"sessionAffinity":"None","type":"NodePort"},"status":{"loadBalancer":{}}}} +{"type":"DELETED","object":{"apiVersion":"v1","kind":"Service","metadata":{"creationTimestamp":"2025-10-01T09:20:14Z","labels":{"app":"nginx"},"managedFields":[{"apiVersion":"v1","fieldsType":"FieldsV1","fieldsV1":{"f:metadata":{"f:labels":{".":{},"f:app":{}}},"f:spec":{"f:externalTrafficPolicy":{},"f:internalTrafficPolicy":{},"f:ports":{".":{},"k:{\"port\":80,\"protocol\":\"TCP\"}":{".":{},"f:port":{},"f:protocol":{},"f:targetPort":{}}},"f:selector":{},"f:sessionAffinity":{},"f:type":{}}},"manager":"kubectl-expose","operation":"Update","time":"2025-10-01T09:20:14Z"}],"name":"nginx","namespace":"default","resourceVersion":"8036","uid":"d994b815-78e4-4f42-887c-abd00ff78425"},"spec":{"clusterIP":"10.104.89.81","clusterIPs":["10.104.89.81"],"externalTrafficPolicy":"Cluster","internalTrafficPolicy":"Cluster","ipFamilies":["IPv4"],"ipFamilyPolicy":"SingleStack","ports":[{"nodePort":30369,"port":80,"protocol":"TCP","targetPort":80}],"selector":{"app":"nginx"},"sessionAffinity":"None","type":"NodePort"},"status":{"loadBalancer":{}}}} +{"type":"ADDED","object":{"apiVersion":"v1","kind":"Service","metadata":{"creationTimestamp":"2025-10-01T09:37:23Z","labels":{"app":"nginx"},"managedFields":[{"apiVersion":"v1","fieldsType":"FieldsV1","fieldsV1":{"f:metadata":{"f:labels":{".":{},"f:app":{}}},"f:spec":{"f:externalTrafficPolicy":{},"f:internalTrafficPolicy":{},"f:ports":{".":{},"k:{\"port\":80,\"protocol\":\"TCP\"}":{".":{},"f:port":{},"f:protocol":{},"f:targetPort":{}}},"f:selector":{},"f:sessionAffinity":{},"f:type":{}}},"manager":"kubectl-expose","operation":"Update","time":"2025-10-01T09:37:23Z"}],"name":"nginx","namespace":"default","resourceVersion":"8564","uid":"cf112753-1335-4edc-8374-1bfb5aceb41f"},"spec":{"clusterIP":"10.109.71.60","clusterIPs":["10.109.71.60"],"externalTrafficPolicy":"Cluster","internalTrafficPolicy":"Cluster","ipFamilies":["IPv4"],"ipFamilyPolicy":"SingleStack","ports":[{"nodePort":32762,"port":80,"protocol":"TCP","targetPort":80}],"selector":{"app":"nginx"},"sessionAffinity":"None","type":"NodePort"},"status":{"loadBalancer":{}}}} +` + + watcher := NewServiceWatcher() + err := watcher.readKubectlStream(strings.NewReader(stream)) + assert.NilError(t, err) + + got := watcher.GetPorts() + want := []Entry{{ + Protocol: TCP, + IP: net.IPv4zero, + Port: uint16(32762), + }} + + assert.DeepEqual(t, got, want) +}