diff --git a/server/ladder_service/ladder_service.py b/server/ladder_service/ladder_service.py index b99ee37d5..81ee7251e 100644 --- a/server/ladder_service/ladder_service.py +++ b/server/ladder_service/ladder_service.py @@ -9,6 +9,7 @@ from collections import defaultdict from typing import Awaitable, Callable, Optional +import aio_pika import aiocron import humanize from sqlalchemy import and_, func, select, text, true @@ -32,8 +33,9 @@ ) from server.decorators import with_logger from server.exceptions import DisabledError +from server.factions import Faction from server.game_service import GameService -from server.games import InitMode, LadderGame +from server.games import Game, InitMode, LadderGame from server.games.ladder_game import GameClosedError from server.ladder_service.game_name import game_name from server.ladder_service.violation_service import ViolationService @@ -43,10 +45,14 @@ OnMatchedCallback, Search ) +from server.message_queue_service import MessageQueueService from server.metrics import MatchLaunch +from server.player_service import PlayerService from server.players import Player, PlayerState from server.types import GameLaunchOptions, Map, NeroxisGeneratedMap +from .types import MQMatchmakingRequest, MQMatchmakingRequestParticipant + @with_logger class LadderService(Service): @@ -59,20 +65,38 @@ def __init__( self, database: FAFDatabase, game_service: GameService, + message_queue_service: MessageQueueService, + player_service: PlayerService, violation_service: ViolationService, ): self._db = database - self._informed_players: set[Player] = set() self.game_service = game_service - self.queues = {} + self.message_queue_service = message_queue_service + self.player_service = player_service self.violation_service = violation_service + self.queues: dict[str, MatchmakerQueue] = {} + self._initialized = False + self._informed_players: set[Player] = set() self._searches: dict[Player, dict[str, Search]] = defaultdict(dict) self._allow_new_searches = True async def initialize(self) -> None: + if self._initialized: + return + await self.update_data() + await self.message_queue_service.declare_exchange( + config.MQ_EXCHANGE_NAME + ) + await self.message_queue_service.consume( + config.MQ_EXCHANGE_NAME, + "request.match.create", + self.handle_mq_matchmaking_request + ) + self._update_cron = aiocron.crontab("*/10 * * * *", func=self.update_data) + self._initialized = True async def update_data(self) -> None: async with self._db.acquire() as conn: @@ -181,7 +205,7 @@ async def fetch_map_pools(self, conn) -> dict[int, tuple[str, list[Map]]]: return map_pool_maps - async def fetch_matchmaker_queues(self, conn): + async def fetch_matchmaker_queues(self, conn) -> dict[str, dict]: result = await conn.execute( select( matchmaker_queue.c.id, @@ -439,6 +463,224 @@ def write_rating_progress(self, player: Player, rating_type: str) -> None: ) }) + async def handle_mq_matchmaking_request( + self, + message: aio_pika.IncomingMessage + ): + try: + game = await self._handle_mq_matchmaking_request(message) + except Exception as e: + if isinstance(e, NotConnectedError): + msg = { + "error_code": "launch_failed", + "args": [ + {"player_id": player.id} + for player in e.players + ], + } + elif isinstance(e, InvalidRequestError): + msg = { + "error_code": e.code, + "args": e.args, + } + else: + self._logger.exception( + "Unexpected error while handling MQ matchmaking request", + ) + msg = { + "error_code": "unknown", + "args": e.args, + } + + await self.message_queue_service.publish( + config.MQ_EXCHANGE_NAME, + "error.match.create", + msg, + correlation_id=message.correlation_id + ) + else: + await self.message_queue_service.publish( + config.MQ_EXCHANGE_NAME, + "success.match.create", + {"game_id": game.id}, + correlation_id=message.correlation_id + ) + + async def _handle_mq_matchmaking_request( + self, + message: aio_pika.IncomingMessage, + ): + self._logger.debug( + "Got matchmaking request: %s", + message.correlation_id, + ) + request = await self._parse_mq_matchmaking_request_message(message) + return await self._launch_mq_matchmaking_request(request) + + async def _parse_mq_matchmaking_request_message( + self, + message: aio_pika.IncomingMessage, + ) -> MQMatchmakingRequest: + try: + request = json.loads(message.body) + except json.JSONDecodeError as e: + raise InvalidRequestError( + "invalid_request", + {"message": str(e)}, + ) + + try: + # TODO: Use id instead of name? + queue_name = request.get("matchmaker_queue") + map_name = request["map_name"] + game_name = request["game_name"] + participants = request["participants"] + featured_mod = request.get("featured_mod") + game_options = request.get("game_options") + + if not featured_mod and not queue_name: + raise KeyError("featured_mod") + except KeyError as e: + raise InvalidRequestError( + "invalid_request", + {"message": f"missing '{e.args[0]}'"}, + ) + + queue = self.queues.get(queue_name) + + if queue_name: + if queue is None: + raise InvalidRequestError( + "invalid_request", + {"message": f"invalid queue '{queue_name}'"}, + ) + if featured_mod is None: + featured_mod = queue.featured_mod + + if not participants: + raise InvalidRequestError( + "invalid_request", + {"message": "empty participants"}, + ) + + player_ids = [participant["player_id"] for participant in participants] + missing_players = [ + id + for id in player_ids + if self.player_service[id] is None + ] + if missing_players: + raise InvalidRequestError( + "players_not_found", + *[{"player_id": id} for id in missing_players] + ) + + all_players = [ + player + for player_id in player_ids + if (player := self.player_service[player_id]) is not None + ] + non_idle_players = [ + player for player in all_players + if player.state != PlayerState.IDLE + ] + if non_idle_players: + raise InvalidRequestError( + "invalid_state", + *[ + {"player_id": player.id, "state": player.state.name} + for player in all_players + ] + ) + + try: + # Avoiding list comprehension so that `i` is available in the + # exception handler + mq_participants = [] + for i, (participant, player) in enumerate( + zip(participants, all_players), + ): + mq_participants.append( + MQMatchmakingRequestParticipant( + player=player, + faction=Faction.from_value(participant["faction"]), + team=participant["team"], + slot=participant["slot"], + ) + ) + except KeyError as e: + raise InvalidRequestError( + "invalid_request", + {"message": f"missing 'participants.[{i}].{e.args[0]}'"}, + ) + + return MQMatchmakingRequest( + game_name=game_name, + map=await self.game_service.get_map(map_name), + featured_mod=featured_mod, + participants=mq_participants, + game_options=game_options, + queue=queue, + ) + + async def _launch_mq_matchmaking_request( + self, + request: MQMatchmakingRequest, + ) -> Game: + all_players = [participant.player for participant in request.participants] + host = all_players[0] + guests = all_players[1:] + + for player in all_players: + player.state = PlayerState.STARTING_AUTOMATCH + + game = None + try: + game = self.game_service.create_game( + game_class=LadderGame, + game_mode=request.featured_mod, + host=host, + name="Matchmaker Game", + map=request.map, + matchmaker_queue_id=request.queue.id if request.queue else None, + rating_type=request.queue.rating_type if request.queue else None, + max_players=len(request.participants) + ) + game.init_mode = InitMode.AUTO_LOBBY + game.set_name_unchecked(request.game_name) + + for participant in request.participants: + player_id = participant.player.id + + game.set_player_option(player_id, "Faction", participant.faction.value) + game.set_player_option(player_id, "Team", participant.team) + game.set_player_option(player_id, "StartSpot", participant.slot) + game.set_player_option(player_id, "Army", participant.slot) + game.set_player_option(player_id, "Color", participant.slot) + + def make_game_options(player: Player) -> GameLaunchOptions: + return GameLaunchOptions( + mapname=request.map.folder_name, + expected_players=len(request.participants), + game_options=request.game_options, + team=game.get_player_option(player.id, "Team"), + faction=game.get_player_option(player.id, "Faction"), + map_position=game.get_player_option(player.id, "StartSpot") + ) + + await self.launch_match(game, host, guests, make_game_options) + + return game + except Exception: + if game: + await game.on_game_finish() + + for player in all_players: + if player.state == PlayerState.STARTING_AUTOMATCH: + player.state = PlayerState.IDLE + + raise + def on_match_found( self, s1: Search, @@ -734,3 +976,9 @@ async def graceful_shutdown(self): class NotConnectedError(asyncio.TimeoutError): def __init__(self, players: list[Player]): self.players = players + + +class InvalidRequestError(Exception): + def __init__(self, code: str, *args): + super().__init__(*args) + self.code = code diff --git a/server/ladder_service/types.py b/server/ladder_service/types.py new file mode 100644 index 000000000..25519552d --- /dev/null +++ b/server/ladder_service/types.py @@ -0,0 +1,22 @@ +from typing import Any, NamedTuple, Optional + +from server.factions import Faction +from server.matchmaker import MatchmakerQueue +from server.players import Player +from server.types import Map + + +class MQMatchmakingRequest(NamedTuple): + game_name: str + map: Map + featured_mod: str + participants: list["MQMatchmakingRequestParticipant"] + game_options: dict[str, Any] + queue: Optional[MatchmakerQueue] + + +class MQMatchmakingRequestParticipant(NamedTuple): + player: Player + faction: Faction + team: int + slot: int diff --git a/server/matchmaker/matchmaker_queue.py b/server/matchmaker/matchmaker_queue.py index 778407981..925dd14a8 100644 --- a/server/matchmaker/matchmaker_queue.py +++ b/server/matchmaker/matchmaker_queue.py @@ -90,7 +90,7 @@ def get_map_pool_for_rating(self, rating: float) -> Optional[MapPool]: continue return map_pool - def get_game_options(self) -> dict[str, Any]: + def get_game_options(self) -> Optional[dict[str, Any]]: return self.params.get("GameOptions") or None def initialize(self): diff --git a/server/message_queue_service.py b/server/message_queue_service.py index 5015c172a..c0b957c8b 100644 --- a/server/message_queue_service.py +++ b/server/message_queue_service.py @@ -4,7 +4,7 @@ import asyncio import json -from typing import Iterable +from typing import Callable, Iterable, Optional import aio_pika from aio_pika import DeliveryMode, ExchangeType @@ -125,26 +125,29 @@ async def _shutdown(self) -> None: async def publish( self, exchange_name: str, - routing: str, + routing_key: str, payload: dict, mandatory: bool = False, delivery_mode: DeliveryMode = DeliveryMode.PERSISTENT, + correlation_id: Optional[str] = None, ) -> None: await self.publish_many( exchange_name, - routing, + routing_key, [payload], mandatory=mandatory, - delivery_mode=delivery_mode + delivery_mode=delivery_mode, + correlation_id=correlation_id, ) async def publish_many( self, exchange_name: str, - routing: str, + routing_key: str, payloads: Iterable[dict], mandatory: bool = False, delivery_mode: DeliveryMode = DeliveryMode.PERSISTENT, + correlation_id: Optional[str] = None, ) -> None: if not self._is_ready: self._logger.warning( @@ -152,19 +155,18 @@ async def publish_many( ) return - exchange = self._exchanges.get(exchange_name) - if exchange is None: - raise KeyError(f"Unknown exchange {exchange_name}.") + exchange = self._get_exchange(exchange_name) async with self._channel.transaction(): for payload in payloads: message = aio_pika.Message( json.dumps(payload).encode(), - delivery_mode=delivery_mode + delivery_mode=delivery_mode, + correlation_id=correlation_id, ) await exchange.publish( message, - routing_key=routing, + routing_key=routing_key, mandatory=mandatory ) self._logger.log( @@ -172,9 +174,39 @@ async def publish_many( "Published message %s to %s/%s", payload, exchange_name, - routing + routing_key ) + async def consume( + self, + exchange_name: str, + routing_key: str, + process_message: Callable[[aio_pika.IncomingMessage], None], + ) -> None: + await self.initialize() + if not self._is_ready: + self._logger.warning( + "Not connected to RabbitMQ, unable to declare queue." + ) + return + + exchange = self._get_exchange(exchange_name) + queue = await self._channel.declare_queue( + None, + auto_delete=True, + durable=False + ) + + await queue.bind(exchange, routing_key) + await queue.consume(process_message, exclusive=True) + + def _get_exchange(self, exchange_name: str) -> aio_pika.Exchange: + exchange = self._exchanges.get(exchange_name) + if exchange is None: + raise KeyError(f"Unknown exchange {exchange_name}.") + + return exchange + @synchronizedmethod("initialization_lock") async def reconnect(self) -> None: self._is_ready = False diff --git a/tests/integration_tests/conftest.py b/tests/integration_tests/conftest.py index 60f2c5b89..72d39a972 100644 --- a/tests/integration_tests/conftest.py +++ b/tests/integration_tests/conftest.py @@ -39,9 +39,22 @@ def mock_games(): @pytest.fixture -async def ladder_service(mocker, database, game_service, violation_service): +async def ladder_service( + mocker, + database, + game_service, + message_queue_service, + player_service, + violation_service, +): mocker.patch("server.matchmaker.pop_timer.config.QUEUE_POP_TIME_MAX", 1) - ladder_service = LadderService(database, game_service, violation_service) + ladder_service = LadderService( + database, + game_service, + message_queue_service, + player_service, + violation_service, + ) await ladder_service.initialize() yield ladder_service await ladder_service.shutdown() @@ -547,18 +560,23 @@ async def channel(): yield channel -async def connect_mq_consumer(server, channel, routing_key): - """ - Returns a subclass of Protocol that yields messages read from a rabbitmq - exchange. - """ +async def connect_mq_queue(channel, routing_key): exchange = await channel.declare_exchange( config.MQ_EXCHANGE_NAME, aio_pika.ExchangeType.TOPIC, durable=True ) - queue = await channel.declare_queue("", exclusive=True) + queue = await channel.declare_queue(None, exclusive=True) await queue.bind(exchange, routing_key=routing_key) + return queue + + +async def connect_mq_consumer(channel, routing_key): + """ + Returns a subclass of Protocol that yields messages read from a rabbitmq + exchange. + """ + queue = await connect_mq_queue(channel, routing_key) proto = AioQueueProtocol(queue) await proto.consume() diff --git a/tests/integration_tests/test_game.py b/tests/integration_tests/test_game.py index 3c90ecb45..393164aa9 100644 --- a/tests/integration_tests/test_game.py +++ b/tests/integration_tests/test_game.py @@ -581,7 +581,6 @@ async def test_game_ended_rates_game(lobby_server): @fast_forward(30) async def test_game_ended_broadcasts_rating_update(lobby_server, channel): mq_proto_all = await connect_mq_consumer( - lobby_server, channel, "success.rating.update" ) @@ -1460,12 +1459,10 @@ async def test_game_stats_broadcasts_achievement_updates( channel ): mq_proto_ach = await connect_mq_consumer( - lobby_server, channel, "request.achievement.update" ) mq_proto_evt = await connect_mq_consumer( - lobby_server, channel, "request.event.update" ) @@ -1553,7 +1550,6 @@ async def test_galactic_war_1v1_game_ended_broadcasts_army_results( channel ): mq_proto_all = await connect_mq_consumer( - lobby_server, channel, "success.gameResults.create" ) @@ -1649,7 +1645,6 @@ async def test_galactic_war_1v1_game_ended_broadcasts_army_results( @fast_forward(30) async def test_galactic_war_2v1_game_ended_broadcasts_army_results(lobby_server, channel): mq_proto_all = await connect_mq_consumer( - lobby_server, channel, "success.gameResults.create" ) diff --git a/tests/integration_tests/test_matchmaker_requests.py b/tests/integration_tests/test_matchmaker_requests.py new file mode 100644 index 000000000..97821c7d2 --- /dev/null +++ b/tests/integration_tests/test_matchmaker_requests.py @@ -0,0 +1,470 @@ +# External matchmaker requests over rabbitmq +import asyncio +import json +import uuid + +import pytest + +from server.config import config +from tests.utils import fast_forward + +from .conftest import connect_and_sign_in, connect_mq_queue, read_until_command +from .test_game import client_response, start_search + +pytestmark = pytest.mark.rabbitmq + + +@fast_forward(20) +async def test_valid_request_1v1( + lobby_server, + channel, + message_queue_service +): + test_id, _, proto1 = await connect_and_sign_in( + ("test", "test_password"), lobby_server + ) + rhiza_id, _, proto2 = await connect_and_sign_in( + ("Rhiza", "puff_the_magic_dragon"), lobby_server + ) + success_queue = await connect_mq_queue(channel, "success.match.create") + error_queue = await connect_mq_queue(channel, "error.match.create") + + await asyncio.gather(*( + read_until_command(proto, "game_info") + for proto in (proto1, proto2) + )) + + # Include all the information we could possibly need + correlation_id = str(uuid.uuid4()) + await message_queue_service.publish( + config.MQ_EXCHANGE_NAME, + "request.match.create", + { + "matchmaker_queue": "ladder1v1", + "featured_mod": "ladder1v1", + "game_name": "test VERSUS Rhiza", + "map_name": "scmp_003", + "participants": [ + { + "player_id": test_id, + "team": 2, + "slot": 1, + "faction": "uef" + }, + { + "player_id": rhiza_id, + "team": 3, + "slot": 2, + "faction": "cybran" + } + ] + }, + correlation_id=correlation_id + ) + + msg1, msg2 = await asyncio.gather( + client_response(proto1), + client_response(proto2) + ) + assert msg1["uid"] == msg2["uid"] + assert msg1["mapname"] == msg2["mapname"] + assert msg1["name"] == msg2["name"] + assert msg1["mod"] == msg2["mod"] + assert msg1["rating_type"] == msg2["rating_type"] + assert msg1["expected_players"] == msg2["expected_players"] + assert "game_options" not in msg1 and "game_options" not in msg2 + + assert msg1["mapname"] == "scmp_003" + assert msg1["name"] == "test VERSUS Rhiza" + assert msg1["mod"] == "ladder1v1" + assert msg1["rating_type"] == "ladder_1v1" + assert msg1["expected_players"] == 2 + + assert msg1["team"] == 2 + assert msg1["map_position"] == 1 + assert msg1["faction"] == 1 + + assert msg2["team"] == 3 + assert msg2["map_position"] == 2 + assert msg2["faction"] == 3 + + await proto1.send_message({ + "target": "game", + "command": "GameState", + "args": ["Launching"] + }) + + message = await success_queue.iterator(timeout=5).__anext__() + assert message.correlation_id == correlation_id + assert json.loads(message.body.decode()) == { + "game_id": msg1["uid"] + } + assert await error_queue.get(fail=False) is None + + +@fast_forward(20) +async def test_valid_request_1v1_game_options( + lobby_server, + channel, + message_queue_service +): + test_id, _, proto1 = await connect_and_sign_in( + ("test", "test_password"), lobby_server + ) + rhiza_id, _, proto2 = await connect_and_sign_in( + ("Rhiza", "puff_the_magic_dragon"), lobby_server + ) + success_queue = await connect_mq_queue(channel, "success.match.create") + error_queue = await connect_mq_queue(channel, "error.match.create") + + await asyncio.gather(*( + read_until_command(proto, "game_info") + for proto in (proto1, proto2) + )) + + # Include all the information we could possibly need + correlation_id = str(uuid.uuid4()) + await message_queue_service.publish( + config.MQ_EXCHANGE_NAME, + "request.match.create", + { + "matchmaker_queue": "ladder1v1", + "featured_mod": "ladder1v1", + "game_name": "test VERSUS Rhiza", + "map_name": "scmp_003", + "participants": [ + { + "player_id": test_id, + "team": 2, + "slot": 1, + "faction": "uef" + }, + { + "player_id": rhiza_id, + "team": 3, + "slot": 2, + "faction": "cybran" + } + ], + "game_options": { + "Share": "ShareUntilDeath", + "RestrictedCategories": ["T3", "T4", "SUBS", "PARAGON"], + } + }, + correlation_id=correlation_id + ) + + msg1, msg2 = await asyncio.gather( + client_response(proto1), + client_response(proto2) + ) + assert msg1["game_options"] == msg2["game_options"] + + assert msg1["game_options"] == { + "Share": "ShareUntilDeath", + "RestrictedCategories": ["T3", "T4", "SUBS", "PARAGON"], + } + + await proto1.send_message({ + "target": "game", + "command": "GameState", + "args": ["Launching"] + }) + + message = await success_queue.iterator(timeout=5).__anext__() + assert message.correlation_id == correlation_id + assert json.loads(message.body.decode()) == { + "game_id": msg1["uid"] + } + assert await error_queue.get(fail=False) is None + + +@fast_forward(10) +async def test_invalid_request_empty( + ladder_service, + channel, + message_queue_service, +): + del ladder_service + + success_queue = await connect_mq_queue(channel, "success.match.create") + error_queue = await connect_mq_queue(channel, "error.match.create") + + correlation_id = str(uuid.uuid4()) + await message_queue_service.publish( + config.MQ_EXCHANGE_NAME, + "request.match.create", + { + # Empty payload + }, + correlation_id=correlation_id + ) + + message = await error_queue.iterator(timeout=5).__anext__() + assert message.correlation_id == correlation_id + assert json.loads(message.body.decode()) == { + "error_code": "invalid_request", + "args": [ + {"message": "missing 'map_name'"}, + ] + } + assert await success_queue.get(fail=False) is None + + +@fast_forward(10) +async def test_invalid_request_missing_queue_and_featured_mod( + ladder_service, + channel, + message_queue_service, +): + del ladder_service + + success_queue = await connect_mq_queue(channel, "success.match.create") + error_queue = await connect_mq_queue(channel, "error.match.create") + + correlation_id = str(uuid.uuid4()) + await message_queue_service.publish( + config.MQ_EXCHANGE_NAME, + "request.match.create", + { + "game_name": "Test bad game", + "map_name": "scmp_003", + "participants": [], + }, + correlation_id=correlation_id + ) + + message = await error_queue.iterator(timeout=5).__anext__() + assert message.correlation_id == correlation_id + assert json.loads(message.body.decode()) == { + "error_code": "invalid_request", + "args": [ + {"message": "missing 'featured_mod'"}, + ] + } + assert await success_queue.get(fail=False) is None + + +@fast_forward(10) +async def test_invalid_request_invalid_queue_name( + ladder_service, + channel, + message_queue_service, +): + del ladder_service + + success_queue = await connect_mq_queue(channel, "success.match.create") + error_queue = await connect_mq_queue(channel, "error.match.create") + + correlation_id = str(uuid.uuid4()) + await message_queue_service.publish( + config.MQ_EXCHANGE_NAME, + "request.match.create", + { + "matchmaker_queue": "bad_queue_name", + "game_name": "Test bad game", + "map_name": "scmp_003", + "participants": [], + }, + correlation_id=correlation_id + ) + + message = await error_queue.iterator(timeout=5).__anext__() + assert message.correlation_id == correlation_id + assert json.loads(message.body.decode()) == { + "error_code": "invalid_request", + "args": [ + {"message": "invalid queue 'bad_queue_name'"}, + ], + } + assert await success_queue.get(fail=False) is None + + +@fast_forward(10) +async def test_invalid_request_empty_participants( + ladder_service, + channel, + message_queue_service +): + del ladder_service + + success_queue = await connect_mq_queue(channel, "success.match.create") + error_queue = await connect_mq_queue(channel, "error.match.create") + + correlation_id = str(uuid.uuid4()) + await message_queue_service.publish( + config.MQ_EXCHANGE_NAME, + "request.match.create", + { + "matchmaker_queue": "ladder1v1", + "game_name": "Test bad game", + "map_name": "scmp_003", + "participants": [], + }, + correlation_id=correlation_id + ) + + message = await error_queue.iterator(timeout=5).__anext__() + assert message.correlation_id == correlation_id + assert json.loads(message.body.decode()) == { + "error_code": "invalid_request", + "args": [ + {"message": "empty participants"}, + ], + } + assert await success_queue.get(fail=False) is None + + +@fast_forward(10) +async def test_player_offline( + lobby_server, + channel, + message_queue_service +): + rhiza_id, _, proto = await connect_and_sign_in( + ("Rhiza", "puff_the_magic_dragon"), lobby_server + ) + success_queue = await connect_mq_queue(channel, "success.match.create") + error_queue = await connect_mq_queue(channel, "error.match.create") + + await read_until_command(proto, "game_info") + + # Include all the information we could possibly need + correlation_id = str(uuid.uuid4()) + await message_queue_service.publish( + config.MQ_EXCHANGE_NAME, + "request.match.create", + { + "matchmaker_queue": "ladder1v1", + "game_name": "test VERSUS Rhiza", + "map_name": "scmp_003", + "participants": [ + { + "player_id": 1, + "team": 2, + "slot": 1, + "faction": "uef" + }, + { + "player_id": rhiza_id, + "team": 3, + "slot": 2, + "faction": "cybran" + } + ] + }, + correlation_id=correlation_id + ) + + message = await error_queue.iterator(timeout=5).__anext__() + assert message.correlation_id == correlation_id + assert json.loads(message.body.decode()) == { + "error_code": "players_not_found", "args": [{"player_id": 1}] + } + assert await success_queue.get(fail=False) is None + + +@fast_forward(10) +async def test_player_already_searching( + lobby_server, + channel, + message_queue_service +): + rhiza_id, _, proto = await connect_and_sign_in( + ("Rhiza", "puff_the_magic_dragon"), lobby_server + ) + success_queue = await connect_mq_queue(channel, "success.match.create") + error_queue = await connect_mq_queue(channel, "error.match.create") + + await read_until_command(proto, "game_info") + await start_search(proto, "ladder1v1") + + correlation_id = str(uuid.uuid4()) + await message_queue_service.publish( + config.MQ_EXCHANGE_NAME, + "request.match.create", + { + "featured_mod": "faf", + "game_name": "Rhiza solo game", + "map_name": "scmp_003", + "participants": [ + { + "player_id": rhiza_id, + "team": 3, + "slot": 2, + "faction": "cybran" + } + ] + }, + correlation_id=correlation_id + ) + + message = await error_queue.iterator(timeout=5).__anext__() + assert message.correlation_id == correlation_id + assert json.loads(message.body.decode()) == { + "error_code": "invalid_state", "args": [ + {"player_id": rhiza_id, "state": "SEARCHING_LADDER"}, + ] + } + assert await success_queue.get(fail=False) is None + + +@fast_forward(100) +async def test_players_dont_connect( + lobby_server, + channel, + message_queue_service +): + test_id, _, proto1 = await connect_and_sign_in( + ("test", "test_password"), lobby_server + ) + rhiza_id, _, proto2 = await connect_and_sign_in( + ("Rhiza", "puff_the_magic_dragon"), lobby_server + ) + success_queue = await connect_mq_queue(channel, "success.match.create") + error_queue = await connect_mq_queue(channel, "error.match.create") + + await asyncio.gather(*( + read_until_command(proto, "game_info") + for proto in (proto1, proto2) + )) + + # Include all the information we could possibly need + correlation_id = str(uuid.uuid4()) + await message_queue_service.publish( + config.MQ_EXCHANGE_NAME, + "request.match.create", + { + "matchmaker_queue": "ladder1v1", + "featured_mod": "faf", + "game_name": "test VERSUS Rhiza", + "map_name": "scmp_003", + "participants": [ + { + "player_id": test_id, + "team": 2, + "slot": 1, + "faction": "aeon" + }, + { + "player_id": rhiza_id, + "team": 3, + "slot": 2, + "faction": "seraphim" + } + ] + }, + correlation_id=correlation_id + ) + + msg = await client_response(proto1) + assert msg["faction"] == 2 + # Mod field sould override the mod from queue + assert msg["mod"] == "faf" + + message = await error_queue.iterator(timeout=85).__anext__() + assert message.correlation_id == correlation_id + assert json.loads(message.body.decode()) == { + "error_code": "launch_failed", "args": [{"player_id": rhiza_id}] + } + assert await success_queue.get(fail=False) is None diff --git a/tests/integration_tests/test_server.py b/tests/integration_tests/test_server.py index ae00663bb..6b8390b6f 100644 --- a/tests/integration_tests/test_server.py +++ b/tests/integration_tests/test_server.py @@ -373,12 +373,10 @@ async def test_player_info_broadcast(lobby_server): @fast_forward(5) async def test_player_info_broadcast_to_rabbitmq(lobby_server, channel): mq_proto = await connect_mq_consumer( - lobby_server, channel, "broadcast.playerInfo.update" ) mq_proto_all = await connect_mq_consumer( - lobby_server, channel, "broadcast.*.update" ) @@ -613,7 +611,6 @@ async def test_game_info_broadcast_to_players_in_lobby(lobby_server): @fast_forward(10) async def test_info_broadcast_to_rabbitmq(lobby_server, channel): mq_proto_all = await connect_mq_consumer( - lobby_server, channel, "broadcast.*.update" ) diff --git a/tests/unit_tests/conftest.py b/tests/unit_tests/conftest.py index a1d9ffe12..b70cb2ddb 100644 --- a/tests/unit_tests/conftest.py +++ b/tests/unit_tests/conftest.py @@ -22,21 +22,26 @@ def ladder_and_game_service_context( @asynccontextmanager async def make_ladder_and_game_service(): async with database_context(request) as database: + player_service = mock.Mock() + message_queue_service = mock.Mock( + declare_exchange=mock.AsyncMock(), + consume=mock.AsyncMock() + ) with mock.patch("server.matchmaker.pop_timer.config.QUEUE_POP_TIME_MAX", 1): game_service = GameService( database, - player_service=mock.Mock(), + player_service=player_service, game_stats_service=mock.Mock(), rating_service=mock.Mock(), - message_queue_service=mock.Mock( - declare_exchange=mock.AsyncMock() - ) + message_queue_service=message_queue_service, ) violation_service = ViolationService() ladder_service = LadderService( database, game_service, - violation_service + message_queue_service, + player_service, + violation_service, ) await game_service.initialize() @@ -57,10 +62,18 @@ async def ladder_service( mocker, database, game_service, + message_queue_service, + player_service, violation_service, ): mocker.patch("server.matchmaker.pop_timer.config.QUEUE_POP_TIME_MAX", 1) - ladder_service = LadderService(database, game_service, violation_service) + ladder_service = LadderService( + database, + game_service, + message_queue_service, + player_service, + violation_service, + ) await ladder_service.initialize() yield ladder_service await ladder_service.shutdown() diff --git a/tests/unit_tests/test_ladder_service.py b/tests/unit_tests/test_ladder_service.py index 406602197..f258a766f 100644 --- a/tests/unit_tests/test_ladder_service.py +++ b/tests/unit_tests/test_ladder_service.py @@ -21,8 +21,20 @@ from .strategies import st_players -async def test_queue_initialization(database, game_service, violation_service): - ladder_service = LadderService(database, game_service, violation_service) +async def test_queue_initialization( + database, + game_service, + message_queue_service, + player_service, + violation_service, +): + ladder_service = LadderService( + database, + game_service, + message_queue_service, + player_service, + violation_service, + ) def make_mock_queue(*args, **kwargs): queue = mock.create_autospec(MatchmakerQueue)