arflow.core

Data exchanging service.

  1"""Data exchanging service."""
  2
  3import os
  4import pickle
  5import time
  6import uuid
  7from time import gmtime, strftime
  8from typing import Dict, List
  9
 10import numpy as np
 11import rerun as rr
 12
 13from arflow import service_pb2, service_pb2_grpc
 14
 15sessions: Dict[str, service_pb2.RegisterRequest] = {}
 16"""@private"""
 17
 18
 19class ARFlowService(service_pb2_grpc.ARFlowService):
 20    """ARFlow gRPC service."""
 21
 22    _start_time = time.time_ns()
 23    _frame_data: List[Dict[str, float | bytes]] = []
 24
 25    def __init__(self, use_visualizer: bool = True) -> None:
 26        self.recorder = rr
 27        self.use_visualizer = use_visualizer
 28        super().__init__()
 29
 30    def _save_frame_data(
 31        self, request: service_pb2.DataFrameRequest | service_pb2.RegisterRequest
 32    ):
 33        """@private"""
 34        time_stamp = (time.time_ns() - self._start_time) / 1e9
 35        self._frame_data.append(
 36            {"time_stamp": time_stamp, "data": request.SerializeToString()}
 37        )
 38
 39    def register(
 40        self, request: service_pb2.RegisterRequest, context, uid: str | None = None
 41    ) -> service_pb2.RegisterResponse:
 42        """Register a client."""
 43
 44        self._save_frame_data(request)
 45
 46        # Start processing.
 47        if uid is None:
 48            uid = str(uuid.uuid4())
 49
 50        sessions[uid] = request
 51
 52        self.recorder.init(
 53            f"{request.device_name} - ARFlow",
 54            spawn=self.use_visualizer,
 55            default_enabled=self.use_visualizer,
 56        )
 57        print("Registered a client with UUID: %s" % uid, request)
 58
 59        # Call the for user extension code.
 60        self.on_register(request)
 61
 62        return service_pb2.RegisterResponse(uid=uid)
 63
 64    def data_frame(
 65        self,
 66        request: service_pb2.DataFrameRequest,
 67        context,
 68    ) -> service_pb2.DataFrameResponse:
 69        """Process an incoming frame."""
 70
 71        self._save_frame_data(request)
 72
 73        # Start processing.
 74        decoded_data = {}
 75        session_configs = sessions[request.uid]
 76
 77        if session_configs.camera_color.enabled:
 78            color_rgb = ARFlowService.decode_rgb_image(session_configs, request.color)
 79            decoded_data["image/color_rgb"] = color_rgb
 80            color_rgb = np.flipud(color_rgb)
 81            self.recorder.log("rgb", rr.Image(color_rgb))
 82
 83        if session_configs.camera_depth.enabled:
 84            depth_img = ARFlowService.decode_depth_image(session_configs, request.depth)
 85            decoded_data["image/depth_img"] = depth_img
 86            depth_img = np.flipud(depth_img)
 87            self.recorder.log("depth", rr.DepthImage(depth_img, meter=1.0))
 88
 89        if session_configs.camera_transform.enabled:
 90            self.recorder.log("world/origin", rr.ViewCoordinates.RIGHT_HAND_Y_DOWN)
 91            # self.logger.log(
 92            #     "world/xyz",
 93            #     rr.Arrows3D(
 94            #         vectors=[[1, 0, 0], [0, 1, 0], [0, 0, 1]],
 95            #         colors=[[255, 0, 0], [0, 255, 0], [0, 0, 255]],
 96            #     ),
 97            # )
 98
 99            transform = ARFlowService.decode_transform(request.transform)
100            decoded_data["transform"] = transform
101            self.recorder.log(
102                "world/camera",
103                self.recorder.Transform3D(
104                    mat3x3=transform[:3, :3], translation=transform[:3, 3]
105                ),
106            )
107
108            k = ARFlowService.decode_intrinsic(session_configs)
109            self.recorder.log("world/camera", rr.Pinhole(image_from_camera=k))
110            self.recorder.log("world/camera", rr.Image(np.flipud(color_rgb)))
111
112        if session_configs.camera_point_cloud.enabled:
113            pcd, clr = ARFlowService.decode_point_cloud(
114                session_configs, k, color_rgb, depth_img, transform
115            )
116            decoded_data["point_cloud_pcd"] = pcd
117            decoded_data["point_cloud_clr"] = clr
118            self.recorder.log("world/point_cloud", rr.Points3D(pcd, colors=clr))
119
120        if session_configs.camera_plane_detection.enabled:
121            pass
122
123        if session_configs.gyroscope.enabled:
124            gyro_data = request.gyroscope
125
126            attitude = rr.Quaternion(
127                xyzw=[
128                    gyro_data.attitude.x,
129                    gyro_data.attitude.y,
130                    gyro_data.attitude.z,
131                    gyro_data.attitude.w,
132                ]
133            )
134            rotation_rate = rr.datatypes.Vec3D(
135                [
136                    gyro_data.rotation_rate.x,
137                    gyro_data.rotation_rate.y,
138                    gyro_data.rotation_rate.z,
139                ]
140            )
141            gravity = rr.datatypes.Vec3D(
142                [gyro_data.gravity.x, gyro_data.gravity.y, gyro_data.gravity.z]
143            )
144            acceleration = rr.datatypes.Vec3D(
145                [
146                    gyro_data.acceleration.x,
147                    gyro_data.acceleration.y,
148                    gyro_data.acceleration.z,
149                ]
150            )
151
152            # Attitute is displayed as a box, and the other acceleration variables are displayed as arrows.
153            rr.log(
154                "rotations/gyroscope/attitude",
155                rr.Boxes3D(half_sizes=[0.5, 0.5, 0.5], quaternions=[attitude]),
156            )
157            rr.log(
158                "rotations/gyroscope/rotation_rate",
159                rr.Arrows3D(vectors=[rotation_rate], colors=[[0, 255, 0]]),
160            )
161            rr.log(
162                "rotations/gyroscope/gravity",
163                rr.Arrows3D(vectors=[gravity], colors=[[0, 0, 255]]),
164            )
165            rr.log(
166                "rotations/gyroscope/acceleration",
167                rr.Arrows3D(vectors=[acceleration], colors=[[255, 255, 0]]),
168            )
169
170        # Call the for user extension code.
171        self.on_frame_received(decoded_data)
172
173        return service_pb2.DataFrameResponse(message="OK")
174
175    def on_register(self, request: service_pb2.RegisterRequest):
176        """Called when a new device is registered. Override this method to process the data."""
177        pass
178
179    def on_frame_received(self, frame_data: service_pb2.DataFrameRequest):
180        """Called when a frame is received. Override this method to process the data."""
181        pass
182
183    def on_program_exit(self, path_to_save: str | None):
184        """Save the data and exit."""
185        if path_to_save is None:
186            return
187        print("Saving the data...")
188        f_name = strftime("%Y_%m_%d_%H_%M_%S", gmtime())
189        save_path = os.path.join(path_to_save, f"frames_{f_name}.pkl")
190        with open(save_path, "wb") as f:
191            pickle.dump(self._frame_data, f)
192
193        print(f"Data saved to {save_path}")
194
195    @staticmethod
196    def decode_rgb_image(
197        session_configs: service_pb2.RegisterRequest, buffer: bytes
198    ) -> np.ndarray:
199        # Calculate the size of the image.
200        color_img_w = int(
201            session_configs.camera_intrinsics.resolution_x
202            * session_configs.camera_color.resize_factor_x
203        )
204        color_img_h = int(
205            session_configs.camera_intrinsics.resolution_y
206            * session_configs.camera_color.resize_factor_y
207        )
208        p = color_img_w * color_img_h
209        color_img = np.frombuffer(buffer, dtype=np.uint8)
210
211        # Decode RGB bytes into RGB.
212        if session_configs.camera_color.data_type == "RGB24":
213            color_rgb = color_img.reshape((color_img_h, color_img_w, 3))
214            color_rgb = color_rgb.astype(np.uint8)
215
216        # Decode YCbCr bytes into RGB.
217        elif session_configs.camera_color.data_type == "YCbCr420":
218            y = color_img[:p].reshape((color_img_h, color_img_w))
219            cbcr = color_img[p:].reshape((color_img_h // 2, color_img_w // 2, 2))
220            cb, cr = cbcr[:, :, 0], cbcr[:, :, 1]
221
222            # Very important! Convert to float32 first!
223            cb = np.repeat(cb, 2, axis=0).repeat(2, axis=1).astype(np.float32) - 128
224            cr = np.repeat(cr, 2, axis=0).repeat(2, axis=1).astype(np.float32) - 128
225
226            r = np.clip(y + 1.403 * cr, 0, 255)
227            g = np.clip(y - 0.344 * cb - 0.714 * cr, 0, 255)
228            b = np.clip(y + 1.772 * cb, 0, 255)
229
230            color_rgb = np.stack([r, g, b], axis=-1)
231            color_rgb = color_rgb.astype(np.uint8)
232
233        return color_rgb
234
235    @staticmethod
236    def decode_depth_image(
237        session_configs: service_pb2.RegisterRequest, buffer: bytes
238    ) -> np.ndarray:
239        if session_configs.camera_depth.data_type == "f32":
240            dtype = np.float32
241        elif session_configs.camera_depth.data_type == "u16":
242            dtype = np.uint16
243        else:
244            raise ValueError(
245                f"Unknown depth data type: {session_configs.camera_depth.data_type}"
246            )
247
248        depth_img = np.frombuffer(buffer, dtype=dtype)
249        depth_img = depth_img.reshape(
250            (
251                session_configs.camera_depth.resolution_y,
252                session_configs.camera_depth.resolution_x,
253            )
254        )
255
256        # 16-bit unsigned integer, describing the depth (distance to an object) in millimeters.
257        if dtype == np.uint16:
258            depth_img = depth_img.astype(np.float32) / 1000.0
259
260        return depth_img
261
262    @staticmethod
263    def decode_transform(buffer: bytes):
264        y_down_to_y_up = np.array(
265            [
266                [1.0, -0.0, 0.0, 0],
267                [0.0, -1.0, 0.0, 0],
268                [0.0, 0.0, 1.0, 0],
269                [0.0, 0.0, 0, 1.0],
270            ],
271            dtype=np.float32,
272        )
273
274        t = np.frombuffer(buffer, dtype=np.float32)
275        transform = np.eye(4)
276        transform[:3, :] = t.reshape((3, 4))
277        transform[:3, 3] = 0
278        transform = y_down_to_y_up @ transform
279
280        return transform
281
282    @staticmethod
283    def decode_intrinsic(session_configs: service_pb2.RegisterRequest):
284        sx = session_configs.camera_color.resize_factor_x
285        sy = session_configs.camera_color.resize_factor_y
286
287        fx, fy = (
288            session_configs.camera_intrinsics.focal_length_x * sx,
289            session_configs.camera_intrinsics.focal_length_y * sy,
290        )
291        cx, cy = (
292            session_configs.camera_intrinsics.principal_point_x * sx,
293            session_configs.camera_intrinsics.principal_point_y * sy,
294        )
295
296        k = np.array([[fx, 0, cx], [0, fy, cy], [0, 0, 1]])
297
298        return k
299
300    @staticmethod
301    def decode_point_cloud(
302        session_configs: service_pb2.RegisterRequest,
303        k: np.ndarray,
304        color_rgb: np.ndarray,
305        depth_img: np.ndarray,
306        transform: np.ndarray,
307    ) -> np.ndarray:
308        # Flip image is needed for point cloud generation.
309        color_rgb = np.flipud(color_rgb)
310        depth_img = np.flipud(depth_img)
311
312        color_img_w = int(
313            session_configs.camera_intrinsics.resolution_x
314            * session_configs.camera_color.resize_factor_x
315        )
316        color_img_h = int(
317            session_configs.camera_intrinsics.resolution_y
318            * session_configs.camera_color.resize_factor_y
319        )
320        u, v = np.meshgrid(np.arange(color_img_w), np.arange(color_img_h))
321        fx, fy = k[0, 0], k[1, 1]
322        cx, cy = k[0, 2], k[1, 2]
323
324        z = depth_img.copy()
325        x = ((u - cx) * z) / fx
326        y = ((v - cy) * z) / fy
327        pcd = np.stack([x, y, z], axis=-1).reshape(-1, 3)
328        pcd = np.matmul(transform[:3, :3], pcd.T).T + transform[:3, 3]
329        clr = color_rgb.reshape(-1, 3)
330
331        return pcd, clr
class ARFlowService(arflow.service_pb2_grpc.ARFlowService):

ARFlow gRPC service.

ARFlowService(use_visualizer: bool = True)
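
Initialize the service. The rerun module is attached as the recorder; use_visualizer controls whether registering a client spawns the rerun viewer and enables logging by default.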
recorder
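
The rerun module, used to record and visualize incoming frames.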
use_visualizer
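
Whether to spawn the rerun viewer and enable logging by default when a client registers.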
def register( self, request: arflow.service_pb2.RegisterRequest, context, uid: str | None = None) -> arflow.service_pb2.RegisterResponse:

Register a client.
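
For quick local experiments, register can be called directly, bypassing gRPC. A minimal sketch (the device name, uid, and resolution values are illustrative; context is unused here, so None suffices, and the proto field names follow the source above):

    from arflow import service_pb2
    from arflow.core import ARFlowService

    service = ARFlowService(use_visualizer=False)

    # Describe the client; only the color stream is enabled.
    request = service_pb2.RegisterRequest(device_name="test-device")
    request.camera_color.enabled = True
    request.camera_color.data_type = "RGB24"
    request.camera_color.resize_factor_x = 1.0
    request.camera_color.resize_factor_y = 1.0
    request.camera_intrinsics.resolution_x = 4
    request.camera_intrinsics.resolution_y = 4

    response = service.register(request, context=None, uid="debug-session")
    print(response.uid)  # "debug-session"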

def data_frame( self, request: arflow.service_pb2.DataFrameRequest, context) -> arflow.service_pb2.DataFrameResponse:

Process an incoming frame.
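
Continuing the registration sketch above, a single color-only frame can be fed to the "debug-session" session; the gray 4x4 RGB24 frame below matches the intrinsics declared at registration (illustrative values, not a production client):

    import numpy as np

    # A solid gray 4x4 frame; RGB24 buffers are raw interleaved RGB bytes.
    frame = np.full((4, 4, 3), 128, dtype=np.uint8)

    request = service_pb2.DataFrameRequest(
        uid="debug-session",
        color=frame.tobytes(),
    )
    response = service.data_frame(request, context=None)
    print(response.message)  # "OK"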

def on_register(self, request: arflow.service_pb2.RegisterRequest):

Called when a new device is registered. Override this method to process the data.

def on_frame_received(self, frame_data: typing.Dict[str, typing.Any]):

Called when a frame is received. Override this method to process the data.
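
The intended extension point is subclassing. A minimal sketch (MyService is an illustrative name; the dictionary keys are the ones populated by data_frame above):

    from typing import Any, Dict

    from arflow import service_pb2
    from arflow.core import ARFlowService

    class MyService(ARFlowService):
        def on_register(self, request: service_pb2.RegisterRequest):
            print(f"New device: {request.device_name}")

        def on_frame_received(self, frame_data: Dict[str, Any]):
            # Keys depend on which streams the session enabled.
            if "image/color_rgb" in frame_data:
                print("color frame:", frame_data["image/color_rgb"].shape)
            if "point_cloud_pcd" in frame_data:
                print("points:", frame_data["point_cloud_pcd"].shape)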

def on_program_exit(self, path_to_save: str | None):

Save the data and exit.
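
Nothing calls this hook automatically; one way to wire it up is atexit, as sketched below (the directory and file name are illustrative; each pickled entry holds the raw serialized request plus a timestamp):

    import atexit
    import pickle

    service = MyService(use_visualizer=False)
    atexit.register(service.on_program_exit, "./recordings")

    # Later, load a saved session back:
    with open("./recordings/frames_2024_01_01_00_00_00.pkl", "rb") as f:
        frames = pickle.load(f)
    for frame in frames:
        print(frame["time_stamp"], len(frame["data"]))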

@staticmethod
def decode_rgb_image( session_configs: arflow.service_pb2.RegisterRequest, buffer: bytes) -> numpy.ndarray:
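
Decode a raw color buffer into an (H, W, 3) uint8 RGB image, where W and H are the registered resolution scaled by the color resize factors. Two data types are supported: RGB24, raw interleaved 8-bit RGB; and YCbCr420, a full-resolution luma plane (W*H bytes) followed by an interleaved half-resolution CbCr plane (W*H/2 bytes) that is upsampled and converted to RGB.
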
@staticmethod
def decode_depth_image( session_configs: arflow.service_pb2.RegisterRequest, buffer: bytes) -> numpy.ndarray:
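
Decode a raw depth buffer into a float32 depth map in meters. f32 buffers are used as-is; u16 buffers are interpreted as millimeters and divided by 1000. Any other camera_depth.data_type raises ValueError.
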
@staticmethod
def decode_transform(buffer: bytes):
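
Decode a row-major 3x4 float32 buffer into a 4x4 homogeneous transform, flipped from Y-down to Y-up coordinates. The translation column is zeroed before the flip, so the returned matrix is rotation-only.
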
@staticmethod
def decode_intrinsic(session_configs: arflow.service_pb2.RegisterRequest):
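
Build the 3x3 pinhole intrinsic matrix [[fx, 0, cx], [0, fy, cy], [0, 0, 1]], with the focal lengths and principal point scaled by the color resize factors.
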
@staticmethod
def decode_point_cloud( session_configs: arflow.service_pb2.RegisterRequest, k: numpy.ndarray, color_rgb: numpy.ndarray, depth_img: numpy.ndarray, transform: numpy.ndarray) -> tuple[numpy.ndarray, numpy.ndarray]:
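
Back-project the depth map into a colored point cloud in world space: each pixel (u, v) with depth z becomes a camera-space point x = (u - cx) * z / fx, y = (v - cy) * z / fy, which is then rotated and translated by the camera transform. Returns a tuple (pcd, clr) of N-by-3 position and color arrays.
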
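A self-contained sketch of that back-projection math on synthetic values (independent of the proto types; the intrinsics are illustrative):

    import numpy as np

    # Pinhole intrinsics for a 4x4 image.
    fx = fy = 2.0
    cx = cy = 2.0
    depth = np.ones((4, 4), dtype=np.float32)  # every pixel 1 m away

    # Back-project each pixel (u, v, z) into camera space.
    u, v = np.meshgrid(np.arange(4), np.arange(4))
    x = (u - cx) * depth / fx
    y = (v - cy) * depth / fy
    pcd = np.stack([x, y, depth], axis=-1).reshape(-1, 3)
    print(pcd.shape)  # (16, 3)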