Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ dependencies = ["websockets>=11.0",
# NOTE: Same for pinocchio (pin)
"pin==2.7.0",
"tilburg-hand",
"digit-interface",
]
readme = "README.md"
maintainers = [
Expand Down
99 changes: 99 additions & 0 deletions python/examples/env_cartesian_control_digit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import logging

from rcs._core.common import RobotPlatform
from rcs.control.fr3_desk import FCI, ContextManager, Desk, load_creds_fr3_desk
from rcs.envs.base import ControlMode, RelativeTo
from rcs.envs.creators import RCSFR3EnvCreator, RCSSimEnvCreator
from rcs.envs.utils import (
default_digit,
default_fr3_hw_gripper_cfg,
default_fr3_hw_robot_cfg,
default_fr3_sim_gripper_cfg,
default_fr3_sim_robot_cfg,
default_mujoco_cameraset_cfg,
)

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

ROBOT_IP = "192.168.101.1"
ROBOT_INSTANCE = RobotPlatform.SIMULATION


"""
Create a .env file in the same directory as this file with the following content:
FR3_USERNAME=<username on franka desk>
FR3_PASSWORD=<password on franka desk>

When you use a real FR3 you first need to unlock its joints using the following cli script:

python -m rcs fr3 unlock <ip>

or put it into guiding mode using:

python -m rcs fr3 guiding-mode <ip>

When you are done you lock it again using:

python -m rcs fr3 lock <ip>

or even shut it down using:

python -m rcs fr3 shutdown <ip>
"""


def main():
resource_manger: FCI | ContextManager
if ROBOT_INSTANCE == RobotPlatform.HARDWARE:
user, pw = load_creds_fr3_desk()
resource_manger = FCI(Desk(ROBOT_IP, user, pw), unlock=False, lock_when_done=False)
else:
resource_manger = ContextManager()
with resource_manger:
if ROBOT_INSTANCE == RobotPlatform.HARDWARE:
env_rel = RCSFR3EnvCreator()(
ip=ROBOT_IP,
control_mode=ControlMode.CARTESIAN_TQuat,
robot_cfg=default_fr3_hw_robot_cfg(),
collision_guard="lab",
gripper_cfg=default_fr3_hw_gripper_cfg(),
digit_set=default_digit({"digit_0": "D21182"}),
max_relative_movement=0.5,
relative_to=RelativeTo.LAST_STEP,
)
else:
env_rel = RCSSimEnvCreator()(
control_mode=ControlMode.CARTESIAN_TQuat,
robot_cfg=default_fr3_sim_robot_cfg(),
collision_guard=False,
gripper_cfg=default_fr3_sim_gripper_cfg(),
camera_set_cfg=default_mujoco_cameraset_cfg(),
max_relative_movement=0.5,
relative_to=RelativeTo.LAST_STEP,
)
env_rel.get_wrapper_attr("sim").open_gui()

env_rel.reset()
print(env_rel.unwrapped.robot.get_cartesian_position()) # type: ignore

for _ in range(10):
for _ in range(10):
# move 1cm in x direction (forward) and close gripper
act = {"tquat": [0.01, 0, 0, 0, 0, 0, 1], "gripper": 0}
obs, reward, terminated, truncated, info = env_rel.step(act)
if truncated or terminated:
logger.info("Truncated or terminated!")
return
for _ in range(10):
# move 1cm in negative x direction (backward) and open gripper
act = {"tquat": [-0.01, 0, 0, 0, 0, 0, 1], "gripper": 1}
obs, reward, terminated, truncated, info = env_rel.step(act)
if truncated or terminated:
logger.info("Truncated or terminated!")
return
env_rel.close()


if __name__ == "__main__":
main()
50 changes: 50 additions & 0 deletions python/rcs/camera/digit_cam.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from digit_interface.digit import Digit
from rcs.camera.hw import BaseHardwareCameraSet, HWCameraSetConfig
from rcs.camera.interface import CameraFrame, DataFrame, Frame


class DigitConfig(HWCameraSetConfig):
"""
Configuration for the DIGIT device.
This class is used to define the settings for the DIGIT device.
"""

stream_name: str = "QVGA" # options: "QVGA" (60 and 30 fps), "VGA" (30 and 15 fps)


class DigitCam(BaseHardwareCameraSet):
"""
This module provides an interface to interact with the DIGIT device.
It allows for connecting to the device, changing settings, and retrieving information.
"""

def __init__(self, cfg: DigitConfig):
self._cfg = cfg
super().__init__()
self._cameras: dict[str, Digit] = {}
self.initalize(self.config)

def initalize(self, cfg: HWCameraSetConfig):
"""
Initialize the digit interface with the given configuration.
:param cfg: Configuration for the DIGIT device.
"""
for name, serial in cfg.name_to_identifier.items():
digit = Digit(serial, name)
digit.connect()
self._cameras[name] = digit

def _poll_frame(self, camera_name: str) -> Frame:
"""Polls the frame from the camera with the given name."""
digit = self._cameras[camera_name]
frame = digit.get_frame()
color = DataFrame(data=frame)
# rgb to bgr as expected by opencv
# color = DataFrame(data=frame[:, :, ::-1])
cf = CameraFrame(color=color)

return Frame(camera=cf)

@property
def config(self) -> DigitConfig:
return self._cfg
65 changes: 65 additions & 0 deletions python/rcs/envs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,23 @@ class CameraDictType(RCSpaceType):
]


class DigitCameraDictType(RCSpaceType):
digit_frames: dict[
Annotated[str, "camera_names"],
Annotated[
np.ndarray,
# needs to be filled with values downstream
lambda height, width, color_dim=3, dtype=np.uint8, low=0, high=255: gym.spaces.Box(
low=low,
high=high,
shape=(height, width, color_dim),
dtype=dtype,
),
"digit_frames",
],
]


# joining works with inheritance but need to inherit from protocol again
class ArmObsType(TQuatDictType, JointsDictType, TRPYDictType): ...

Expand Down Expand Up @@ -646,6 +663,54 @@ def close(self):
super().close()


class DigitCameraSetWrapper(CameraSetWrapper):
"""Wrapper for digit cameras."""

def __init__(self, env, camera_set: BaseCameraSet):
super().__init__(env, camera_set)
# self.unwrapped: FR3Env
self.camera_set = camera_set
self.observation_space: gym.spaces.Dict
# rgb is always included
params: dict = {
"digit_frames": {
"height": camera_set.config.resolution_height,
"width": camera_set.config.resolution_width,
}
}

self.observation_space.spaces.update(
get_space(
DigitCameraDictType,
child_dict_keys_to_unfold={
"camera_names": camera_set.camera_names,
},
params=params,
).spaces
)
self.camera_key = get_space_keys(DigitCameraDictType)[0]

def observation(self, observation: dict, info: dict[str, Any]) -> tuple[dict[str, Any], dict[str, Any]]:
observation = copy.deepcopy(observation)
info = copy.deepcopy(info)
frameset = self.camera_set.get_latest_frames()
if frameset is None:
observation[self.camera_key] = {}
info["digit_available"] = False
return observation, info

frame_dict: dict[str, np.ndarray] = {
camera_name: frame.camera.color.data for camera_name, frame in frameset.frames.items()
}
observation[self.camera_key] = frame_dict

info["digit_available"] = True
if frameset.avg_timestamp is not None:
info["digit_timestamp"] = frameset.avg_timestamp

return observation, info


class GripperWrapper(ActObsInfoWrapper):
# TODO: sticky gripper, like in aloha

Expand Down
9 changes: 9 additions & 0 deletions python/rcs/envs/creators.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@
from gymnasium.envs.registration import EnvCreator
from rcs import sim
from rcs._core.sim import CameraType
from rcs.camera.digit_cam import DigitCam
from rcs.camera.hw import BaseHardwareCameraSet
from rcs.camera.sim import SimCameraConfig, SimCameraSet, SimCameraSetConfig
from rcs.envs.base import (
CameraSetWrapper,
ControlMode,
DigitCameraSetWrapper,
GripperWrapper,
HandWrapper,
MultiRobotWrapper,
Expand Down Expand Up @@ -59,6 +61,7 @@ def __call__( # type: ignore
collision_guard: str | PathLike | None = None,
gripper_cfg: rcs.hw.FHConfig | rcs.hand.tilburg_hand.THConfig | None = None,
camera_set: BaseHardwareCameraSet | None = None,
digit_set: DigitCam | None = None,
max_relative_movement: float | tuple[float, float] | None = None,
relative_to: RelativeTo = RelativeTo.LAST_STEP,
urdf_path: str | PathLike | None = None,
Expand Down Expand Up @@ -106,6 +109,12 @@ def __call__( # type: ignore
logger.info("CameraSet started")
env = CameraSetWrapper(env, camera_set)

if digit_set is not None:
digit_set.start()
digit_set.wait_for_frames()
logger.info("DigitCameraSet started")
env = DigitCameraSetWrapper(env, digit_set)

if collision_guard is not None:
assert urdf_path is not None
env = CollisionGuard.env_from_xml_paths(
Expand Down
17 changes: 17 additions & 0 deletions python/rcs/envs/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
import mujoco as mj
import numpy as np
import rcs
from digit_interface import Digit
from rcs import sim
from rcs._core.hw import FR3Config, IKSolver
from rcs._core.sim import CameraType
from rcs.camera.digit_cam import DigitCam, DigitConfig
from rcs.camera.interface import BaseCameraConfig
from rcs.camera.realsense import RealSenseCameraSet, RealSenseSetConfig
from rcs.camera.sim import SimCameraConfig, SimCameraSetConfig
Expand Down Expand Up @@ -73,6 +75,21 @@ def default_fr3_sim_gripper_cfg(idx: str = "0") -> sim.SimGripperConfig:
return cfg


def default_digit(name2id: dict[str, str] | None, stream_name: str = "QVGA") -> DigitCam | None:
if name2id is None:
return None
cameras = {name: BaseCameraConfig(identifier=id) for name, id in name2id.items()}
stream_dict = Digit.STREAMS[stream_name]
digit_cfg = DigitConfig(
cameras=cameras,
resolution_height=stream_dict["resolution"]["height"],
resolution_width=stream_dict["resolution"]["width"],
stream_name=stream_name,
frame_rate=stream_dict["fps"]["30fps"],
)
return DigitCam(digit_cfg)


def default_mujoco_cameraset_cfg() -> SimCameraSetConfig:
cameras = {
"wrist": SimCameraConfig(identifier="wrist_0", type=int(CameraType.fixed)),
Expand Down