diff --git a/.gitignore b/.gitignore index 13c0b2fbe..19aa37977 100755 --- a/.gitignore +++ b/.gitignore @@ -210,8 +210,14 @@ ModelManifest.xml # Python *.pyc -# Thing specific to Project Malmo +# Things specific to Project Malmo +.vscode Schemas/xs3p.xsl Minecraft/run/config/malmomodCLIENT.cfg Minecraft/Minecraft_Client.launch - +Minecraft/run +.minecraft +.minecraftserver +# Specific files generated by Malmo installation +/Minecraft/src/main/resources/schemas.index +/Minecraft/src/main/resources/version.properties diff --git a/MalmoEnv/malmoenv/core.py b/MalmoEnv/malmoenv/core.py index cc1899ff5..ae32ffc86 100644 --- a/MalmoEnv/malmoenv/core.py +++ b/MalmoEnv/malmoenv/core.py @@ -85,6 +85,7 @@ def __init__(self, reshape=False): self.action_space = None self.observation_space = None self.metadata = {'render.modes': ['rgb_array']} + self.reward_range = (-float('inf'), float('inf')) self.xml = None self.integratedServerPort = 0 self.role = 0 @@ -348,6 +349,8 @@ def step(self, action): obs = obs.reshape((self.height, self.width, self.depth)).astype(np.uint8) self.last_obs = obs + # RLlib requires info be returned as a dict rather than a string + info = { "raw_info": info } return obs, reward, self.done, info def close(self): diff --git a/MalmoEnv/malmoenv/turnbasedmultiagentenv.py b/MalmoEnv/malmoenv/turnbasedmultiagentenv.py new file mode 100644 index 000000000..208076084 --- /dev/null +++ b/MalmoEnv/malmoenv/turnbasedmultiagentenv.py @@ -0,0 +1,327 @@ +# ------------------------------------------------------------------------------------------------ +# Copyright (c) 2020 Microsoft Corporation +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and +# associated documentation files (the "Software"), to deal in the Software without restriction, +# including without limitation the rights to use, copy, modify, merge, publish, distribute, +# sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all copies or +# substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT +# NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +# ------------------------------------------------------------------------------------------------ +import time +from threading import Thread +from lxml import etree +from ray.rllib.env.multi_agent_env import MultiAgentEnv +import malmoenv +from malmoenv.core import EnvException + +STEP_DELAY_TIME = 0.15 + +def _validate_config(xml, agent_configs): + """ + Verify that the supplied agent config is compatible with the mission XML. + """ + assert len(agent_configs) >= 2 + xml = etree.fromstring(xml) + xml_agent_count = len(xml.findall("{http://ProjectMalmo.microsoft.com}AgentSection")) + assert len(agent_configs) == xml_agent_count + +def _parse_address(address): + """ + Take addresses of various forms and convert them to a tuple of the form (HOST, PORT). + """ + + if isinstance(address, int): + # Only a port number provided + return ("127.0.0.1", address) + + if isinstance(address, str): + parts = address.split(":") + if len(parts) == 1: + # Port number as a string + return ("127.0.0.1", int(parts[0])) + if len(parts) == 2: + # String in the form "HOST:PORT" + return (parts[0], int(parts[1])) + + if len(address) == 2 and isinstance(address[0], str) and isinstance(address[1], int): + # An already parsed address + return address + + raise EnvException(f"{address} is not a valid address") + +def _await_results(results): + """ + Receives a dictionary of result tasks and repopulates it with the final results after the tasks + complete. + """ + for agent_id, task in results.items(): + results[agent_id] = task.wait() + +def _default_env_factory(agent_id, xml, role, host_address, host_port, command_address, command_port): + """ + Default environment factory that fills out just enough settings to connect multiple game + instances into a single game session. + agent_id - The agent we're constructing the environment connection for. + xml - The mission XML. + role - The agent's role number. 0 == host agent. + host_address, host_port - Connection details for the game session host. + command_address, command_port - Connection details for the game instance the agent is controlling. + """ + env = malmoenv.make() + env.init(xml, host_port, + server=host_address, + server2=command_address, + port2=command_port, + role=role, + exp_uid="default_experiment_id" + ) + return env + +def _default_all_done_checker(env, obs, rewards, dones, infos): + """ + Returns True if any agent is reported as done. + """ + for done in dones.values(): + if done: + return True + return False + +# Wraps a MalmoEnv instance and provides async reset and sync step operations +# Reset operations need to be executed async as none of the connected environments will complete +# their reset operations until all environments have at least issued a reset request. +class _ConnectionContext: + def __init__(self, id, address, env): + """ + Wrapper around a connection to a game instance. + id - The agent id that is in control of the game instance. + address - (server, port) tuple for the command connection. + env - The MalmoEnv instance that is connected to the game instance. + """ + self.id = id + self.address = address + self.env = env + self.last_observation = None + + # Async task status tracking + self._task_thread = None + self._task_result = None + + def wait(self): + """ + Wait for the current async task to complete and return the result. + """ + assert self._task_thread is not None + self._task_thread.join() + self._task_thread = None + + # We want to re-trow the exception if the task raised an error + if isinstance(self._task_result, Exception): + raise self._task_result + + return self._task_result + + def reset(self): + """ + Issue a reset request and return the async task immediately. + """ + assert self._task_thread is None + self._task_thread = Thread(target=self._reset_task, name=f"Agent '{self.id}' reset") + self._task_thread.start() + return self + + def _reset_task(self): + try: + self._task_result = self.last_observation = self.env.reset() + except Exception as e: + self._task_result = e + + def step(self, action): + """ + Issue a step request and return the async task immediately. + """ + self.last_observation, r, d, i = self.env.step(action) + return self.last_observation, r, d, i + + def close(self): + """ + Shut down the Minecraft instance. + """ + self.env.close() + +# Config for a single agent that will be present within the environment +class AgentConfig: + def __init__(self, id, address): + """ + Configuration details for an agent acting within the environment. + id - The agent's id as used by RLlib. + address - The address for the game instance for the agent to connect to. + """ + self.id = id + self.address = _parse_address(address) + +# RLlib compatible multi-agent environment. +# This wraps multiple instances of MalmoEnv environments that are connected to their own Minecraft +# instances. +# The first agent defined in the agent_configs is treated as the primary Minecraft instance that +# will act as the game server. +class TurnBasedRllibMultiAgentEnv(MultiAgentEnv): + def __init__(self, xml, agent_configs, env_factory=None, all_done_checker=None): + """ + An RLlib compatible multi-agent environment. + NOTE: Will not work with turn based actions as all agent act together. + xml - The mission XML + agent_configs - A list of AgentConfigs to decribe the agents within the environment. + env_factory - Function to allow custom construction of the MalmoEnv instances. + This can be used to override the default inti parameter for the environment. + all_done_checker - Function to check if the "__all__" key should be set in the step done + dictionary. The default check returns True if any agent reports that + they're done. + """ + _validate_config(xml, agent_configs) + + self._all_done_checker = all_done_checker or _default_all_done_checker + env_factory = env_factory or _default_env_factory + + # The first agent is treated as the game session host + host_address = agent_configs[0].address + self._id = host_address + self._connections = {} + self._reset_request_time = 0 + self._step = 0 + + role = 0 + for agent_config in agent_configs: + env = env_factory( + agent_id=agent_config.id, + xml=xml, + role=role, + host_address=host_address[0], + host_port=host_address[1], + command_address=agent_config.address[0], + command_port=agent_config.address[1] + ) + context = _ConnectionContext( + agent_config.id, + agent_config.address, + env + ) + self._connections[agent_config.id] = context + role += 1 + + + def get_observation_space(self, agent_id): + return self._connections[agent_id].env.observation_space + + def get_action_space(self, agent_id): + return self._connections[agent_id].env.action_space + + def reset(self): + self._step = 0 + obs = {} + request_time = time.perf_counter() + for agent_id, connection in self._connections.items(): + obs[agent_id] = connection.reset() + + # All reset operations must be issued asynchronously as none of the Minecraft instances + # will complete their reset requests until all agents have issued a reset request + _await_results(obs) + self._reset_request_time = time.perf_counter() - request_time + + return obs + + def step(self, actions): + self._step += 1 + results = {} + request_time = time.perf_counter() + done = False + + for agent_id, action in actions.items(): + if not done: + # We need to wait a small amount of time between each agent's step request to give + # the Minecraft instances time to sync up and agree whose turn to act it is + time.sleep(STEP_DELAY_TIME) + o, r, done, i = self._connections[agent_id].step(action) + else: + # If any of the agents report themselves as "done", then we should stop taking turns + # so generate a dummy step result based on the last observation so that training + # receives valid looking data + o = self._connections[agent_id].last_observation + r = 0.0 + i = {} + + assert self._connections[agent_id].env.observation_space.contains(o), f"Shape={o.shape}" + results[agent_id] = (o, r, done, i) + + request_time = time.perf_counter() - request_time + + # We need to repack the individual step results into dictionaries per data type to conform + # with RLlib's requirements + obs = { + agent_id: result[0] + for agent_id, result in results.items() + } + rewards = { + agent_id: result[1] + for agent_id, result in results.items() + } + dones = { + agent_id: result[2] + for agent_id, result in results.items() + } + infos = { + agent_id: result[3] + for agent_id, result in results.items() + } + + # Pass the results to the done checker to set the required __all__ value + dones["__all__"] = self._all_done_checker(self, obs, rewards, dones, infos) + + return obs, rewards, dones, infos + + def close(self): + for connection in self._connections.values(): + try: + connection.close() + except Exception as e: + message = getattr(e, "message", e) + print(f"Error closing environment: {message}") + + +# As Malmo returns stale observations for actions, this wrapper can be used to sync observations +# and actions by issuing an idle action after the policy generated action to query the resultant +# state of the environment +class SyncRllibMultiAgentEnv(MultiAgentEnv): + def __init__(self, env, idle_action): + self.env = env + self.idle_action = idle_action + + def reset(self): + return self.env.reset() + + def step(self, actions): + # The first step request to the environment returns stale data, so we want to ignore it + # unless Malmo reports one of the instances as "done" + o, r, d, i = self.env.step(actions) + for done in d.values(): + if done: + return o, r, d, i + + # The second step request is really just a query for the environment state. When used with + # the turn based environment, there is a delay injected before the requests which allows + # the environment to settle into the new state + return self.env.step({ + key: self.idle_action + for key in actions + }) + + def close(self): + return self.env.close() diff --git a/MalmoEnv/missions/rllib_multiagent.xml b/MalmoEnv/missions/rllib_multiagent.xml new file mode 100644 index 000000000..5f8f68865 --- /dev/null +++ b/MalmoEnv/missions/rllib_multiagent.xml @@ -0,0 +1,126 @@ + + + + Multi-agent Test Goal + + + + 1 + + + + + + clear + false + + + + + + + + + + + + + + + + + + + + + + + + Agent1 + + + + + + + + + 84 + 84 + + + + + + attack + + + + + + + + + + + + + + + + + + + + + + + + + + Agent2 + + + + + + + + + 84 + 84 + + + + + + attack + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/MalmoEnv/rllib_train.py b/MalmoEnv/rllib_train.py new file mode 100644 index 000000000..a8f20e342 --- /dev/null +++ b/MalmoEnv/rllib_train.py @@ -0,0 +1,179 @@ +# ------------------------------------------------------------------------------------------------ +# Copyright (c) 2020 Microsoft Corporation +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and +# associated documentation files (the "Software"), to deal in the Software without restriction, +# including without limitation the rights to use, copy, modify, merge, publish, distribute, +# sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all copies or +# substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT +# NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +# ------------------------------------------------------------------------------------------------ +import gym +import ray +from ray.rllib.env.atari_wrappers import FrameStack +from ray.tune import register_env, run_experiments +from pathlib import Path +import malmoenv +from malmoenv.turnbasedmultiagentenv import AgentConfig, TurnBasedRllibMultiAgentEnv, SyncRllibMultiAgentEnv + +SINGLE_AGENT_ENV = "malmo_single_agent" +MULTI_AGENT_ENV = "malmo_multi_agent" +MISSION_XML = "missions/rllib_multiagent.xml" +COMMAND_PORT = 8999 +NUM_ENVIRONMENT_INSTANCES = 2 +FRAME_STACK = 1 + +xml = Path(MISSION_XML).read_text() + +# An environment wrapper to shape rewards and determine episode terminality independently of Malmo +class TrackingEnv(gym.Wrapper): + def __init__(self, env): + super().__init__(env) + self._actions = [ + self._forward, + self._back, + self._turn_right, + self._turn_left, + self._idle + ] + self._step_count = 0 + + def _reset_state(self): + self._facing = (1, 0) + self._position = (0, 0) + self._visited = {} + self._update_visited() + self._step_count = 0 + + def _forward(self): + self._position = ( + self._position[0] + self._facing[0], + self._position[1] + self._facing[1] + ) + + def _back(self): + self._position = ( + self._position[0] - self._facing[0], + self._position[1] - self._facing[1] + ) + + def _turn_left(self): + self._facing = (self._facing[1], -self._facing[0]) + + def _turn_right(self): + self._facing = (-self._facing[1], self._facing[0]) + + def _idle(self): + pass + + def _encode_state(self): + return self._position + + def _update_visited(self): + state = self._encode_state() + value = self._visited.get(state, 0) + self._visited[state] = value + 1 + return value + + def reset(self): + self._reset_state() + return super().reset() + + def step(self, action): + o, r, d, i = super().step(action) + self._actions[action]() + revisit_count = self._update_visited() + if revisit_count == 0: + r += 0.02 + if action == 4: + r += -0.5 + + self._step_count += 1 + if self._step_count == 50: + d = True + elif r < -0.9: + d = True + + return o, r, d, i + + +def env_factory(agent_id, xml, role, host_address, host_port, command_address, command_port): + env = malmoenv.make() + env.init(xml, host_port, + server=host_address, + server2=command_address, + port2=command_port, + role=role, + exp_uid="multiagent", + reshape=True + ) + if FRAME_STACK > 1: + env = FrameStack(env, FRAME_STACK) + env = TrackingEnv(env) + return env + +def create_single_agent_env(config): + port = COMMAND_PORT + config.worker_index + return env_factory("agent0", xml, 0, "127.0.0.1", port, "127.0.0.1", port) + +def create_multi_agent_env(config): + port = COMMAND_PORT + (config.worker_index * 2) + agent_config = [ + AgentConfig(id=f"agent1", address=port-1), + AgentConfig(id=f"agent2", address=port), + ] + env = TurnBasedRllibMultiAgentEnv(xml, agent_config, + env_factory=env_factory, + ) + env = SyncRllibMultiAgentEnv(env, idle_action=4) + return env + + +register_env(SINGLE_AGENT_ENV, create_single_agent_env) +register_env(MULTI_AGENT_ENV, create_multi_agent_env) + +run_experiments({ + "malmo": { + "run": "IMPALA", + "env": MULTI_AGENT_ENV, + "config": { + "model": { + "dim": 42 + }, + "num_workers": NUM_ENVIRONMENT_INSTANCES, + "num_gpus": 0, + "rollout_fragment_length": 50, + "train_batch_size": 1024, + "replay_buffer_num_slots": 4000, + "replay_proportion": 10, + "learner_queue_timeout": 900, + "num_sgd_iter": 2, + "num_data_loader_buffers": 1, + + "exploration_config": { + "type": "EpsilonGreedy", + "initial_epsilon": 1.0, + "final_epsilon": 0.02, + "epsilon_timesteps": 7000 + }, + + "multiagent": { + "policies": { "shared_policy": ( + None, + gym.spaces.Box(0, 255, shape=(84, 84, 3 * FRAME_STACK)), + gym.spaces.Discrete(5), + {} + )}, + "policy_mapping_fn": (lambda agent_id: "shared_policy") + } + } + } +}) \ No newline at end of file diff --git a/MalmoEnv/utils/launcher.py b/MalmoEnv/utils/launcher.py new file mode 100644 index 000000000..65d165492 --- /dev/null +++ b/MalmoEnv/utils/launcher.py @@ -0,0 +1,147 @@ +# ------------------------------------------------------------------------------------------------ +# Copyright (c) 2020 Microsoft Corporation +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and +# associated documentation files (the "Software"), to deal in the Software without restriction, +# including without limitation the rights to use, copy, modify, merge, publish, distribute, +# sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all copies or +# substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT +# NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +# ------------------------------------------------------------------------------------------------ +from collections.abc import Iterable +import socket +import subprocess +import sys +import time + + # This script captures the Minecraft output into file called out.txt +DEFAULT_SCRIPT = "./launchClient_quiet.sh" + +def launch_minecraft(ports, launch_script=DEFAULT_SCRIPT, keep_alive=False): + """ + Launch Minecraft instances in the background. + Function will block until all instances are ready to receive commands. + ports - List of ports you want the instances to listen on for commands + launch_script - Script to launch Minecraft. Default is ./launchClient_quiet.sh + keep_alive - Automatically restart Minecraft instances if they exit + """ + ports_collection = ports + if not isinstance(ports_collection, Iterable): + ports_collection = [ports_collection] + + minecraft_instances = [] + for port in ports_collection: + args = [ + sys.executable, __file__, + "--script", launch_script, + "--port", str(port) + ] + if keep_alive: + args.append("--keepalive") + + proc = subprocess.Popen(args, + stdout=subprocess.PIPE, stderr=subprocess.PIPE + ) + minecraft_instances.append(proc) + + await_instances([ + ("127.0.0.1", int(port)) + for port in ports_collection + ]) + + # Determine if we need to return a collection or a single item based on the type passed for + # ports initially + if isinstance(ports, Iterable): + return minecraft_instances + return minecraft_instances[0] + +def await_instances(end_points): + """ + Wait until the specified enpoints are all actively listening for connections. + end_points - List of addresses made up of tuples of the form (HOST, PORT) + """ + print(f"Waiting for {len(end_points)} instances...") + + while True: + try: + for end_point in end_points: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.settimeout(10) + s.connect(end_point) + s.close() + + print("Finished waiting for instances") + return + + except (ConnectionError, socket.timeout): + # If we fail to connect, most likely the instance isn't running yet + time.sleep(5) + + +################################################################################################### +# The remainder of this file contains code for when this script is invoked directly rather than +# imported into another script. +# This is used to directly set up and launch the Minecraft process +################################################################################################### + +def _parse_args(): + # Import locally so that we're not paying the import cost when they're not used + import argparse + + parser = argparse.ArgumentParser(description="Malmo Launcher") + parser.add_argument("--script", type=str, default=DEFAULT_SCRIPT, help="Script to launch Minecraft") + parser.add_argument("--port", type=int, nargs="+", help="Command ports for Minecraft instances", required=True) + parser.add_argument("--keepalive", action="store_true", default=False, help="Relaunch the Minecraft instance if it exits") + + return parser.parse_args() + +def _exec(*args): + proc = subprocess.Popen(args) + proc.communicate() + return proc.returncode + +def _launch_minecraft_direct(launch_script, port, keep_alive): + # Import locally so that we're not paying the import cost when this script is imported as a module + import os + import pathlib + import shutil + import tempfile + + # Make a copy of Minecraft into a unique temp directory as it's not possible to run multiple + # instances from a single Minecraft directory + target_dir = tempfile.mkdtemp(prefix="malmo_") + "/malmo" + source_dir = str(pathlib.Path(__file__).parent.absolute()) + "/../.." + print(f"Cloning {source_dir} into {target_dir}...") + shutil.copytree(source_dir, target_dir) + + # Launch Minecraft using the specified script + print(f"Launching Minecraft using {launch_script} with command port {port}...") + os.chdir(target_dir + "/Minecraft") + + spawn = True + while spawn: + rc = _exec(launch_script, str(port)) + spawn = keep_alive + + print(f"Exit code: {rc}") + +if __name__ == '__main__': + args = _parse_args() + + if len(args.port) == 1: + # If only a single port is specified, launch Minecraft directly + _launch_minecraft_direct(args.script, args.port[0], args.keepalive) + else: + # If multiple ports are specified, launch each Minecraft instance in a new child process + instances = launch_minecraft(args.port, launch_script=args.script, keep_alive=args.keepalive) + print("Waiting for all instances to exit...") + for instance in instances: + instance.communicate() \ No newline at end of file diff --git a/Minecraft/build.gradle b/Minecraft/build.gradle index 131f046da..d1006835f 100755 --- a/Minecraft/build.gradle +++ b/Minecraft/build.gradle @@ -15,7 +15,11 @@ apply plugin: 'net.minecraftforge.gradle.forge' // Read the version number from the Mod's version properties file. if (!file('src/main/resources/version.properties').exists()) { - ant.fail("version.properties file is missing - this is created automatically by CMake. If you are building from source, make sure you have built the full source tree, not just the Minecraft folder.") + // if version.properties doesn't exists make it from VERSION + File vers_file = new File("../VERSION") + version = vers_file.getText() + File out_file = new File("src/main/resources/version.properties") + out_file.write "malmomod.version=" + version } def propFile = file('src/main/resources/version.properties') def versionProp = new Properties() diff --git a/Minecraft/launchClient_quiet.sh b/Minecraft/launchClient_quiet.sh new file mode 100755 index 000000000..2930d00c7 --- /dev/null +++ b/Minecraft/launchClient_quiet.sh @@ -0,0 +1,2 @@ +#! /bin/bash +./launchClient.sh -port $1 -env > ../out.txt 2>&1