From 9d00aa81099d051e2427a0a94c32c7d87a70eff7 Mon Sep 17 00:00:00 2001 From: Michael Khalturin Date: Mon, 26 Aug 2024 20:21:22 +0300 Subject: [PATCH 1/8] added mvp ping endpoint proxy --- main.go | 2 +- scope.go | 7 +++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/main.go b/main.go index abc787f5..b24da71e 100644 --- a/main.go +++ b/main.go @@ -237,7 +237,7 @@ func serveHTTP(rw http.ResponseWriter, r *http.Request) { } proxy.refreshCacheMetrics() promHandler.ServeHTTP(rw, r) - case "/", "/query": + case "/", "/query", "/ping": var err error // nolint:forcetypeassert // We will cover this by tests as we control what is stored. proxyHandler := proxyHandler.Load().(*ProxyHandler) diff --git a/scope.go b/scope.go index c3debad1..42551103 100644 --- a/scope.go +++ b/scope.go @@ -389,6 +389,13 @@ func (s *scope) decorateRequest(req *http.Request) (*http.Request, url.Values) { // Make new params to purify URL. params := make(url.Values) + // pass ping request + if req.RequestURI == "/ping" { + req.URL.Scheme = s.host.Scheme() + req.URL.Host = s.host.Host() + return req, req.URL.Query() + } + // Set user params if s.user.params != nil { for _, param := range s.user.params.params { From ef5d0207f13c5226283421dc3b5370e09dc4fa4d Mon Sep 17 00:00:00 2001 From: Michael Khalturin Date: Tue, 27 Aug 2024 09:48:25 +0300 Subject: [PATCH 2/8] added allow_ping config option --- config/README.md | 3 +++ config/config.go | 3 +++ config/examples/debug.yml | 52 +++++++++++++++++++++++++++++++++++++++ main.go | 33 ++++++++++++++++++++++++- 4 files changed, 90 insertions(+), 1 deletion(-) create mode 100644 config/examples/debug.yml diff --git a/config/README.md b/config/README.md index 49bc85cc..bb2f27b2 100644 --- a/config/README.md +++ b/config/README.md @@ -16,6 +16,9 @@ log_debug: | default = false [optional] # Whether to ignore security warnings hack_me_please: | default = false [optional] +# Allow ping server +allow_ping: | default = false [optional] + # Named list of cache configurations caches: - ... diff --git a/config/config.go b/config/config.go index 7d32d08d..652cfe9b 100644 --- a/config/config.go +++ b/config/config.go @@ -78,6 +78,9 @@ type Config struct { ConnectionPool ConnectionPool `yaml:"connection_pool,omitempty"` + // Allow to proxy ping requests + AllowPing bool `yaml:"allow_ping,omitempty"` + networkReg map[string]Networks // Catches all undefined fields diff --git a/config/examples/debug.yml b/config/examples/debug.yml new file mode 100644 index 00000000..cb88dd05 --- /dev/null +++ b/config/examples/debug.yml @@ -0,0 +1,52 @@ +log_debug: true +hack_me_please: true +allow_ping: true + +caches: + - name: "longterm" + mode: "file_system" + file_system: + dir: "/tmp/chproxy/longterm/cache" + max_size: 10Gb + expire: 1h + shared_with_all_users: true + +max_error_reason_size: 10GB + +server: + http: + listen_addr: ":80" + idle_timeout: 20m + + https: + listen_addr: ":4433" + autocert: + cache_dir: "/tmp/chproxy/certs" + proxy: + enable: true + header: CF-Connecting-IP + +users: + - name: "*" + to_cluster: "dl" + to_user: "*" + is_wildcarded: true + max_concurrent_queries: 4 + max_execution_time: 15m + deny_https: false + cache: "longterm" + +clusters: + - name: "dl" + scheme: "http" + nodes: ["localhost:28123"] + # heartbeat: + # interval: 1m + # timeout: 10s + # request: "/?query=SELECT%201%2B1" + # q response: "2\n" + kill_query_user: + name: "chproxy" + password: "chproxy" + users: + - name: "*" diff --git a/main.go b/main.go index b24da71e..ae19ce03 100644 --- a/main.go +++ b/main.go @@ -35,6 +35,7 @@ var ( allowedNetworksHTTPS atomic.Value allowedNetworksMetrics atomic.Value proxyHandler atomic.Value + allowPing atomic.Bool ) func main() { @@ -237,12 +238,41 @@ func serveHTTP(rw http.ResponseWriter, r *http.Request) { } proxy.refreshCacheMetrics() promHandler.ServeHTTP(rw, r) - case "/", "/query", "/ping": + case "/", "/query": var err error // nolint:forcetypeassert // We will cover this by tests as we control what is stored. proxyHandler := proxyHandler.Load().(*ProxyHandler) r.RemoteAddr = proxyHandler.GetRemoteAddr(r) + var an *config.Networks + if r.TLS != nil { + // nolint:forcetypeassert // We will cover this by tests as we control what is stored. + an = allowedNetworksHTTPS.Load().(*config.Networks) + err = fmt.Errorf("https connections are not allowed from %s", r.RemoteAddr) + } else { + // nolint:forcetypeassert // We will cover this by tests as we control what is stored. + an = allowedNetworksHTTP.Load().(*config.Networks) + err = fmt.Errorf("http connections are not allowed from %s", r.RemoteAddr) + } + if !an.Contains(r.RemoteAddr) { + rw.Header().Set("Connection", "close") + respondWith(rw, err, http.StatusForbidden) + return + } + proxy.ServeHTTP(rw, r) + case "/ping": + var err error + + if !allowPing.Load() { + err = fmt.Errorf("ping is not allowed") + respondWith(rw, err, http.StatusForbidden) + return + } + + // nolint:forcetypeassert // We will cover this by tests as we control what is stored. + proxyHandler := proxyHandler.Load().(*ProxyHandler) + r.RemoteAddr = proxyHandler.GetRemoteAddr(r) + var an *config.Networks if r.TLS != nil { // nolint:forcetypeassert // We will cover this by tests as we control what is stored. @@ -296,6 +326,7 @@ func applyConfig(cfg *config.Config) error { allowedNetworksHTTPS.Store(&cfg.Server.HTTPS.AllowedNetworks) allowedNetworksMetrics.Store(&cfg.Server.Metrics.AllowedNetworks) proxyHandler.Store(NewProxyHandler(&cfg.Server.Proxy)) + allowPing.Store(cfg.AllowPing) log.SetDebug(cfg.LogDebug) log.Infof("Loaded config:\n%s", cfg) From 1f0d56944abd5fcfd10040bcd29f098b4e4da701 Mon Sep 17 00:00:00 2001 From: Michael Khalturin Date: Tue, 3 Sep 2024 00:55:14 +0300 Subject: [PATCH 3/8] minor refactoring path routing --- main.go | 26 ++------------------------ 1 file changed, 2 insertions(+), 24 deletions(-) diff --git a/main.go b/main.go index ae19ce03..3f818f6a 100644 --- a/main.go +++ b/main.go @@ -238,32 +238,10 @@ func serveHTTP(rw http.ResponseWriter, r *http.Request) { } proxy.refreshCacheMetrics() promHandler.ServeHTTP(rw, r) - case "/", "/query": - var err error - // nolint:forcetypeassert // We will cover this by tests as we control what is stored. - proxyHandler := proxyHandler.Load().(*ProxyHandler) - r.RemoteAddr = proxyHandler.GetRemoteAddr(r) - - var an *config.Networks - if r.TLS != nil { - // nolint:forcetypeassert // We will cover this by tests as we control what is stored. - an = allowedNetworksHTTPS.Load().(*config.Networks) - err = fmt.Errorf("https connections are not allowed from %s", r.RemoteAddr) - } else { - // nolint:forcetypeassert // We will cover this by tests as we control what is stored. - an = allowedNetworksHTTP.Load().(*config.Networks) - err = fmt.Errorf("http connections are not allowed from %s", r.RemoteAddr) - } - if !an.Contains(r.RemoteAddr) { - rw.Header().Set("Connection", "close") - respondWith(rw, err, http.StatusForbidden) - return - } - proxy.ServeHTTP(rw, r) - case "/ping": + case "/", "/query", "/ping": var err error - if !allowPing.Load() { + if r.URL.Path == "/ping" && !allowPing.Load() { err = fmt.Errorf("ping is not allowed") respondWith(rw, err, http.StatusForbidden) return From 4e6544332a518fc7b47cdc1fdc836b21c73da32f Mon Sep 17 00:00:00 2001 From: Michael Khalturin Date: Tue, 3 Sep 2024 01:19:06 +0300 Subject: [PATCH 4/8] added ping tests --- config/examples/debug.yml | 52 --------------------------------------- main_test.go | 27 ++++++++++++++++++++ testdata/http.ping.yml | 15 +++++++++++ testdata/https.ping.yml | 17 +++++++++++++ 4 files changed, 59 insertions(+), 52 deletions(-) delete mode 100644 config/examples/debug.yml create mode 100644 testdata/http.ping.yml create mode 100644 testdata/https.ping.yml diff --git a/config/examples/debug.yml b/config/examples/debug.yml deleted file mode 100644 index cb88dd05..00000000 --- a/config/examples/debug.yml +++ /dev/null @@ -1,52 +0,0 @@ -log_debug: true -hack_me_please: true -allow_ping: true - -caches: - - name: "longterm" - mode: "file_system" - file_system: - dir: "/tmp/chproxy/longterm/cache" - max_size: 10Gb - expire: 1h - shared_with_all_users: true - -max_error_reason_size: 10GB - -server: - http: - listen_addr: ":80" - idle_timeout: 20m - - https: - listen_addr: ":4433" - autocert: - cache_dir: "/tmp/chproxy/certs" - proxy: - enable: true - header: CF-Connecting-IP - -users: - - name: "*" - to_cluster: "dl" - to_user: "*" - is_wildcarded: true - max_concurrent_queries: 4 - max_execution_time: 15m - deny_https: false - cache: "longterm" - -clusters: - - name: "dl" - scheme: "http" - nodes: ["localhost:28123"] - # heartbeat: - # interval: 1m - # timeout: 10s - # request: "/?query=SELECT%201%2B1" - # q response: "2\n" - kill_query_user: - name: "chproxy" - password: "chproxy" - users: - - name: "*" diff --git a/main_test.go b/main_test.go index d3e1d707..139d0bd7 100644 --- a/main_test.go +++ b/main_test.go @@ -858,6 +858,33 @@ func TestServe(t *testing.T) { }, startTLS, }, + { + "http ping request", + "testdata/http.ping.yml", + func(t *testing.T) { + httpGet(t, "http://127.0.0.1:9090/ping", http.StatusOK) + }, + startHTTP, + }, + { + "https ping request", + "testdata/https.ping.yml", + func(t *testing.T) { + req, err := http.NewRequest("GET", "https://127.0.0.1:9090/ping", nil) + checkErr(t, err) + req.Close = true + req.SetBasicAuth("default", "qwerty") + + resp, err := tlsClient.Do(req) + checkErr(t, err) + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + t.Fatalf("unexpected status code: %d; expected: %d", resp.StatusCode, http.StatusOK) + } + }, + startTLS, + }, } // Wait until CHServer starts. diff --git a/testdata/http.ping.yml b/testdata/http.ping.yml new file mode 100644 index 00000000..cd7e40bb --- /dev/null +++ b/testdata/http.ping.yml @@ -0,0 +1,15 @@ +allow_ping: true + +server: + http: + listen_addr: ":9090" + allowed_networks: ["127.0.0.1/24"] + +users: + - name: "default" + to_cluster: "default" + to_user: "default" + +clusters: + - name: "default" + nodes: ["127.0.0.1:18124"] diff --git a/testdata/https.ping.yml b/testdata/https.ping.yml new file mode 100644 index 00000000..e165a9f2 --- /dev/null +++ b/testdata/https.ping.yml @@ -0,0 +1,17 @@ +allow_ping: true + +server: + https: + listen_addr: ":9090" + cert_file: "testdata/example.com.cert" + key_file: "testdata/example.com.key" + +users: + - name: "default" + password: "qwerty" + to_cluster: "default" + to_user: "default" + +clusters: + - name: "default" + nodes: ["127.0.0.1:18124"] From ea41bf90d50958ee4b96279dbfe000c9a5119512 Mon Sep 17 00:00:00 2001 From: Michael Khalturin Date: Wed, 4 Sep 2024 15:55:37 +0300 Subject: [PATCH 5/8] fixed lint error by making /ping endpoint constant --- main.go | 6 ++++-- scope.go | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/main.go b/main.go index 3f818f6a..b25bb503 100644 --- a/main.go +++ b/main.go @@ -20,6 +20,8 @@ import ( "golang.org/x/crypto/acme/autocert" ) +const pingEndpoint string = "/ping" + var ( configFile = flag.String("config", "", "Proxy configuration filename") version = flag.Bool("version", false, "Prints current version and exits") @@ -238,10 +240,10 @@ func serveHTTP(rw http.ResponseWriter, r *http.Request) { } proxy.refreshCacheMetrics() promHandler.ServeHTTP(rw, r) - case "/", "/query", "/ping": + case "/", "/query", pingEndpoint: var err error - if r.URL.Path == "/ping" && !allowPing.Load() { + if r.URL.Path == pingEndpoint && !allowPing.Load() { err = fmt.Errorf("ping is not allowed") respondWith(rw, err, http.StatusForbidden) return diff --git a/scope.go b/scope.go index 42551103..f04b2655 100644 --- a/scope.go +++ b/scope.go @@ -390,7 +390,7 @@ func (s *scope) decorateRequest(req *http.Request) (*http.Request, url.Values) { params := make(url.Values) // pass ping request - if req.RequestURI == "/ping" { + if req.RequestURI == pingEndpoint { req.URL.Scheme = s.host.Scheme() req.URL.Host = s.host.Host() return req, req.URL.Query() From 9b46898460fee2abab420fc7a5193c92b504245a Mon Sep 17 00:00:00 2001 From: Michael Khalturin Date: Wed, 4 Sep 2024 16:02:01 +0300 Subject: [PATCH 6/8] added test for ping is not allowed configuration --- main_test.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/main_test.go b/main_test.go index 139d0bd7..7e721a29 100644 --- a/main_test.go +++ b/main_test.go @@ -885,6 +885,17 @@ func TestServe(t *testing.T) { }, startTLS, }, + { + "http ping request is not allowed", + "testdata/http.yml", + func(t *testing.T) { + resp := httpGet(t, "http://127.0.0.1:9090/ping", http.StatusForbidden) + expected := "ping is not allowed\n" + checkResponse(t, resp.Body, expected) + resp.Body.Close() + }, + startHTTP, + }, } // Wait until CHServer starts. From 60b75965cdb67b3ab1dfa37b601982182a533819 Mon Sep 17 00:00:00 2001 From: Michael Khalturin Date: Wed, 11 Sep 2024 08:27:24 +0300 Subject: [PATCH 7/8] added check grants for shared cache queries --- cache/async_cache.go | 16 +++++++------ config/README.md | 4 ++++ config/config.go | 4 ++++ io.go | 46 ++++++++++++++++++++++++++++++++++++ proxy.go | 56 ++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 119 insertions(+), 7 deletions(-) diff --git a/cache/async_cache.go b/cache/async_cache.go index ee55e44c..29dc003b 100644 --- a/cache/async_cache.go +++ b/cache/async_cache.go @@ -20,8 +20,9 @@ type AsyncCache struct { graceTime time.Duration - MaxPayloadSize config.ByteSize - SharedWithAllUsers bool + MaxPayloadSize config.ByteSize + SharedWithAllUsers bool + CheckGrantsForSharedCache bool } func (c *AsyncCache) Close() error { @@ -109,10 +110,11 @@ func NewAsyncCache(cfg config.Cache, maxExecutionTime time.Duration) (*AsyncCach maxPayloadSize := cfg.MaxPayloadSize return &AsyncCache{ - Cache: cache, - TransactionRegistry: transaction, - graceTime: graceTime, - MaxPayloadSize: maxPayloadSize, - SharedWithAllUsers: cfg.SharedWithAllUsers, + Cache: cache, + TransactionRegistry: transaction, + graceTime: graceTime, + MaxPayloadSize: maxPayloadSize, + SharedWithAllUsers: cfg.SharedWithAllUsers, + CheckGrantsForSharedCache: cfg.CheckGrantsForSharedCache, }, nil } diff --git a/config/README.md b/config/README.md index bb2f27b2..07e45ddb 100644 --- a/config/README.md +++ b/config/README.md @@ -100,6 +100,10 @@ max_payload_size: # Whether a query cached by a user can be used by another user shared_with_all_users: | default = false [optional] + +# Whether `shared_with_all_users` option is enabled +# check permissions for cached query used by another user +check_grants_for_shared_cache: | default = false [optional] ``` ### diff --git a/config/config.go b/config/config.go index 652cfe9b..6a42281c 100644 --- a/config/config.go +++ b/config/config.go @@ -938,6 +938,10 @@ type Cache struct { // Whether a query cached by a user could be used by another user SharedWithAllUsers bool `yaml:"shared_with_all_users,omitempty"` + + // Whether `shared_with_all_users` option is enabled + // check permissions for cached query used by another user + CheckGrantsForSharedCache bool `yaml:"check_grants_for_shared_cache,omitempty"` } func (c *Cache) setDefaults() { diff --git a/io.go b/io.go index 6598d8c7..b24b2d31 100644 --- a/io.go +++ b/io.go @@ -177,3 +177,49 @@ func (crc *cachedReadCloser) String() string { crc.bLock.Unlock() return s } + +var _ ResponseWriterWithCode = &checkGrantsResponseWriter{} + +type checkGrantsResponseWriter struct { + http.ResponseWriter + + statusCode int +} + +func (rw *checkGrantsResponseWriter) SetStatusCode(code int) { + rw.statusCode = code + rw.ResponseWriter.WriteHeader(rw.statusCode) +} + +func (rw *checkGrantsResponseWriter) StatusCode() int { + if rw.statusCode == 0 { + return http.StatusOK + } + + return rw.statusCode +} + +func (rw *checkGrantsResponseWriter) WriteHeader(statusCode int) { + // cache statusCode to keep the opportunity to change it in further + rw.statusCode = statusCode + rw.SetStatusCode(statusCode) +} + +func (rw *checkGrantsResponseWriter) Write(b []byte) (int, error) { + if rw.statusCode == http.StatusOK { + return 0, nil + } + + n, err := rw.ResponseWriter.Write(b) + return n, err +} + +// CloseNotify implements http.CloseNotifier +func (rw *checkGrantsResponseWriter) CloseNotify() <-chan bool { + // The rw.ResponseWriter must implement http.CloseNotifier + rwc, ok := rw.ResponseWriter.(http.CloseNotifier) + if !ok { + panic("BUG: the wrapped ResponseWriter must implement http.CloseNotifier") + } + return rwc.CloseNotify() +} diff --git a/proxy.go b/proxy.go index 24940762..629f2789 100644 --- a/proxy.go +++ b/proxy.go @@ -146,6 +146,25 @@ func (rp *reverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) { } if shouldReturnFromCache { + // if cache shared between all users + // try to check if cached query is allowed for current user + if s.user.cache != nil && s.user.cache.SharedWithAllUsers && s.user.cache.CheckGrantsForSharedCache { + checkReq, checkQuery, _ := s.createCheckGrantsRequest(req) + + srwCheck := &checkGrantsResponseWriter{ + ResponseWriter: srw.ResponseWriter, + } + + rp.proxyRequest(s, srwCheck, srw, checkReq) + + if srwCheck.statusCode == http.StatusOK { + log.Debugf("%s: check grants for shared cached query request success; query: %q; Method: %s; URL: %q", s, checkQuery, checkReq.Method, checkReq.URL.String()) + } else { + log.Debugf("%s: check grants for shared cached query request failure: non-200 status code %d; query: %q; Method: %s; URL: %q", s, srwCheck.statusCode, checkQuery, checkReq.Method, checkReq.URL.String()) + return + } + } + rp.serveFromCache(s, srw, req, origParams, q) } else { rp.proxyRequest(s, srw, srw, req) @@ -925,3 +944,40 @@ func (rp *reverseProxy) getScope(req *http.Request) (*scope, int, error) { s.requestPacketSize = len(q) return s, 0, nil } + +// create a new request based on proxied one +// with query wrapped to fetch result types like: +// 'DESC ({original_query})' +// along with query parsed and analyzed for return types (which is fast) +// ClickHouse check permissions to execute this query for the user +func (s *scope) createCheckGrantsRequest(originalReq *http.Request) (*http.Request, string, error) { + originalQuery := originalReq.URL.Query().Get("query") + checkQuery := fmt.Sprintf("DESC (%s);", strings.TrimRight(originalQuery, ";")) + + newURL := *originalReq.URL + + queryParams, err := url.ParseQuery(newURL.RawQuery) + if err != nil { + return nil, checkQuery, err + } + + queryParams.Set("query", checkQuery) + + newURL.RawQuery = queryParams.Encode() + + req := &http.Request{ + Method: originalReq.Method, + URL: &newURL, + Proto: originalReq.Proto, + ProtoMajor: originalReq.ProtoMajor, + ProtoMinor: originalReq.ProtoMinor, + Header: originalReq.Header.Clone(), + Body: originalReq.Body, + Host: originalReq.Host, + ContentLength: originalReq.ContentLength, + Close: originalReq.Close, + TLS: originalReq.TLS, + } + + return req, checkQuery, nil +} From 866ba2d558b421b33aaefd2cd258dad0857435ef Mon Sep 17 00:00:00 2001 From: Michael Khalturin Date: Wed, 22 Jan 2025 12:51:32 +0300 Subject: [PATCH 8/8] minor fixes --- proxy.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/proxy.go b/proxy.go index 629f2789..3c86d49b 100644 --- a/proxy.go +++ b/proxy.go @@ -148,7 +148,7 @@ func (rp *reverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) { if shouldReturnFromCache { // if cache shared between all users // try to check if cached query is allowed for current user - if s.user.cache != nil && s.user.cache.SharedWithAllUsers && s.user.cache.CheckGrantsForSharedCache { + if s.user.cache.SharedWithAllUsers && s.user.cache.CheckGrantsForSharedCache { checkReq, checkQuery, _ := s.createCheckGrantsRequest(req) srwCheck := &checkGrantsResponseWriter{ @@ -158,9 +158,9 @@ func (rp *reverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) { rp.proxyRequest(s, srwCheck, srw, checkReq) if srwCheck.statusCode == http.StatusOK { - log.Debugf("%s: check grants for shared cached query request success; query: %q; Method: %s; URL: %q", s, checkQuery, checkReq.Method, checkReq.URL.String()) + log.Debugf("%s: check grants for shared cached query request success; user: %s; query: %q; Method: %s; URL: %q", s, s.user.name, checkQuery, checkReq.Method, checkReq.URL.String()) } else { - log.Debugf("%s: check grants for shared cached query request failure: non-200 status code %d; query: %q; Method: %s; URL: %q", s, srwCheck.statusCode, checkQuery, checkReq.Method, checkReq.URL.String()) + log.Debugf("%s: check grants for shared cached query request failure: non-200 status code %d; user: %s; query: %q; Method: %s; URL: %q", s, srwCheck.statusCode, s.user.name, checkQuery, checkReq.Method, checkReq.URL.String()) return } }