diff --git a/Makefile b/Makefile index ecf4201345..d451a3b0b0 100644 --- a/Makefile +++ b/Makefile @@ -18,7 +18,8 @@ fake-aws fake-aws-s3 fake-aws-sqs aws-ingress fluent-bit kibana backoffice \ calling-test demo-smtp elasticsearch-curator elasticsearch-external \ elasticsearch-ephemeral minio-external cassandra-external \ ingress-nginx-controller nginx-ingress-services reaper restund \ -k8ssandra-test-cluster ldap-scim-bridge wire-server-enterprise +k8ssandra-test-cluster ldap-scim-bridge wire-server-enterprise \ +integration KIND_CLUSTER_NAME := wire-server HELM_PARALLELISM ?= 1 # 1 for sequential tests; 6 for all-parallel tests # (run `psql -h localhost -p 5432 -d backendA -U wire-server -w` for the list of options for PSQL_DB) diff --git a/charts/integration/templates/configmap.yaml b/charts/integration/templates/configmap.yaml index 82fc989528..4aee4ce200 100644 --- a/charts/integration/templates/configmap.yaml +++ b/charts/integration/templates/configmap.yaml @@ -77,14 +77,16 @@ data: apiPort: 5380 dohPort: 5381 - originDomain: federation-test-helper.{{ .Release.Namespace }}.svc.cluster.local + # originDomain: federation-test-helper.{{ .Release.Namespace }}.svc.cluster.local + # sven-test.wire.link + originDomain: dev.zinfra.io rabbitmq: host: rabbitmq port: 5671 adminHost: rabbitmq - adminPort: 15671 - enableTls: true + adminPort: 15672 + enableTls: false insecureSkipVerifyTls: true vHost: / @@ -310,3 +312,6 @@ data: integrationTestHostName: integration-headless.{{ .Release.Namespace }}.svc.cluster.local cellsEventQueue: cells_events + shardingGroupCount: {{ .Values.config.shardingGroupCount }} + maxUserNo: {{ .Values.config.maxUserNo }} + maxDeliveryDelay: {{ .Values.config.maxDeliveryDelay }} diff --git a/charts/integration/templates/integration-integration.yaml b/charts/integration/templates/integration-integration.yaml index f59475280e..b0ff8c19aa 100644 --- a/charts/integration/templates/integration-integration.yaml +++ b/charts/integration/templates/integration-integration.yaml @@ -29,10 +29,6 @@ spec: secret: secretName: "brig" - - name: "turn-servers" - configMap: - name: "turn" - - name: "cannon-config" configMap: name: "cannon" @@ -65,22 +61,6 @@ spec: configMap: name: "background-worker" - - name: "background-worker-secrets" - secret: - secretName: "background-worker" - - - name: "stern-config" - configMap: - name: "backoffice" - - - name: "proxy-config" - configMap: - name: "proxy" - - - name: "proxy-secrets" - secret: - secretName: "proxy" - - name: "nginz-config" configMap: name: "nginz" @@ -89,28 +69,12 @@ spec: secret: secretName: "nginz" - - name: elasticsearch-ca - secret: - secretName: {{ .Values.config.elasticsearch.tlsCaSecretRef.name }} - - - name: redis-ca - secret: - secretName: {{ .Values.config.redis.tlsCaSecretRef.name }} - - - name: rabbitmq-ca - secret: - secretName: {{ .Values.config.rabbitmq.tlsCaSecretRef.name }} - {{- if eq (include "useCassandraTLS" .Values.config) "true" }} - name: integration-cassandra secret: secretName: {{ include "cassandraTlsSecretName" .Values.config }} {{- end }} - - name: wire-server-enterprise-config - configMap: - name: wire-server-enterprise - restartPolicy: Never initContainers: @@ -121,14 +85,10 @@ spec: {{- toYaml .Values.podSecurityContext | nindent 6 }} {{- end }} volumeMounts: - - name: elasticsearch-ca - mountPath: "/certs/elasticsearch" {{- if eq (include "useCassandraTLS" .Values.config) "true" }} - name: "integration-cassandra" mountPath: "/certs/cassandra" {{- end }} - - name: rabbitmq-ca - mountPath: /certs/rabbitmq-ca env: - name: INTEGRATION_DYNAMIC_BACKENDS_POOLSIZE value: "{{ .Values.config.dynamicBackendsPoolsize }}" @@ -141,32 +101,19 @@ spec: - name: RABBITMQ_USERNAME valueFrom: secretKeyRef: - name: brig + name: gundeck key: rabbitmqUsername - name: RABBITMQ_PASSWORD valueFrom: secretKeyRef: - name: brig + name: gundeck key: rabbitmqPassword command: - /bin/sh - -c - | set -euo pipefail - # FUTUREWORK: Do all of this in the integration test binary - integration-dynamic-backends-db-schemas.sh \ - --host {{ .Values.config.cassandra.host }} \ - --port {{ .Values.config.cassandra.port }} \ - --replication-factor {{ .Values.config.cassandra.replicationFactor }} \ - {{- if eq (include "useCassandraTLS" .Values.config) "true" }} - --tls-ca-certificate-file /certs/cassandra/{{- include "cassandraTlsSecretKey" .Values.config }} - {{ end }} - - integration-dynamic-backends-brig-index.sh \ - --elasticsearch-server https://elastic:changeme@{{ .Values.config.elasticsearch.host }}:9200 \ - --elasticsearch-ca-cert /certs/elasticsearch/{{ .Values.config.elasticsearch.tlsCaSecretRef.key }} - integration-dynamic-backends-ses.sh {{ .Values.config.sesEndpointUrl }} - integration-dynamic-backends-s3.sh {{ .Values.config.s3EndpointUrl }} + {{- range $name, $dynamicBackend := .Values.config.dynamicBackends }} integration-dynamic-backends-vhosts.sh {{ $.Values.config.rabbitmqPutVHostUrl }} {{ $dynamicBackend.federatorExternalHostPrefix}}.{{ $.Release.Namespace }}.svc.cluster.local {{- end }} @@ -176,11 +123,12 @@ spec: cpu: "2" containers: - - name: integration - image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}" - {{- if eq (include "includeSecurityContext" .) "true" }} + {{- range $i := until (int .Values.config.shardingGroupCount) }} + - name: "integration-{{ int $i}}" + image: "{{ $.Values.image.repository }}:{{ $.Values.image.tag }}" + {{- if eq (include "includeSecurityContext" $) "true" }} securityContext: - {{- toYaml .Values.podSecurityContext | nindent 6 }} + {{- toYaml $.Values.podSecurityContext | nindent 6 }} {{- end }} command: - /bin/bash @@ -188,13 +136,13 @@ spec: - | set -euo pipefail - if integration --config /etc/wire/integration/integration.yaml; then + if TEST_INCLUDE=testBench integration --config /etc/wire/integration/integration.yaml --sharding-group {{ $i }}; then exit_code=$? else exit_code=$? fi - {{- if .Values.config.uploadXml }} + {{- if $.Values.config.uploadXml }} # In case a different S3 compliant storage is used to upload test result. if ! [[ -z "${UPLOAD_XML_AWS_ACCESS_KEY_ID+x}" ]]; then export AWS_ACCESS_KEY_ID="$UPLOAD_XML_AWS_ACCESS_KEY_ID" @@ -210,7 +158,7 @@ spec: resources: requests: memory: "512Mi" - cpu: "2" + cpu: "0.5" volumeMounts: - name: integration-config @@ -228,9 +176,6 @@ spec: - name: brig-secrets mountPath: /etc/wire/brig/secrets - - name: turn-servers - mountPath: /etc/wire/brig/turn - - name: cannon-config mountPath: /etc/wire/cannon/conf @@ -243,58 +188,13 @@ spec: - name: spar-config mountPath: /etc/wire/spar/conf - - name: federator-config - mountPath: /etc/wire/federator/conf - - - name: federator-secrets - mountPath: /etc/wire/federator/secrets - - - name: federator-ca - mountPath: /etc/wire/federator/ca - - - name: background-worker-config - mountPath: /etc/wire/background-worker/conf - - - name: background-worker-secrets - mountPath: /etc/wire/background-worker/secrets - - - name: stern-config - mountPath: /etc/wire/stern/conf - - - name: proxy-config - mountPath: /etc/wire/proxy/conf - - - name: proxy-secrets - mountPath: /etc/wire/proxy/secrets - - name: nginz-config mountPath: /etc/wire/nginz/conf - name: nginz-secrets mountPath: /etc/wire/nginz/secrets - - name: elasticsearch-ca - mountPath: /etc/wire/brig/elasticsearch-ca - - - name: redis-ca - mountPath: /etc/wire/gundeck/redis-ca - - - name: rabbitmq-ca - mountPath: /etc/wire/brig/rabbitmq-ca - - - name: rabbitmq-ca - mountPath: /etc/wire/galley/rabbitmq-ca - - - name: rabbitmq-ca - mountPath: /etc/wire/background-worker/rabbitmq-ca - - - name: rabbitmq-ca - mountPath: /etc/wire/gundeck/rabbitmq-ca - - - name: rabbitmq-ca - mountPath: /etc/wire/cannon/rabbitmq-ca - - {{- if eq (include "useCassandraTLS" .Values.config) "true" }} + {{- if eq (include "useCassandraTLS" $.Values.config) "true" }} - name: "integration-cassandra" mountPath: "/certs" @@ -311,9 +211,6 @@ spec: mountPath: "/etc/wire/spar/cassandra" {{- end }} - - name: wire-server-enterprise-config - mountPath: /etc/wire/wire-server-enterprise/conf - env: # these dummy values are necessary for Amazonka's "Discover" - name: AWS_ACCESS_KEY_ID @@ -325,35 +222,21 @@ spec: - name: RABBITMQ_USERNAME valueFrom: secretKeyRef: - name: brig + name: gundeck key: rabbitmqUsername - name: RABBITMQ_PASSWORD valueFrom: secretKeyRef: - name: brig + name: gundeck key: rabbitmqPassword - - name: RABBITMQ_USERNAME_V0 - value: "wire-server" - - name: RABBITMQ_PASSWORD_V0 - valueFrom: - secretKeyRef: - name: rabbitmq-v0 - key: rabbitmq-password - - name: RABBITMQ_USERNAME_V1 - value: "wire-server" - - name: RABBITMQ_PASSWORD_V1 - valueFrom: - secretKeyRef: - name: rabbitmq-v1 - key: rabbitmq-password - {{- if hasKey .Values.secrets "redisUsername" }} + {{- if hasKey $.Values.secrets "redisUsername" }} - name: REDIS_USERNAME valueFrom: secretKeyRef: name: integration key: redisUsername {{- end }} - {{- if hasKey .Values.secrets "redisPassword" }} + {{- if hasKey $.Values.secrets "redisPassword" }} - name: REDIS_PASSWORD valueFrom: secretKeyRef: @@ -362,10 +245,10 @@ spec: {{- end }} - name: TEST_XML value: /tmp/result.xml - {{- if .Values.config.uploadXml }} + {{- if $.Values.config.uploadXml }} - name: UPLOAD_XML_S3_BASE_URL - value: {{ .Values.config.uploadXml.baseUrl }} - {{- if .Values.secrets.uploadXmlAwsAccessKeyId }} + value: {{ $.Values.config.uploadXml.baseUrl }} + {{- if $.Values.secrets.uploadXmlAwsAccessKeyId }} - name: UPLOAD_XML_AWS_ACCESS_KEY_ID valueFrom: secretKeyRef: @@ -379,8 +262,9 @@ spec: {{- end }} {{- end }} - name: ENABLE_FEDERATION_V0 - value: "1" + value: "0" - name: ENABLE_FEDERATION_V1 - value: "1" + value: "0" - name: ENABLE_FEDERATION_V2 - value: "1" + value: "0" + {{- end }} diff --git a/charts/integration/values.yaml b/charts/integration/values.yaml index 36305b2be7..5bd5556def 100644 --- a/charts/integration/values.yaml +++ b/charts/integration/values.yaml @@ -12,7 +12,7 @@ podSecurityContext: type: RuntimeDefault config: - dynamicBackendsPoolsize: 3 + dynamicBackendsPoolsize: 0 dynamicBackends: dynamic-backend-1: federatorExternalHostPrefix: dynamic-backend-1 @@ -118,8 +118,11 @@ config: sqsEndpointUrl: http://fake-aws-sqs:4568 sesEndpointUrl: http://fake-aws-ses:4569 s3EndpointUrl: http://fake-aws-s3:9000 - rabbitmqPutVHostUrl: http://rabbitmq:15672/api/vhosts - + rabbitmqPutVHostUrl: http://rabbitmq-b.databases.svc.cluster.local:15672/api/vhosts + shardingGroupCount: 3 + # shardingGroup: 0 + maxUserNo: 1000 + maxDeliveryDelay: 120 tls: verify_depth: 1 # Namespace from which to obtain the secret containing the CA trusted by diff --git a/integration/default.nix b/integration/default.nix index 04163fc507..03512c4bc5 100644 --- a/integration/default.nix +++ b/integration/default.nix @@ -74,6 +74,7 @@ , split , stm , streaming-commons +, streamly , string-conversions , system-linux-proc , tagged @@ -179,6 +180,7 @@ mkDerivation { split stm streaming-commons + streamly string-conversions system-linux-proc tagged diff --git a/integration/integration.cabal b/integration/integration.cabal index 0cdf6972ef..70386331cf 100644 --- a/integration/integration.cabal +++ b/integration/integration.cabal @@ -184,6 +184,7 @@ library Test.MLS.Unreachable Test.NginxZAuthModule Test.Notifications + Test.NotificationsBenchmark Test.OAuth Test.PasswordReset Test.Presence @@ -299,6 +300,7 @@ library , split , stm , streaming-commons + , streamly , string-conversions , system-linux-proc , tagged diff --git a/integration/test/Test/NotificationsBenchmark.hs b/integration/test/Test/NotificationsBenchmark.hs new file mode 100644 index 0000000000..479e0564d7 --- /dev/null +++ b/integration/test/Test/NotificationsBenchmark.hs @@ -0,0 +1,139 @@ +module Test.NotificationsBenchmark where + +import API.Brig +import API.BrigCommon +import API.Common +import API.GundeckInternal +import Control.Concurrent +import Control.Monad.Codensity (Codensity (..)) +import Control.Monad.Reader (asks) +import Control.Monad.Reader.Class (local) +import Control.Retry +import qualified Data.Map.Strict as Map +import Data.String.Conversions (cs) +import Data.Time +import GHC.Conc (numCapabilities) +import GHC.Stack +import SetupHelpers +import qualified Streamly.Data.Fold.Prelude as Fold +import qualified Streamly.Data.Stream.Prelude as Stream +import System.Random +import qualified Test.Events as TestEvents +import Testlib.Prekeys +import Testlib.Prelude + +data TestRecipient = TestRecipient + { user :: Value, + clientIds :: [String] + } + deriving (Show) + +testBench :: (HasCallStack) => App () +testBench = do + shardingGroupCount <- asks (.shardingGroupCount) + shardingGroup <- asks (.shardingGroup) + maxUserNo <- asks (.maxUserNo) + + -- Preparation + let parCfg = Stream.maxThreads (numCapabilities * 2) . Stream.ordered False + toMap = Fold.foldl' (\kv (k, v) -> Map.insert k v kv) Map.empty + -- Later, we only read from this map. Thus, it doesn't have to be thread-safe. + userMap :: Map Word TestRecipient <- + Stream.fromList [0 :: Word .. maxUserNo] + & Stream.filter ((shardingGroup ==) . (`mod` shardingGroupCount)) + & Stream.parMapM parCfg (\i -> generateTestRecipient >>= \r -> pure (i, r)) + & Stream.fold toMap + + now <- liftIO getCurrentTime + + -- TODO: To be replaced with real data from the file. (See + -- https://wearezeta.atlassian.net/wiki/spaces/PET/pages/2118680620/Simulating+production-like+data) + let fakeData = zip (plusDelta now <$> [0 :: Word ..]) (cycle [0 .. maxUserNo]) + + Stream.fromList fakeData + & Stream.filter (\(_t, uNo) -> (uNo `mod` shardingGroupCount) == shardingGroup) + & Stream.parMapM parCfg (\(t, uNo) -> waitForTimeStamp t >> sendAndReceive uNo userMap) + & Stream.fold Fold.drain + +waitForTimeStamp :: UTCTime -> App () +waitForTimeStamp timestamp = liftIO $ do + now <- getCurrentTime + when (now < timestamp) + $ + -- Event comes from the simulated future: Wait here until now and timestamp are aligned. + let delta = diffTimeToMicroSeconds $ diffUTCTime timestamp now + in print ("Waiting " ++ show delta ++ " microseconds. (timestamp, now)" ++ show (timestamp, now)) + >> threadDelay delta + where + diffTimeToMicroSeconds :: NominalDiffTime -> Int + diffTimeToMicroSeconds dt = floor @Double (realToFrac dt * 1_000_000) + +plusDelta :: UTCTime -> Word -> UTCTime +plusDelta timestamp deltaMilliSeconds = addUTCTime (fromIntegral deltaMilliSeconds / 1000) timestamp + +sendAndReceive :: Word -> Map Word TestRecipient -> App () +sendAndReceive userNo userMap = do + print $ "pushing to user" ++ show userNo + let testRecipient = userMap Map.! (fromIntegral userNo) + alice = testRecipient.user + + r <- recipient alice + payload :: Value <- toJSON <$> liftIO randomPayload + now <- liftIO $ getCurrentTime + let push = + object + [ "recipients" .= [r], + "payload" + .= [ object + [ "foo" .= payload, + "sent_at" .= now + ] + ] + ] + + void $ postPush alice [push] >>= getBody 200 + + messageDeliveryTimeout <- asks $ fromIntegral . (.maxDeliveryDelay) + forM_ (testRecipient.clientIds) $ \(cid :: String) -> + runCodensity (TestEvents.createEventsWebSocket alice (Just cid)) $ \ws -> do + -- TODO: Tweak this value to the least acceptable event delivery duration + local (setTimeoutTo messageDeliveryTimeout) $ TestEvents.assertFindsEvent ws $ \e -> do + receivedAt <- liftIO getCurrentTime + sentAt :: UTCTime <- (e %. "payload.sent_at" >>= asByteString) <&> fromJust . decode . cs + print $ "Message sent/receive delta: " ++ show (diffUTCTime receivedAt sentAt) + + e %. "payload" `shouldMatch` [object ["foo" .= payload]] + where + -- \| Generate a random string with random length up to 2048 bytes + randomPayload :: IO String + randomPayload = + -- Measured with + -- `kubectl exec --namespace databases -it gundeck-gundeck-eks-eu-west-1a-sts-0 -- sh -c 'cqlsh -e "select blobAsText(payload) from gundeck.notifications LIMIT 5000;" ' | sed 's/^[ \t]*//;s/[ \t]*$//' | wc` + let len :: Int = 884 -- measured in prod + in mapM (\_ -> randomRIO ('\32', '\126')) [1 .. len] -- printable ASCII + +setTimeoutTo :: Int -> Env -> Env +setTimeoutTo tSecs env = env {timeOutSeconds = tSecs} + +generateTestRecipient :: (HasCallStack) => App TestRecipient +generateTestRecipient = do + print "generateTestRecipient" + user <- recover $ (randomUser OwnDomain def) + r <- randomRIO @Word (1, 8) + clientIds <- forM [0 .. r] $ \_ -> do + client <- + recover + $ addClient + user + def + { acapabilities = Just ["consumable-notifications"], + prekeys = Just $ take 10 somePrekeysRendered, + lastPrekey = Just $ head someLastPrekeysRendered + } + >>= getJSON 201 + objId client + + pure $ TestRecipient user clientIds + where + recover :: App a -> App a + recover = recoverAll (limitRetriesByCumulativeDelay 300 (exponentialBackoff 1_000_000)) . const diff --git a/integration/test/Testlib/Cannon.hs b/integration/test/Testlib/Cannon.hs index 45093c3e25..9a4aad779b 100644 --- a/integration/test/Testlib/Cannon.hs +++ b/integration/test/Testlib/Cannon.hs @@ -39,6 +39,8 @@ module Testlib.Cannon printAwaitAtLeastResult, waitForResponse, assertNoEvent, + -- This should never be merged! + connect, ) where diff --git a/integration/test/Testlib/Env.hs b/integration/test/Testlib/Env.hs index 67a90228ad..7638e04608 100644 --- a/integration/test/Testlib/Env.hs +++ b/integration/test/Testlib/Env.hs @@ -62,8 +62,8 @@ serviceHostPort m Stern = m.stern serviceHostPort m FederatorInternal = m.federatorInternal serviceHostPort m WireServerEnterprise = m.wireServerEnterprise -mkGlobalEnv :: FilePath -> Codensity IO GlobalEnv -mkGlobalEnv cfgFile = do +mkGlobalEnv :: FilePath -> Word -> Codensity IO GlobalEnv +mkGlobalEnv cfgFile shardingGroup = do eith <- liftIO $ Yaml.decodeFileEither cfgFile intConfig <- liftIO $ case eith of Left err -> do @@ -145,7 +145,11 @@ mkGlobalEnv cfgFile = do gDNSMockServerConfig = intConfig.dnsMockServer, gCellsEventQueue = intConfig.cellsEventQueue, gCellsEventWatchersLock, - gCellsEventWatchers + gCellsEventWatchers, + gShardingGroupCount = intConfig.shardingGroupCount, + gShardingGroup = shardingGroup, + gMaxUserNo = intConfig.maxUserNo, + gMaxDeliveryDelay = intConfig.maxDeliveryDelay } where createSSLContext :: Maybe FilePath -> IO (Maybe OpenSSL.SSLContext) @@ -185,7 +189,9 @@ mkEnv currentTestName ge = do -- those domains. apiVersionByDomain = Map.fromList - [ (gFederationV0Domain ge, 4), + [ (gDomain1 ge, 11), + (gDomain2 ge, 11), + (gFederationV0Domain ge, 4), (gFederationV1Domain ge, 5), (gFederationV2Domain ge, 8) ], @@ -201,7 +207,11 @@ mkEnv currentTestName ge = do dnsMockServerConfig = ge.gDNSMockServerConfig, cellsEventQueue = ge.gCellsEventQueue, cellsEventWatchersLock = ge.gCellsEventWatchersLock, - cellsEventWatchers = ge.gCellsEventWatchers + cellsEventWatchers = ge.gCellsEventWatchers, + shardingGroupCount = ge.gShardingGroupCount, + shardingGroup = ge.gShardingGroup, + maxUserNo = ge.gMaxUserNo, + maxDeliveryDelay = ge.gMaxDeliveryDelay } allCiphersuites :: [Ciphersuite] diff --git a/integration/test/Testlib/Options.hs b/integration/test/Testlib/Options.hs index f109e13d8f..2b56256eeb 100644 --- a/integration/test/Testlib/Options.hs +++ b/integration/test/Testlib/Options.hs @@ -27,7 +27,8 @@ data TestOptions = TestOptions excludeTests :: [String], listTests :: Bool, xmlReport :: Maybe FilePath, - configFile :: String + configFile :: String, + shardingGroup :: Word } parser :: Parser TestOptions @@ -64,6 +65,13 @@ parser = <> help "Use configuration FILE" <> value "services/integration.yaml" ) + <*> option + auto + ( long "sharding-group" + <> short 's' + <> help "The sharding group of this instance" + <> value 0 + ) optInfo :: ParserInfo TestOptions optInfo = diff --git a/integration/test/Testlib/Run.hs b/integration/test/Testlib/Run.hs index 1ae1ddf06d..43667a034a 100644 --- a/integration/test/Testlib/Run.hs +++ b/integration/test/Testlib/Run.hs @@ -22,35 +22,21 @@ import Control.Exception as E import Control.Monad import Control.Monad.Codensity import Control.Monad.IO.Class -import Control.Monad.Reader.Class (asks) -import Data.Default import Data.Foldable import Data.Function import Data.Functor import Data.List import Data.Maybe (fromMaybe) -import Data.String (IsString (fromString)) -import Data.String.Conversions (cs) -import Data.Text (Text) -import qualified Data.Text as T import Data.Time -import qualified Data.Yaml as Yaml -import Network.AMQP.Extended -import Network.RabbitMqAdmin import RunAllTests import System.Directory import System.Environment import System.Exit import System.FilePath -import System.IO.Temp (writeTempFile) -import System.Process import Testlib.Assertions import Testlib.Env -import Testlib.ModService (readAndUpdateConfig) import Testlib.Options import Testlib.Printing -import Testlib.ResourcePool (acquireResources) -import Testlib.RunServices (backendA, backendB) import Testlib.Types import Testlib.XML import Text.Printf @@ -123,6 +109,7 @@ main = do opts <- getOptions let f = testFilter opts cfg = opts.configFile + shardingGroup = opts.shardingGroup allTests <- mkAllTests let tests = @@ -132,10 +119,10 @@ main = do let qualifiedName = fromMaybe module_ (stripPrefix "Test." module_) <> "." <> name in (qualifiedName, summary, full, action) - if opts.listTests then doListTests tests else runTests tests opts.xmlReport cfg + if opts.listTests then doListTests tests else runTests tests opts.xmlReport cfg shardingGroup -runTests :: [(String, x, y, App ())] -> Maybe FilePath -> FilePath -> IO () -runTests tests mXMLOutput cfg = do +runTests :: [(String, x, y, App ())] -> Maybe FilePath -> FilePath -> Word -> IO () +runTests tests mXMLOutput cfg shardingGroup = do output <- newChan let displayOutput = readChan output >>= \case @@ -143,12 +130,12 @@ runTests tests mXMLOutput cfg = do Nothing -> pure () let writeOutput = writeChan output . Just - runCodensity (mkEnvs cfg) $ \(genv, env) -> + runCodensity (mkEnvs cfg) $ \(genv, _env) -> withAsync displayOutput $ \displayThread -> do -- Although migrations are run on service start up we are running them here before -- to prevent race conditions between brig and galley -- which cause flakiness and can make the complete test suite fail - runAppWithEnv env runMigrations + -- runAppWithEnv env runMigrations -- Currently 4 seems to be stable, more seems to create more timeouts. report <- fmap mconcat $ pooledForConcurrentlyN 4 tests $ \(qname, _, _, action) -> do timestamp <- getCurrentTime @@ -172,7 +159,7 @@ runTests tests mXMLOutput cfg = do pure (TestSuiteReport [TestCaseReport qname TestSuccess tm]) writeChan output Nothing wait displayThread - deleteFederationV0AndV1Queues genv + -- deleteFederationV0AndV1Queues genv printReport report mapM_ (saveXMLReport report) mXMLOutput when (any (\testCase -> testCase.result /= TestSuccess) report.cases) $ @@ -180,62 +167,62 @@ runTests tests mXMLOutput cfg = do where mkEnvs :: FilePath -> Codensity IO (GlobalEnv, Env) mkEnvs fp = do - g <- mkGlobalEnv fp + g <- mkGlobalEnv fp shardingGroup e <- mkEnv Nothing g pure (g, e) -runMigrations :: App () -runMigrations = do - cwdBase <- asks (.servicesCwdBase) - let brig = "brig" - let (cwd, exe) = case cwdBase of - Nothing -> (Nothing, brig) - Just dir -> - (Just (dir brig), "../../dist" brig) - getConfig <- readAndUpdateConfig def backendA Brig - config <- liftIO getConfig - tempFile <- liftIO $ writeTempFile "/tmp" "brig-migrations.yaml" (cs $ Yaml.encode config) - dynDomains <- asks (.dynamicDomains) - pool <- asks (.resourcePool) - lowerCodensity $ do - resources <- acquireResources (length dynDomains) pool - let dbnames = [backendA.berPostgresqlDBName, backendB.berPostgresqlDBName] <> map (.berPostgresqlDBName) resources - for_ dbnames $ runMigration exe tempFile cwd - liftIO $ putStrLn "Postgres migrations finished" - where - runMigration :: (MonadIO m) => FilePath -> FilePath -> Maybe FilePath -> String -> m () - runMigration exe tempFile cwd dbname = do - let cp = (proc exe ["-c", tempFile, "migrate-postgres", "--dbname", dbname]) {cwd} - (_, _, _, ph) <- liftIO $ createProcess cp - void $ liftIO $ waitForProcess ph - -deleteFederationV0AndV1Queues :: GlobalEnv -> IO () -deleteFederationV0AndV1Queues env = do - let testDomains = env.gDomain1 : env.gDomain2 : env.gDynamicDomains - putStrLn "Attempting to delete federation V0 queues..." - (mV0User, mV0Pass) <- readCredsFromEnvWithSuffix "V0" - fromMaybe (putStrLn "No or incomplete credentials for fed V0 RabbitMQ") $ - deleteFederationQueues testDomains env.gRabbitMQConfigV0 <$> mV0User <*> mV0Pass - - putStrLn "Attempting to delete federation V1 queues..." - (mV1User, mV1Pass) <- readCredsFromEnvWithSuffix "V1" - fromMaybe (putStrLn "No or incomplete credentials for fed V1 RabbitMQ") $ - deleteFederationQueues testDomains env.gRabbitMQConfigV1 <$> mV1User <*> mV1Pass - where - readCredsFromEnvWithSuffix :: String -> IO (Maybe Text, Maybe Text) - readCredsFromEnvWithSuffix suffix = - (,) - <$> (fmap fromString <$> lookupEnv ("RABBITMQ_USERNAME_" <> suffix)) - <*> (fmap fromString <$> lookupEnv ("RABBITMQ_PASSWORD_" <> suffix)) - -deleteFederationQueues :: [String] -> RabbitMqAdminOpts -> Text -> Text -> IO () -deleteFederationQueues testDomains opts username password = do - client <- mkRabbitMqAdminClientEnvWithCreds opts username password - for_ testDomains $ \domain -> do - page <- client.listQueuesByVHost opts.vHost (fromString $ "^backend-notifications\\." <> domain <> "$") True 100 1 - for_ page.items $ \queue -> do - putStrLn $ "Deleting queue " <> T.unpack queue.name - void $ deleteQueue client opts.vHost queue.name +-- runMigrations :: App () +-- runMigrations = do +-- cwdBase <- asks (.servicesCwdBase) +-- let brig = "brig" +-- let (cwd, exe) = case cwdBase of +-- Nothing -> (Nothing, brig) +-- Just dir -> +-- (Just (dir brig), "../../dist" brig) +-- getConfig <- readAndUpdateConfig def backendA Brig +-- config <- liftIO getConfig +-- tempFile <- liftIO $ writeTempFile "/tmp" "brig-migrations.yaml" (cs $ Yaml.encode config) +-- dynDomains <- asks (.dynamicDomains) +-- pool <- asks (.resourcePool) +-- lowerCodensity $ do +-- resources <- acquireResources (length dynDomains) pool +-- let dbnames = [backendA.berPostgresqlDBName, backendB.berPostgresqlDBName] <> map (.berPostgresqlDBName) resources +-- for_ dbnames $ runMigration exe tempFile cwd +-- liftIO $ putStrLn "Postgres migrations finished" +-- where +-- runMigration :: (MonadIO m) => FilePath -> FilePath -> Maybe FilePath -> String -> m () +-- runMigration exe tempFile cwd dbname = do +-- let cp = (proc exe ["-c", tempFile, "migrate-postgres", "--dbname", dbname]) {cwd} +-- (_, _, _, ph) <- liftIO $ createProcess cp +-- void $ liftIO $ waitForProcess ph + +-- deleteFederationV0AndV1Queues :: GlobalEnv -> IO () +-- deleteFederationV0AndV1Queues env = do +-- let testDomains = env.gDomain1 : env.gDomain2 : env.gDynamicDomains +-- putStrLn "Attempting to delete federation V0 queues..." +-- (mV0User, mV0Pass) <- readCredsFromEnvWithSuffix "V0" +-- fromMaybe (putStrLn "No or incomplete credentials for fed V0 RabbitMQ") $ +-- deleteFederationQueues testDomains env.gRabbitMQConfigV0 <$> mV0User <*> mV0Pass +-- +-- putStrLn "Attempting to delete federation V1 queues..." +-- (mV1User, mV1Pass) <- readCredsFromEnvWithSuffix "V1" +-- fromMaybe (putStrLn "No or incomplete credentials for fed V1 RabbitMQ") $ +-- deleteFederationQueues testDomains env.gRabbitMQConfigV1 <$> mV1User <*> mV1Pass +-- where +-- readCredsFromEnvWithSuffix :: String -> IO (Maybe Text, Maybe Text) +-- readCredsFromEnvWithSuffix suffix = +-- (,) +-- <$> (fmap fromString <$> lookupEnv ("RABBITMQ_USERNAME_" <> suffix)) +-- <*> (fmap fromString <$> lookupEnv ("RABBITMQ_PASSWORD_" <> suffix)) +-- +-- deleteFederationQueues :: [String] -> RabbitMqAdminOpts -> Text -> Text -> IO () +-- deleteFederationQueues testDomains opts username password = do +-- client <- mkRabbitMqAdminClientEnvWithCreds opts username password +-- for_ testDomains $ \domain -> do +-- page <- client.listQueuesByVHost opts.vHost (fromString $ "^backend-notifications\\." <> domain <> "$") True 100 1 +-- for_ page.items $ \queue -> do +-- putStrLn $ "Deleting queue " <> T.unpack queue.name +-- void $ deleteQueue client opts.vHost queue.name doListTests :: [(String, String, String, x)] -> IO () doListTests tests = for_ tests $ \(qname, _desc, _full, _) -> do diff --git a/integration/test/Testlib/RunServices.hs b/integration/test/Testlib/RunServices.hs index 4a9f6403d4..5fbcabc2e2 100644 --- a/integration/test/Testlib/RunServices.hs +++ b/integration/test/Testlib/RunServices.hs @@ -88,8 +88,10 @@ main = do let cp = proc "sh" (["-c", "exec \"$@\"", "--"] <> opts.runSubprocess) (_, _, _, ph) <- createProcess cp exitWith =<< waitForProcess ph + -- The shardingGroup only matters for the testBench test; probably not here. + shardingGroup = 0 - runCodensity (mkGlobalEnv cfg >>= mkEnv Nothing) $ \env -> + runCodensity (mkGlobalEnv cfg shardingGroup >>= mkEnv Nothing) $ \env -> runAppWithEnv env $ lowerCodensity $ do diff --git a/integration/test/Testlib/Types.hs b/integration/test/Testlib/Types.hs index 29aa0b2b4b..99044af3c6 100644 --- a/integration/test/Testlib/Types.hs +++ b/integration/test/Testlib/Types.hs @@ -144,7 +144,11 @@ data GlobalEnv = GlobalEnv gDNSMockServerConfig :: DNSMockServerConfig, gCellsEventQueue :: String, gCellsEventWatchersLock :: MVar (), - gCellsEventWatchers :: IORef (Map String QueueWatcher) + gCellsEventWatchers :: IORef (Map String QueueWatcher), + gShardingGroupCount :: Word, + gShardingGroup :: Word, + gMaxUserNo :: Word, + gMaxDeliveryDelay :: Word } data IntegrationConfig = IntegrationConfig @@ -160,7 +164,10 @@ data IntegrationConfig = IntegrationConfig rabbitmqV1 :: RabbitMqAdminOpts, cassandra :: CassandraConfig, dnsMockServer :: DNSMockServerConfig, - cellsEventQueue :: String + cellsEventQueue :: String, + shardingGroupCount :: Word, + maxUserNo :: Word, + maxDeliveryDelay :: Word } deriving (Show, Generic) @@ -181,6 +188,9 @@ instance FromJSON IntegrationConfig where <*> o .: fromString "cassandra" <*> o .: fromString "dnsMockServer" <*> o .: fromString "cellsEventQueue" + <*> o .: fromString "shardingGroupCount" + <*> o .: fromString "maxUserNo" + <*> o .: fromString "maxDeliveryDelay" data ServiceMap = ServiceMap { brig :: HostPort, @@ -271,7 +281,11 @@ data Env = Env dnsMockServerConfig :: DNSMockServerConfig, cellsEventQueue :: String, cellsEventWatchersLock :: MVar (), - cellsEventWatchers :: IORef (Map String QueueWatcher) + cellsEventWatchers :: IORef (Map String QueueWatcher), + shardingGroupCount :: Word, + shardingGroup :: Word, + maxUserNo :: Word, + maxDeliveryDelay :: Word } data Response = Response @@ -446,7 +460,7 @@ hoistCodensity m = Codensity $ \k -> do getServiceMap :: (HasCallStack) => String -> App ServiceMap getServiceMap fedDomain = do env <- ask - assertJust ("Could not find service map for federation domain: " <> fedDomain) (Map.lookup fedDomain env.serviceMap) + assertJust ("Could not find service map for federation domain: " <> fedDomain <> " in " <> show (Map.keys env.serviceMap)) (Map.lookup fedDomain env.serviceMap) getMLSState :: App MLSState getMLSState = do diff --git a/services/integration.yaml b/services/integration.yaml index 427aa761d1..9ec920a603 100644 --- a/services/integration.yaml +++ b/services/integration.yaml @@ -330,3 +330,7 @@ integrationTestHostName: "localhost" additionalElasticSearch: https://localhost:9201 cellsEventQueue: cells_events + +shardingGroupCount: 1 +maxUserNo: 1000 +maxDeliveryDelay: 120