arflow.core
Data exchange service.
1"""Data exchanging service.""" 2 3import os 4import pickle 5import time 6import uuid 7from time import gmtime, strftime 8from typing import Dict, List 9 10import numpy as np 11import rerun as rr 12 13from arflow import service_pb2, service_pb2_grpc 14 15sessions: Dict[str, service_pb2.RegisterRequest] = {} 16"""@private""" 17 18 19class ARFlowService(service_pb2_grpc.ARFlowService): 20 """ARFlow gRPC service.""" 21 22 _start_time = time.time_ns() 23 _frame_data: List[Dict[str, float | bytes]] = [] 24 25 def __init__(self, use_visualizer: bool = True) -> None: 26 self.recorder = rr 27 self.use_visualizer = use_visualizer 28 super().__init__() 29 30 def _save_frame_data( 31 self, request: service_pb2.DataFrameRequest | service_pb2.RegisterRequest 32 ): 33 """@private""" 34 time_stamp = (time.time_ns() - self._start_time) / 1e9 35 self._frame_data.append( 36 {"time_stamp": time_stamp, "data": request.SerializeToString()} 37 ) 38 39 def register( 40 self, request: service_pb2.RegisterRequest, context, uid: str | None = None 41 ) -> service_pb2.RegisterResponse: 42 """Register a client.""" 43 44 self._save_frame_data(request) 45 46 # Start processing. 47 if uid is None: 48 uid = str(uuid.uuid4()) 49 50 sessions[uid] = request 51 52 self.recorder.init( 53 f"{request.device_name} - ARFlow", 54 spawn=self.use_visualizer, 55 default_enabled=self.use_visualizer, 56 ) 57 print("Registered a client with UUID: %s" % uid, request) 58 59 # Call the for user extension code. 60 self.on_register(request) 61 62 return service_pb2.RegisterResponse(uid=uid) 63 64 def data_frame( 65 self, 66 request: service_pb2.DataFrameRequest, 67 context, 68 ) -> service_pb2.DataFrameResponse: 69 """Process an incoming frame.""" 70 71 self._save_frame_data(request) 72 73 # Start processing. 
74 decoded_data = {} 75 session_configs = sessions[request.uid] 76 77 if session_configs.camera_color.enabled: 78 color_rgb = ARFlowService.decode_rgb_image(session_configs, request.color) 79 decoded_data["image/color_rgb"] = color_rgb 80 color_rgb = np.flipud(color_rgb) 81 self.recorder.log("rgb", rr.Image(color_rgb)) 82 83 if session_configs.camera_depth.enabled: 84 depth_img = ARFlowService.decode_depth_image(session_configs, request.depth) 85 decoded_data["image/depth_img"] = depth_img 86 depth_img = np.flipud(depth_img) 87 self.recorder.log("depth", rr.DepthImage(depth_img, meter=1.0)) 88 89 if session_configs.camera_transform.enabled: 90 self.recorder.log("world/origin", rr.ViewCoordinates.RIGHT_HAND_Y_DOWN) 91 # self.logger.log( 92 # "world/xyz", 93 # rr.Arrows3D( 94 # vectors=[[1, 0, 0], [0, 1, 0], [0, 0, 1]], 95 # colors=[[255, 0, 0], [0, 255, 0], [0, 0, 255]], 96 # ), 97 # ) 98 99 transform = ARFlowService.decode_transform(request.transform) 100 decoded_data["transform"] = transform 101 self.recorder.log( 102 "world/camera", 103 self.recorder.Transform3D( 104 mat3x3=transform[:3, :3], translation=transform[:3, 3] 105 ), 106 ) 107 108 k = ARFlowService.decode_intrinsic(session_configs) 109 self.recorder.log("world/camera", rr.Pinhole(image_from_camera=k)) 110 self.recorder.log("world/camera", rr.Image(np.flipud(color_rgb))) 111 112 if session_configs.camera_point_cloud.enabled: 113 pcd, clr = ARFlowService.decode_point_cloud( 114 session_configs, k, color_rgb, depth_img, transform 115 ) 116 decoded_data["point_cloud_pcd"] = pcd 117 decoded_data["point_cloud_clr"] = clr 118 self.recorder.log("world/point_cloud", rr.Points3D(pcd, colors=clr)) 119 120 if session_configs.camera_plane_detection.enabled: 121 pass 122 123 if session_configs.gyroscope.enabled: 124 gyro_data = request.gyroscope 125 126 attitude = rr.Quaternion( 127 xyzw=[ 128 gyro_data.attitude.x, 129 gyro_data.attitude.y, 130 gyro_data.attitude.z, 131 gyro_data.attitude.w, 132 ] 133 ) 134 rotation_rate = rr.datatypes.Vec3D( 135 [ 136 gyro_data.rotation_rate.x, 137 gyro_data.rotation_rate.y, 138 gyro_data.rotation_rate.z, 139 ] 140 ) 141 gravity = rr.datatypes.Vec3D( 142 [gyro_data.gravity.x, gyro_data.gravity.y, gyro_data.gravity.z] 143 ) 144 acceleration = rr.datatypes.Vec3D( 145 [ 146 gyro_data.acceleration.x, 147 gyro_data.acceleration.y, 148 gyro_data.acceleration.z, 149 ] 150 ) 151 152 # Attitute is displayed as a box, and the other acceleration variables are displayed as arrows. 153 rr.log( 154 "rotations/gyroscope/attitude", 155 rr.Boxes3D(half_sizes=[0.5, 0.5, 0.5], quaternions=[attitude]), 156 ) 157 rr.log( 158 "rotations/gyroscope/rotation_rate", 159 rr.Arrows3D(vectors=[rotation_rate], colors=[[0, 255, 0]]), 160 ) 161 rr.log( 162 "rotations/gyroscope/gravity", 163 rr.Arrows3D(vectors=[gravity], colors=[[0, 0, 255]]), 164 ) 165 rr.log( 166 "rotations/gyroscope/acceleration", 167 rr.Arrows3D(vectors=[acceleration], colors=[[255, 255, 0]]), 168 ) 169 170 # Call the for user extension code. 171 self.on_frame_received(decoded_data) 172 173 return service_pb2.DataFrameResponse(message="OK") 174 175 def on_register(self, request: service_pb2.RegisterRequest): 176 """Called when a new device is registered. Override this method to process the data.""" 177 pass 178 179 def on_frame_received(self, frame_data: service_pb2.DataFrameRequest): 180 """Called when a frame is received. 
Override this method to process the data.""" 181 pass 182 183 def on_program_exit(self, path_to_save: str | None): 184 """Save the data and exit.""" 185 if path_to_save is None: 186 return 187 print("Saving the data...") 188 f_name = strftime("%Y_%m_%d_%H_%M_%S", gmtime()) 189 save_path = os.path.join(path_to_save, f"frames_{f_name}.pkl") 190 with open(save_path, "wb") as f: 191 pickle.dump(self._frame_data, f) 192 193 print(f"Data saved to {save_path}") 194 195 @staticmethod 196 def decode_rgb_image( 197 session_configs: service_pb2.RegisterRequest, buffer: bytes 198 ) -> np.ndarray: 199 # Calculate the size of the image. 200 color_img_w = int( 201 session_configs.camera_intrinsics.resolution_x 202 * session_configs.camera_color.resize_factor_x 203 ) 204 color_img_h = int( 205 session_configs.camera_intrinsics.resolution_y 206 * session_configs.camera_color.resize_factor_y 207 ) 208 p = color_img_w * color_img_h 209 color_img = np.frombuffer(buffer, dtype=np.uint8) 210 211 # Decode RGB bytes into RGB. 212 if session_configs.camera_color.data_type == "RGB24": 213 color_rgb = color_img.reshape((color_img_h, color_img_w, 3)) 214 color_rgb = color_rgb.astype(np.uint8) 215 216 # Decode YCbCr bytes into RGB. 217 elif session_configs.camera_color.data_type == "YCbCr420": 218 y = color_img[:p].reshape((color_img_h, color_img_w)) 219 cbcr = color_img[p:].reshape((color_img_h // 2, color_img_w // 2, 2)) 220 cb, cr = cbcr[:, :, 0], cbcr[:, :, 1] 221 222 # Very important! Convert to float32 first! 223 cb = np.repeat(cb, 2, axis=0).repeat(2, axis=1).astype(np.float32) - 128 224 cr = np.repeat(cr, 2, axis=0).repeat(2, axis=1).astype(np.float32) - 128 225 226 r = np.clip(y + 1.403 * cr, 0, 255) 227 g = np.clip(y - 0.344 * cb - 0.714 * cr, 0, 255) 228 b = np.clip(y + 1.772 * cb, 0, 255) 229 230 color_rgb = np.stack([r, g, b], axis=-1) 231 color_rgb = color_rgb.astype(np.uint8) 232 233 return color_rgb 234 235 @staticmethod 236 def decode_depth_image( 237 session_configs: service_pb2.RegisterRequest, buffer: bytes 238 ) -> np.ndarray: 239 if session_configs.camera_depth.data_type == "f32": 240 dtype = np.float32 241 elif session_configs.camera_depth.data_type == "u16": 242 dtype = np.uint16 243 else: 244 raise ValueError( 245 f"Unknown depth data type: {session_configs.camera_depth.data_type}" 246 ) 247 248 depth_img = np.frombuffer(buffer, dtype=dtype) 249 depth_img = depth_img.reshape( 250 ( 251 session_configs.camera_depth.resolution_y, 252 session_configs.camera_depth.resolution_x, 253 ) 254 ) 255 256 # 16-bit unsigned integer, describing the depth (distance to an object) in millimeters. 
257 if dtype == np.uint16: 258 depth_img = depth_img.astype(np.float32) / 1000.0 259 260 return depth_img 261 262 @staticmethod 263 def decode_transform(buffer: bytes): 264 y_down_to_y_up = np.array( 265 [ 266 [1.0, -0.0, 0.0, 0], 267 [0.0, -1.0, 0.0, 0], 268 [0.0, 0.0, 1.0, 0], 269 [0.0, 0.0, 0, 1.0], 270 ], 271 dtype=np.float32, 272 ) 273 274 t = np.frombuffer(buffer, dtype=np.float32) 275 transform = np.eye(4) 276 transform[:3, :] = t.reshape((3, 4)) 277 transform[:3, 3] = 0 278 transform = y_down_to_y_up @ transform 279 280 return transform 281 282 @staticmethod 283 def decode_intrinsic(session_configs: service_pb2.RegisterRequest): 284 sx = session_configs.camera_color.resize_factor_x 285 sy = session_configs.camera_color.resize_factor_y 286 287 fx, fy = ( 288 session_configs.camera_intrinsics.focal_length_x * sx, 289 session_configs.camera_intrinsics.focal_length_y * sy, 290 ) 291 cx, cy = ( 292 session_configs.camera_intrinsics.principal_point_x * sx, 293 session_configs.camera_intrinsics.principal_point_y * sy, 294 ) 295 296 k = np.array([[fx, 0, cx], [0, fy, cy], [0, 0, 1]]) 297 298 return k 299 300 @staticmethod 301 def decode_point_cloud( 302 session_configs: service_pb2.RegisterRequest, 303 k: np.ndarray, 304 color_rgb: np.ndarray, 305 depth_img: np.ndarray, 306 transform: np.ndarray, 307 ) -> np.ndarray: 308 # Flip image is needed for point cloud generation. 309 color_rgb = np.flipud(color_rgb) 310 depth_img = np.flipud(depth_img) 311 312 color_img_w = int( 313 session_configs.camera_intrinsics.resolution_x 314 * session_configs.camera_color.resize_factor_x 315 ) 316 color_img_h = int( 317 session_configs.camera_intrinsics.resolution_y 318 * session_configs.camera_color.resize_factor_y 319 ) 320 u, v = np.meshgrid(np.arange(color_img_w), np.arange(color_img_h)) 321 fx, fy = k[0, 0], k[1, 1] 322 cx, cy = k[0, 2], k[1, 2] 323 324 z = depth_img.copy() 325 x = ((u - cx) * z) / fx 326 y = ((v - cy) * z) / fy 327 pcd = np.stack([x, y, z], axis=-1).reshape(-1, 3) 328 pcd = np.matmul(transform[:3, :3], pcd.T).T + transform[:3, 3] 329 clr = color_rgb.reshape(-1, 3) 330 331 return pcd, clr
class ARFlowService(arflow.service_pb2_grpc.ARFlowService):
ARFlow gRPC service.
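A minimal subclassing sketch (the `MyService` name and `./data` path are hypothetical; gRPC server wiring is not shown in this module). It overrides the `on_register` hook and registers `on_program_exit` so recorded frames are saved on shutdown:

```python
import atexit

from arflow.core import ARFlowService


class MyService(ARFlowService):
    def on_register(self, request):
        # `device_name` is a field of the RegisterRequest proto used above.
        print(f"New device: {request.device_name}")


service = MyService(use_visualizer=False)
# Save the recorded frames to ./data when the program exits.
atexit.register(service.on_program_exit, "./data")
```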
def register(self, request: arflow.service_pb2.RegisterRequest, context, uid: str | None = None) -> arflow.service_pb2.RegisterResponse:
Register a client.
def data_frame(self, request: arflow.service_pb2.DataFrameRequest, context) -> arflow.service_pb2.DataFrameResponse:
Process an incoming frame.
def on_register(self, request: arflow.service_pb2.RegisterRequest):
Called when a new device is registered. Override this method to process the data.
def on_frame_received(self, frame_data: Dict[str, numpy.ndarray]):
Called when a frame is received. Override this method to process the decoded data.
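The `frame_data` argument is the `decoded_data` dictionary built in `data_frame`, mapping stream names to numpy arrays. A minimal override sketch (key names taken from `data_frame` above; `PointCloudLogger` is a hypothetical name):

```python
import numpy as np

from arflow.core import ARFlowService


class PointCloudLogger(ARFlowService):
    def on_frame_received(self, frame_data):
        # Keys present depend on which streams the session enabled:
        #   "image/color_rgb"  (H, W, 3) uint8
        #   "image/depth_img"  (H, W)    float32, meters
        #   "transform"        (4, 4)    camera pose (rotation only; see decode_transform)
        #   "point_cloud_pcd"  (N, 3)    world-space points
        #   "point_cloud_clr"  (N, 3)    uint8 per-point colors
        pcd = frame_data.get("point_cloud_pcd")
        if pcd is not None:
            print("mean point:", np.mean(pcd, axis=0))
```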
def on_program_exit(self, path_to_save: str | None):
Save the recorded frame data to a pickle file. Intended to be called on program exit.
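Each pickled record is a dict `{"time_stamp": float, "data": bytes}` whose payload is a serialized protobuf. A loading sketch, assuming the file holds a single session whose first record is the `RegisterRequest` (the save order when one client connects); the file name is hypothetical:

```python
import pickle

from arflow import service_pb2

with open("frames_2024_01_01_00_00_00.pkl", "rb") as f:
    records = pickle.load(f)

# First record: the session config; the rest: per-frame data.
session = service_pb2.RegisterRequest.FromString(records[0]["data"])
frames = [service_pb2.DataFrameRequest.FromString(r["data"]) for r in records[1:]]
print(session.device_name, len(frames))
```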
@staticmethod
def decode_rgb_image(session_configs: arflow.service_pb2.RegisterRequest, buffer: bytes) -> numpy.ndarray:

Decode a raw color buffer into an RGB image array. Supports interleaved `RGB24` and `YCbCr420` (full-resolution luma followed by interleaved, half-resolution Cb/Cr, converted with BT.601-style coefficients).
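A decoding sketch for a tiny synthetic RGB24 buffer; the config field names are taken from the source above, and the 4x2 resolution is arbitrary:

```python
import numpy as np

from arflow import service_pb2
from arflow.core import ARFlowService

cfg = service_pb2.RegisterRequest()
cfg.camera_intrinsics.resolution_x = 4
cfg.camera_intrinsics.resolution_y = 2
cfg.camera_color.resize_factor_x = 1.0
cfg.camera_color.resize_factor_y = 1.0
cfg.camera_color.data_type = "RGB24"

# 4x2 frame of interleaved R, G, B bytes.
buffer = np.arange(4 * 2 * 3, dtype=np.uint8).tobytes()
img = ARFlowService.decode_rgb_image(cfg, buffer)
assert img.shape == (2, 4, 3)  # (height, width, channels)
```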
@staticmethod
def decode_depth_image(session_configs: arflow.service_pb2.RegisterRequest, buffer: bytes) -> numpy.ndarray:

Decode a raw depth buffer (`f32` meters or `u16` millimeters) into a depth image in meters.
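A sketch of the `u16` path, where the buffer stores millimeters and the result is converted to meters; field names come from the source above:

```python
import numpy as np

from arflow import service_pb2
from arflow.core import ARFlowService

cfg = service_pb2.RegisterRequest()
cfg.camera_depth.data_type = "u16"
cfg.camera_depth.resolution_x = 2
cfg.camera_depth.resolution_y = 2

buffer = np.array([[1000, 2000], [500, 0]], dtype=np.uint16).tobytes()
depth = ARFlowService.decode_depth_image(cfg, buffer)
# float32 meters: [[1.0, 2.0], [0.5, 0.0]]
```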
@staticmethod
def decode_transform(buffer: bytes):

Decode a row-major 3x4 camera pose from 12 float32 values and convert it from Y-down to Y-up coordinates. Note that the translation column is zeroed; only the rotation is kept.
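A round-trip sketch with an identity pose, illustrating the rotation-only result and the Y-axis flip:

```python
import numpy as np

from arflow.core import ARFlowService

pose = np.eye(4, dtype=np.float32)[:3, :]  # row-major 3x4: identity rotation
m = ARFlowService.decode_transform(pose.tobytes())
# m is 4x4; m[:3, :3] is the Y-flipped rotation (diag(1, -1, 1) here),
# and m[:3, 3] is all zeros because the translation is discarded.
```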
@staticmethod
def decode_intrinsic(session_configs: arflow.service_pb2.RegisterRequest):

Build the 3x3 pinhole intrinsic matrix from the camera intrinsics, scaled by the color resize factors.
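The result is the standard pinhole matrix `[[fx, 0, cx], [0, fy, cy], [0, 0, 1]]` with `fx`, `fy`, `cx`, `cy` multiplied by the resize factors. A sketch with made-up values:

```python
from arflow import service_pb2
from arflow.core import ARFlowService

cfg = service_pb2.RegisterRequest()
cfg.camera_color.resize_factor_x = 0.5
cfg.camera_color.resize_factor_y = 0.5
cfg.camera_intrinsics.focal_length_x = 1000.0
cfg.camera_intrinsics.focal_length_y = 1000.0
cfg.camera_intrinsics.principal_point_x = 960.0
cfg.camera_intrinsics.principal_point_y = 540.0

k = ARFlowService.decode_intrinsic(cfg)
# [[500.,   0., 480.],
#  [  0., 500., 270.],
#  [  0.,   0.,   1.]]
```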
@staticmethod
def decode_point_cloud(session_configs: arflow.service_pb2.RegisterRequest, k: numpy.ndarray, color_rgb: numpy.ndarray, depth_img: numpy.ndarray, transform: numpy.ndarray) -> tuple[numpy.ndarray, numpy.ndarray]:

Back-project the depth image through the intrinsic matrix and move the points into world space with the camera transform. Returns a `(points, colors)` pair.
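Back-projection follows the pinhole model: `x = (u - cx) * z / fx`, `y = (v - cy) * z / fy`, with `z` taken from the depth image. A small self-contained sketch (identity pose, unit depth, made-up intrinsics):

```python
import numpy as np

from arflow import service_pb2
from arflow.core import ARFlowService

cfg = service_pb2.RegisterRequest()
cfg.camera_intrinsics.resolution_x = 2
cfg.camera_intrinsics.resolution_y = 2
cfg.camera_color.resize_factor_x = 1.0
cfg.camera_color.resize_factor_y = 1.0

k = np.array([[1.0, 0.0, 1.0], [0.0, 1.0, 1.0], [0.0, 0.0, 1.0]])
depth = np.ones((2, 2), dtype=np.float32)
rgb = np.zeros((2, 2, 3), dtype=np.uint8)

pcd, clr = ARFlowService.decode_point_cloud(cfg, k, rgb, depth, np.eye(4))
# pcd has shape (4, 3); with cx = cy = 1 and z = 1 the points are
# [[-1, -1, 1], [0, -1, 1], [-1, 0, 1], [0, 0, 1]].
```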