11import copy
2+ import datetime
23import logging
34import threading
45import typing
5- from dataclasses import dataclass
66
7+ import cv2
78import numpy as np
89from rcs .camera .hw import CalibrationStrategy , DummyCalibrationStrategy , HardwareCamera
9- from rcs .camera .interface import BaseCameraSet , DataFrame , CameraFrame , Frame
10- import cv2
10+ from rcs .camera .interface import CameraFrame , DataFrame , Frame
11+
1112from rcs import common
12- import datetime
13- '''
13+
14+ """
1415A generic extension class for handling USB-connected cameras.
1516Uses OpenCV to interface with the camera hardware, specifically using cv2.VideoCapture(id).
1617The ID can be both a single integer passed as a string, i.e. str(0), str(1), or the full /dev/ path, like /dev/video0.
1718
18- '''
19+ """
20+
21+
1922class USBCameraConfig (common .BaseCameraConfig ):
2023 color_intrinsics : np .ndarray [tuple [typing .Literal [3 ], typing .Literal [4 ]], np .dtype [np .float64 ]] | None = None
2124 distortion_coeffs : np .ndarray [tuple [typing .Literal [5 ]], np .dtype [np .float64 ]] | None = None
2225
26+
2327class USBCameraSet (HardwareCamera ):
24- def __init__ (self ,
25- cameras : dict [str , USBCameraConfig ],
26- calibration_strategy : dict [str , CalibrationStrategy ] | None = None ,):
28+ def __init__ (
29+ self ,
30+ cameras : dict [str , USBCameraConfig ],
31+ calibration_strategy : dict [str , CalibrationStrategy ] | None = None ,
32+ ):
2733 self .cameras = cameras
2834 self .CALIBRATION_FRAME_SIZE = 30
2935 if calibration_strategy is None :
3036 calibration_strategy = {camera_name : DummyCalibrationStrategy () for camera_name in cameras }
31- for name , cam in self .cameras .items ():
37+ for cam in self .cameras .values ():
3238 if cam .color_intrinsics is None :
33- cam .color_intrinsics = np .zeros ((3 , 4 ), dtype = np .float64 )
39+ cam .color_intrinsics = np .zeros ((3 , 4 ), dtype = np .float64 ) # type: ignore
3440 if cam .distortion_coeffs is None :
35- cam .distortion_coeffs = np .zeros ((5 ,), dtype = np .float64 )
41+ cam .distortion_coeffs = np .zeros ((5 ,), dtype = np .float64 ) # type: ignore
3642 if cam .resolution_height is None :
3743 cam .resolution_height = 480
3844 if cam .resolution_width is None :
@@ -44,7 +50,9 @@ def __init__(self,
4450 self ._captures : dict [str , cv2 .VideoCapture ] = {}
4551 self ._logger = logging .getLogger (__name__ )
4652 self ._logger .info ("USBCamera initialized with cameras: %s" , self ._camera_names )
47- self ._logger .info ("If the camera streams are not correct, try v4l2-ctl --list-devices to see the available cameras." )
53+ self ._logger .info (
54+ "If the camera streams are not correct, try v4l2-ctl --list-devices to see the available cameras."
55+ )
4856 self ._frame_buffer_lock : dict [str , threading .Lock ] = {}
4957 self ._frame_buffer : dict [str , list ] = {}
5058
@@ -58,7 +66,8 @@ def open(self):
5866 cap .set (cv2 .CAP_PROP_FPS , camera .frame_rate )
5967
6068 if not cap .isOpened ():
61- raise RuntimeError (f"Could not open camera { name } with id { camera .identifier } " )
69+ msg = f"Could not open camera { name } with id { camera .identifier } "
70+ raise RuntimeError (msg )
6271 self ._captures [name ] = cap
6372
6473 @property
@@ -70,20 +79,27 @@ def poll_frame(self, camera_name: str) -> Frame:
7079 timestamp = datetime .datetime .now ().timestamp ()
7180 ret , color_frame = cap .read ()
7281 if not ret :
73- raise RuntimeError (f"Failed to read frame from camera { camera_name } " )
82+ msg = f"Failed to read frame from camera { camera_name } "
83+ raise RuntimeError (msg )
7484 with self ._frame_buffer_lock [camera_name ]:
7585 if len (self ._frame_buffer [camera_name ]) >= self .CALIBRATION_FRAME_SIZE :
7686 self ._frame_buffer [camera_name ].pop (0 )
7787 self ._frame_buffer [camera_name ].append (copy .deepcopy (color_frame ))
78- color = DataFrame (data = color_frame ,
79- timestamp = timestamp ,
80- intrinsics = self .cameras [camera_name ].color_intrinsics ,
81- extrinsics = self .calibration_strategy [camera_name ].get_extrinsics ())
82- depth_frame = np .zeros ((self .cameras [camera_name ].resolution_height , self .cameras [camera_name ].resolution_width ), dtype = np .uint16 )
83- depth = DataFrame (data = depth_frame ,
84- timestamp = timestamp ,
85- intrinsics = self .cameras [camera_name ].color_intrinsics ,
86- extrinsics = self .calibration_strategy [camera_name ].get_extrinsics ())
88+ color = DataFrame (
89+ data = color_frame ,
90+ timestamp = timestamp ,
91+ intrinsics = self .cameras [camera_name ].color_intrinsics ,
92+ extrinsics = self .calibration_strategy [camera_name ].get_extrinsics (),
93+ )
94+ depth_frame = np .zeros (
95+ (self .cameras [camera_name ].resolution_height , self .cameras [camera_name ].resolution_width ), dtype = np .uint16
96+ )
97+ depth = DataFrame (
98+ data = depth_frame ,
99+ timestamp = timestamp ,
100+ intrinsics = self .cameras [camera_name ].color_intrinsics ,
101+ extrinsics = self .calibration_strategy [camera_name ].get_extrinsics (),
102+ )
87103 cf = CameraFrame (color = color , depth = depth )
88104 return Frame (camera = cf , avg_timestamp = timestamp )
89105
0 commit comments