diff --git a/pyproject.toml b/pyproject.toml
index da64feac..b4107988 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -28,6 +28,7 @@ dependencies = ["websockets>=11.0",
     # NOTE: Same for pinocchio (pin)
     "pin==2.7.0",
     "tilburg-hand",
+    "digit-interface",
 ]
 readme = "README.md"
 maintainers = [
diff --git a/python/examples/env_cartesian_control_digit.py b/python/examples/env_cartesian_control_digit.py
new file mode 100644
index 00000000..c9e5e3a3
--- /dev/null
+++ b/python/examples/env_cartesian_control_digit.py
@@ -0,0 +1,99 @@
+import logging
+
+from rcs._core.common import RobotPlatform
+from rcs.control.fr3_desk import FCI, ContextManager, Desk, load_creds_fr3_desk
+from rcs.envs.base import ControlMode, RelativeTo
+from rcs.envs.creators import RCSFR3EnvCreator, RCSSimEnvCreator
+from rcs.envs.utils import (
+    default_digit,
+    default_fr3_hw_gripper_cfg,
+    default_fr3_hw_robot_cfg,
+    default_fr3_sim_gripper_cfg,
+    default_fr3_sim_robot_cfg,
+    default_mujoco_cameraset_cfg,
+)
+
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.INFO)
+
+ROBOT_IP = "192.168.101.1"
+ROBOT_INSTANCE = RobotPlatform.SIMULATION
+
+
+"""
+Create a .env file in the same directory as this file with the following content:
+FR3_USERNAME=
+FR3_PASSWORD=
+
+When you use a real FR3, you first need to unlock its joints using the following CLI command:
+
+python -m rcs fr3 unlock
+
+or put it into guiding mode using:
+
+python -m rcs fr3 guiding-mode
+
+When you are done, you lock it again using:
+
+python -m rcs fr3 lock
+
+or even shut it down using:
+
+python -m rcs fr3 shutdown
+"""
+
+
+def main():
+    resource_manager: FCI | ContextManager
+    if ROBOT_INSTANCE == RobotPlatform.HARDWARE:
+        user, pw = load_creds_fr3_desk()
+        resource_manager = FCI(Desk(ROBOT_IP, user, pw), unlock=False, lock_when_done=False)
+    else:
+        resource_manager = ContextManager()
+    with resource_manager:
+        if ROBOT_INSTANCE == RobotPlatform.HARDWARE:
+            env_rel = RCSFR3EnvCreator()(
+                ip=ROBOT_IP,
+                control_mode=ControlMode.CARTESIAN_TQuat,
+                robot_cfg=default_fr3_hw_robot_cfg(),
+                collision_guard="lab",
+                gripper_cfg=default_fr3_hw_gripper_cfg(),
+                digit_set=default_digit({"digit_0": "D21182"}),
+                max_relative_movement=0.5,
+                relative_to=RelativeTo.LAST_STEP,
+            )
+        else:
+            env_rel = RCSSimEnvCreator()(
+                control_mode=ControlMode.CARTESIAN_TQuat,
+                robot_cfg=default_fr3_sim_robot_cfg(),
+                collision_guard=False,
+                gripper_cfg=default_fr3_sim_gripper_cfg(),
+                camera_set_cfg=default_mujoco_cameraset_cfg(),
+                max_relative_movement=0.5,
+                relative_to=RelativeTo.LAST_STEP,
+            )
+            env_rel.get_wrapper_attr("sim").open_gui()
+
+        env_rel.reset()
+        print(env_rel.unwrapped.robot.get_cartesian_position())  # type: ignore
+
+        for _ in range(10):
+            for _ in range(10):
+                # move 1cm in x direction (forward) and close gripper
+                act = {"tquat": [0.01, 0, 0, 0, 0, 0, 1], "gripper": 0}
+                obs, reward, terminated, truncated, info = env_rel.step(act)
+                if truncated or terminated:
+                    logger.info("Truncated or terminated!")
+                    return
+            for _ in range(10):
+                # move 1cm in negative x direction (backward) and open gripper
+                act = {"tquat": [-0.01, 0, 0, 0, 0, 0, 1], "gripper": 1}
+                obs, reward, terminated, truncated, info = env_rel.step(act)
+                if truncated or terminated:
+                    logger.info("Truncated or terminated!")
+                    return
+        env_rel.close()
+
+
+if __name__ == "__main__":
+    main()
diff --git a/python/rcs/camera/digit_cam.py b/python/rcs/camera/digit_cam.py
new file mode 100644
index 00000000..38bcc0ef
--- /dev/null
+++ b/python/rcs/camera/digit_cam.py
@@ -0,0 +1,50 @@
+from digit_interface.digit import Digit
+from rcs.camera.hw import BaseHardwareCameraSet, HWCameraSetConfig
+from rcs.camera.interface import CameraFrame, DataFrame, Frame
+
+
+class DigitConfig(HWCameraSetConfig):
+    """
+    Configuration for the DIGIT device.
+    This class is used to define the settings for the DIGIT device.
+    """
+
+    stream_name: str = "QVGA"  # options: "QVGA" (60 and 30 fps), "VGA" (30 and 15 fps)
+
+
+class DigitCam(BaseHardwareCameraSet):
+    """
+    This class provides an interface to interact with the DIGIT device.
+    It allows for connecting to the device, changing settings, and retrieving information.
+    """
+
+    def __init__(self, cfg: DigitConfig):
+        self._cfg = cfg
+        super().__init__()
+        self._cameras: dict[str, Digit] = {}
+        self.initialize(self.config)
+
+    def initialize(self, cfg: HWCameraSetConfig):
+        """
+        Initialize the digit interface with the given configuration.
+        :param cfg: Configuration for the DIGIT device.
+        """
+        for name, serial in cfg.name_to_identifier.items():
+            digit = Digit(serial, name)
+            digit.connect()
+            self._cameras[name] = digit
+
+    def _poll_frame(self, camera_name: str) -> Frame:
+        """Polls the frame from the camera with the given name."""
+        digit = self._cameras[camera_name]
+        frame = digit.get_frame()
+        color = DataFrame(data=frame)
+        # rgb to bgr as expected by opencv
+        # color = DataFrame(data=frame[:, :, ::-1])
+        cf = CameraFrame(color=color)
+
+        return Frame(camera=cf)
+
+    @property
+    def config(self) -> DigitConfig:
+        return self._cfg
diff --git a/python/rcs/envs/base.py b/python/rcs/envs/base.py
index c0fa8794..7733ec41 100644
--- a/python/rcs/envs/base.py
+++ b/python/rcs/envs/base.py
@@ -137,6 +137,23 @@ class CameraDictType(RCSpaceType):
     ]
 
 
+class DigitCameraDictType(RCSpaceType):
+    digit_frames: dict[
+        Annotated[str, "camera_names"],
+        Annotated[
+            np.ndarray,
+            # needs to be filled with values downstream
+            lambda height, width, color_dim=3, dtype=np.uint8, low=0, high=255: gym.spaces.Box(
+                low=low,
+                high=high,
+                shape=(height, width, color_dim),
+                dtype=dtype,
+            ),
+            "digit_frames",
+        ],
+    ]
+
+
 # joining works with inheritance but need to inherit from protocol again
 class ArmObsType(TQuatDictType, JointsDictType, TRPYDictType): ...
@@ -646,6 +663,54 @@ def close(self):
         super().close()
 
 
+class DigitCameraSetWrapper(CameraSetWrapper):
+    """Wrapper for digit cameras."""
+
+    def __init__(self, env, camera_set: BaseCameraSet):
+        super().__init__(env, camera_set)
+        # self.unwrapped: FR3Env
+        self.camera_set = camera_set
+        self.observation_space: gym.spaces.Dict
+        # rgb is always included
+        params: dict = {
+            "digit_frames": {
+                "height": camera_set.config.resolution_height,
+                "width": camera_set.config.resolution_width,
+            }
+        }
+
+        self.observation_space.spaces.update(
+            get_space(
+                DigitCameraDictType,
+                child_dict_keys_to_unfold={
+                    "camera_names": camera_set.camera_names,
+                },
+                params=params,
+            ).spaces
+        )
+        self.camera_key = get_space_keys(DigitCameraDictType)[0]
+
+    def observation(self, observation: dict, info: dict[str, Any]) -> tuple[dict[str, Any], dict[str, Any]]:
+        observation = copy.deepcopy(observation)
+        info = copy.deepcopy(info)
+        frameset = self.camera_set.get_latest_frames()
+        if frameset is None:
+            observation[self.camera_key] = {}
+            info["digit_available"] = False
+            return observation, info
+
+        frame_dict: dict[str, np.ndarray] = {
+            camera_name: frame.camera.color.data for camera_name, frame in frameset.frames.items()
+        }
+        observation[self.camera_key] = frame_dict
+
+        info["digit_available"] = True
+        if frameset.avg_timestamp is not None:
+            info["digit_timestamp"] = frameset.avg_timestamp
+
+        return observation, info
+
+
 class GripperWrapper(ActObsInfoWrapper):
     # TODO: sticky gripper, like in aloha
diff --git a/python/rcs/envs/creators.py b/python/rcs/envs/creators.py
index 9fba6439..8aafc6eb 100644
--- a/python/rcs/envs/creators.py
+++ b/python/rcs/envs/creators.py
@@ -9,11 +9,13 @@
 from gymnasium.envs.registration import EnvCreator
 from rcs import sim
 from rcs._core.sim import CameraType
+from rcs.camera.digit_cam import DigitCam
 from rcs.camera.hw import BaseHardwareCameraSet
 from rcs.camera.sim import SimCameraConfig, SimCameraSet, SimCameraSetConfig
 from rcs.envs.base import (
     CameraSetWrapper,
     ControlMode,
+    DigitCameraSetWrapper,
     GripperWrapper,
     HandWrapper,
     MultiRobotWrapper,
@@ -59,6 +61,7 @@ def __call__(  # type: ignore
         collision_guard: str | PathLike | None = None,
         gripper_cfg: rcs.hw.FHConfig | rcs.hand.tilburg_hand.THConfig | None = None,
         camera_set: BaseHardwareCameraSet | None = None,
+        digit_set: DigitCam | None = None,
         max_relative_movement: float | tuple[float, float] | None = None,
         relative_to: RelativeTo = RelativeTo.LAST_STEP,
         urdf_path: str | PathLike | None = None,
@@ -106,6 +109,12 @@
             logger.info("CameraSet started")
             env = CameraSetWrapper(env, camera_set)
 
+        if digit_set is not None:
+            digit_set.start()
+            digit_set.wait_for_frames()
+            logger.info("DigitCameraSet started")
+            env = DigitCameraSetWrapper(env, digit_set)
+
         if collision_guard is not None:
             assert urdf_path is not None
             env = CollisionGuard.env_from_xml_paths(
diff --git a/python/rcs/envs/utils.py b/python/rcs/envs/utils.py
index fcd61bfa..3e9bb9b0 100644
--- a/python/rcs/envs/utils.py
+++ b/python/rcs/envs/utils.py
@@ -5,9 +5,11 @@
 import mujoco as mj
 import numpy as np
 import rcs
+from digit_interface import Digit
 from rcs import sim
 from rcs._core.hw import FR3Config, IKSolver
 from rcs._core.sim import CameraType
+from rcs.camera.digit_cam import DigitCam, DigitConfig
 from rcs.camera.interface import BaseCameraConfig
 from rcs.camera.realsense import RealSenseCameraSet, RealSenseSetConfig
 from rcs.camera.sim import SimCameraConfig, SimCameraSetConfig
@@ -73,6 +75,21 @@ def default_fr3_sim_gripper_cfg(idx: str = "0") -> sim.SimGripperConfig:
     return cfg
 
 
+def default_digit(name2id: dict[str, str] | None, stream_name: str = "QVGA") -> DigitCam | None:
+    if name2id is None:
+        return None
+    cameras = {name: BaseCameraConfig(identifier=id) for name, id in name2id.items()}
+    stream_dict = Digit.STREAMS[stream_name]
+    digit_cfg = DigitConfig(
+        cameras=cameras,
+        resolution_height=stream_dict["resolution"]["height"],
+        resolution_width=stream_dict["resolution"]["width"],
+        stream_name=stream_name,
+        frame_rate=stream_dict["fps"]["30fps"],
+    )
+    return DigitCam(digit_cfg)
+
+
 def default_mujoco_cameraset_cfg() -> SimCameraSetConfig:
     cameras = {
         "wrist": SimCameraConfig(identifier="wrist_0", type=int(CameraType.fixed)),
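
Not part of the diff, but a minimal consumption sketch for reviewers trying this out: it shows how the observations added by DigitCameraSetWrapper could be read from a wrapped env. It assumes an env built via the hardware branch of the example above (digit_set=default_digit({"digit_0": "D21182"})) and that opencv-python is importable for display; show_digit_frames is a hypothetical helper, not something this patch adds.

import cv2  # assumed available; not a dependency added by this patch


def show_digit_frames(env_rel, steps: int = 100) -> None:
    """Stream DIGIT tactile images from an env wrapped with DigitCameraSetWrapper."""
    env_rel.reset()
    for _ in range(steps):
        # hold the current pose (identity relative motion) and keep the gripper open
        act = {"tquat": [0.0, 0, 0, 0, 0, 0, 1], "gripper": 1}
        obs, _reward, terminated, truncated, info = env_rel.step(act)
        if info.get("digit_available"):
            # "digit_frames" is the key declared by DigitCameraDictType;
            # "digit_0" matches the name passed to default_digit() in the example
            frame = obs["digit_frames"]["digit_0"]  # HxWx3 uint8 image of the gel surface
            cv2.imshow("digit_0", frame)
            cv2.waitKey(1)
        if terminated or truncated:
            break
    cv2.destroyAllWindows()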