Files
VSlamExample/data_parser/data_parser.py
Ryan Fitz-Gerald 51a8c0e923 Added motion estimation with outputs
Added Recorder pose outputs to compare calculations
Added dataset parsing library
2026-03-13 15:50:29 -04:00

410 lines
14 KiB
Python

import json
from os import PathLike
from pathlib import Path
from typing import Optional
import numpy as np
from scipy.spatial.transform import RigidTransform, Rotation
from sqlalchemy import Column, Engine, Float, Integer, Enum, ForeignKey, String
from sqlalchemy.orm import DeclarativeBase, Session, relationship
import re
class ModelBase(DeclarativeBase):
    """ Declarative base class shared by every ORM model in this module """
# MetaData collection of the declarative base; create_tables() uses it to
# emit the schema for the models declared in this module.
metadata = ModelBase.metadata
def create_tables(engine: Engine):
    """ Creates the tables of all models registered on the shared metadata

    Args:
        engine (Engine): SQLAlchemy engine connected to the target database
    """
    metadata.create_all(bind=engine)
# File-name patterns of the dataset layout. The timestamp groups require at
# least one digit (``\d+``): a name without a timestamp must fail to match
# (and be skipped by the callers) instead of yielding an empty string that
# would make ``int()`` raise ValueError.

# Stem of a pose file: city_SE3_egovehicle_<timestamp>
ts_pose_re = re.compile(r"city_SE3_egovehicle_(?P<timestamp>\d+)")
# Calibration key of a camera: image_raw_<camera name>
cam_name_re = re.compile(r"image_raw_(?P<name>.*)")
# Stem of an image file: <camera name>_<timestamp>
camview_fn_re = re.compile(r"(?P<cam_name>[^\d]*)_(?P<timestamp>\d+)")
def json_to_pose(data: dict) -> RigidTransform:
""" Creates a pose from an Argoverse pose json
Args:
data (dict): json contents
Returns:
RigidTransform: pose in the json data
"""
# TODO improve json validation
if 'rotation' in data:
if 'coefficients' in data['rotation']:
rot_data = data['rotation']['coefficients']
else:
rot_data = data['rotation']
else:
raise RuntimeError('No rotation present in pose data')
if 'translation' in data:
trans_data = data['translation']
else:
raise RuntimeError('No translation present in pose data')
return RigidTransform.from_components(
np.array(trans_data),
Rotation.from_quat(rot_data))
class Dataset(ModelBase):
    """ Represents a dataset """
    __tablename__ = "datasets"

    id = Column(Integer, primary_key=True, autoincrement=True)
    root_path = Column(String(256))
    key = Column(String(36))
    city = Column(String(3), default="")

    timestamps = relationship("Timestamp", back_populates='dataset')
    cameras = relationship("Camera", back_populates='dataset')
    camera_views = relationship("CameraView", back_populates='dataset')

    def __init__(self, root_path: PathLike, key: str, city: str = ''):
        """ Constructor
        Args:
            root_path (PathLike): Root of the data directory
            key (str): key of the dataset
            city (str): city code; only the first three characters are kept
        """
        self.key = key
        # The column holds at most three characters; slicing is a no-op for
        # shorter strings.
        self.city = city[:3]
        self.root_path = str(root_path)

    @staticmethod
    def import_dataset(root_path: PathLike, session: Session) -> 'Dataset':
        """ Imports a dataset

        Reads the optional ``city_info.json``, every pose file below
        ``poses/``, the cameras listed in ``vehicle_calibration_info.json``
        and the image files in each camera's directory.

        Args:
            root_path (PathLike): root path to the dataset
            session (Session): SQLAlchemy session to use
        Returns:
            Dataset: Reference to the dataset
        Raises:
            FileNotFoundError: if ``vehicle_calibration_info.json`` is missing
        """
        root_path = Path(root_path)
        key = root_path.stem
        city = ''
        city_info_path = root_path / "city_info.json"
        calib_path = root_path / "vehicle_calibration_info.json"
        pose_root = root_path / "poses"

        # Get City Information
        if city_info_path.exists():
            with open(city_info_path, 'r') as f:
                city = json.load(f).get('city', '')

        # Create the new dataset and register it explicitly: relying on the
        # cascade from child rows would leave it unsaved when the directory
        # contains neither poses nor cameras.
        dataset = Dataset(root_path, key, city)
        session.add(dataset)

        # Create timestamps from poses
        for path in pose_root.glob("*.json"):
            # TODO Validate json
            Timestamp.from_json(path, dataset, session)

        # Create Cameras
        with open(calib_path, 'r') as f:
            data = json.load(f)

        # TODO Validate file
        cameras = []
        for camera_data in data['camera_data_']:
            camera = Camera.from_json(camera_data, dataset, session)
            if camera is not None:
                cameras.append(camera)

        # Create Camera Views
        for camera in cameras:
            camera_dir = root_path / camera.name
            for path in camera_dir.glob("*.*"):
                CameraView.from_path(path, dataset, session)

        # Persist the dataset row even if no child object triggered a commit.
        session.commit()
        return dataset
class Timestamp(ModelBase):
    """ Correlates data at a given timestamp """
    __tablename__ = "timestamps"

    id = Column(Integer, primary_key=True, autoincrement=True)
    # NOTE(review): holds nanosecond timestamps; presumably needs a 64-bit
    # integer column on backends where Integer is 32 bits - confirm.
    time = Column(Integer)
    vehicle_pose_x = Column(Float)  # x component
    vehicle_pose_y = Column(Float)  # y component
    vehicle_pose_z = Column(Float)  # z component
    # Quaternion components in the order returned by Rotation.as_quat()
    vehicle_pose_q0 = Column(Float)  # Quaternion component 0
    vehicle_pose_q1 = Column(Float)  # Quaternion component 1
    vehicle_pose_q2 = Column(Float)  # Quaternion component 2
    vehicle_pose_q3 = Column(Float)  # Quaternion component 3

    dataset_id = Column(Integer, ForeignKey('datasets.id'), nullable=False)
    dataset = relationship('Dataset', back_populates='timestamps')
    camera_views = relationship("CameraView", back_populates="timestamp")

    def __init__(self, time: int, vehicle_pose: RigidTransform, dataset: Dataset):
        """ Constructor
        Args:
            time (int): nanosecond time for the timestamp
            vehicle_pose (RigidTransform): vehicle pose in world coordinates
            dataset (Dataset): dataset this timestamp belongs to
        """
        self.time = time
        self.set_vehicle_pose(vehicle_pose)
        self.dataset = dataset

    def set_vehicle_pose(self, vehicle_pose: RigidTransform):
        """ Sets the vehicle's pose
        Args:
            vehicle_pose (RigidTransform): vehicle pose in world coordinates
        """
        t, R = vehicle_pose.as_components()
        quat = R.as_quat()
        self.vehicle_pose_x = float(t[0])
        self.vehicle_pose_y = float(t[1])
        self.vehicle_pose_z = float(t[2])
        # Each column receives its own component; storing quat[0] in all
        # four would make get_vehicle_pose() return a wrong rotation.
        self.vehicle_pose_q0 = float(quat[0])
        self.vehicle_pose_q1 = float(quat[1])
        self.vehicle_pose_q2 = float(quat[2])
        self.vehicle_pose_q3 = float(quat[3])

    def get_vehicle_pose(self) -> RigidTransform:
        """ Returns the vehicle pose as a RigidTransform object """
        return RigidTransform.from_components(
            translation=np.array(
                [self.vehicle_pose_x, self.vehicle_pose_y, self.vehicle_pose_z]),
            rotation=Rotation.from_quat(
                [self.vehicle_pose_q0, self.vehicle_pose_q1,
                 self.vehicle_pose_q2, self.vehicle_pose_q3]))

    @staticmethod
    def from_json(path: PathLike, dataset: Dataset, session: Session) -> Optional["Timestamp"]:
        """ Creates a timestamp object from a pose file path and dataset object

        The new object is added to the session and committed.

        Args:
            path (PathLike): Path to pose file
            dataset (Dataset): Dataset object to which this timestamp belongs
            session (Session): SQLAlchemy session to use
        Returns:
            Optional[Timestamp]: new timestamp object if it is successfully created, None otherwise
        """
        path = Path(path)
        ts_pose_match = ts_pose_re.match(str(path.stem))
        if ts_pose_match is None:
            # Not a pose file name; nothing to import.
            return None

        time = int(ts_pose_match.group("timestamp"))
        with open(path, 'r') as f:
            # TODO Add exception handling
            result = Timestamp(time, json_to_pose(json.load(f)), dataset)
        session.add(result)
        session.commit()
        return result
class Camera(ModelBase):
    """ Represents a camera in the dataset """
    __tablename__ = "cameras"

    id = Column(Integer, primary_key=True)
    name = Column(String(20), nullable=False)
    fx = Column(Float, nullable=False)
    fy = Column(Float, nullable=False)
    cx = Column(Float, nullable=False)
    cy = Column(Float, nullable=False)
    extrinsics_x = Column(Float)  # x component
    extrinsics_y = Column(Float)  # y component
    extrinsics_z = Column(Float)  # z component
    # Quaternion components in the order returned by Rotation.as_quat()
    extrinsics_q0 = Column(Float)  # Quaternion component 0
    extrinsics_q1 = Column(Float)  # Quaternion component 1
    extrinsics_q2 = Column(Float)  # Quaternion component 2
    extrinsics_q3 = Column(Float)  # Quaternion component 3

    dataset_id = Column(Integer, ForeignKey('datasets.id'), nullable=False)
    dataset = relationship('Dataset', back_populates='cameras')
    camera_views = relationship("CameraView", back_populates="camera")

    def __init__(self, name: str, intrinsics: np.ndarray, extrinsics: RigidTransform, dataset: Dataset):
        """ Constructor
        Args:
            name (str): Name of the camera
            intrinsics (np.ndarray): Camera intrinsics matrix
            extrinsics (RigidTransform): Camera extrinsics transform
            dataset (Dataset): dataset this camera belongs to
        """
        self.name = name
        self.set_intrinsics(intrinsics)
        self.set_extrinsics(extrinsics)
        self.dataset = dataset

    def set_intrinsics(self, intrinsics: np.ndarray):
        """ Sets the camera intrinsics matrix
        Args:
            intrinsics (np.ndarray): Camera intrinsics matrix; the focal
                lengths are read from the diagonal and the principal point
                from the last column
        """
        self.fx = float(intrinsics[0, 0])
        self.fy = float(intrinsics[1, 1])
        self.cx = float(intrinsics[0, 2])
        self.cy = float(intrinsics[1, 2])

    def set_extrinsics(self, extrinsics: RigidTransform):
        """ Sets the camera extrinsics
        Args:
            extrinsics (RigidTransform): camera extrinsics in vehicle coordinates
        """
        t, R = extrinsics.as_components()
        quat = R.as_quat()
        self.extrinsics_x = float(t[0])
        self.extrinsics_y = float(t[1])
        self.extrinsics_z = float(t[2])
        # Every quaternion column gets its own component; assigning only
        # extrinsics_q0 would leave q1..q3 unset and break get_extrinsics().
        self.extrinsics_q0 = float(quat[0])
        self.extrinsics_q1 = float(quat[1])
        self.extrinsics_q2 = float(quat[2])
        self.extrinsics_q3 = float(quat[3])

    def get_intrinsics(self) -> np.ndarray:
        """ Gets the camera intrinsics matrix
        Returns:
            np.ndarray: 3 x 3 camera intrinsics matrix
        """
        return np.array([[self.fx, 0, self.cx],
                         [0, self.fy, self.cy],
                         [0, 0, 1]])

    def get_extrinsics(self) -> RigidTransform:
        """ Returns the extrinsics as a RigidTransform object """
        return RigidTransform.from_components(
            translation=np.array(
                [self.extrinsics_x, self.extrinsics_y, self.extrinsics_z]),
            rotation=Rotation.from_quat(
                [self.extrinsics_q0, self.extrinsics_q1,
                 self.extrinsics_q2, self.extrinsics_q3]))

    @staticmethod
    def from_json(data: dict, dataset: Dataset, session: Session) -> Optional["Camera"]:
        """ Creates a camera from json data

        The new object is added to the session and committed.

        Args:
            data (dict): camera data dict
            dataset (Dataset): dataset object to which the camera belongs
            session (Session): SQLAlchemy session to use
        Returns:
            Camera: Camera object if the data is valid, None otherwise
        """
        # TODO Validate data
        name_match = cam_name_re.match(data['key'])
        if name_match is None:
            # The key does not name a camera; nothing to import.
            return None

        name = name_match.group('name')
        values = data['value']
        # The principal point uses the x centre in row 0 and the y centre in
        # row 1; using the x centre for both would corrupt cy.
        intrinsics = np.array(
            [[values['focal_length_x_px_'], 0, values['focal_center_x_px_']],
             [0, values['focal_length_y_px_'], values['focal_center_y_px_']],
             [0, 0, 1]])
        extrinsics = json_to_pose(values['vehicle_SE3_camera_'])
        result = Camera(name, intrinsics, extrinsics, dataset)
        session.add(result)
        session.commit()
        return result
class CameraView(ModelBase):
    """ Represents a camera view at a given timestamp """
    __tablename__ = "camera_views"

    id = Column(Integer, primary_key=True)
    ext = Column(String(3), nullable=False)

    dataset_id = Column(Integer, ForeignKey('datasets.id'), nullable=False)
    dataset = relationship('Dataset', back_populates='camera_views')
    timestamp_id = Column(Integer, ForeignKey('timestamps.id'), nullable=False)
    timestamp = relationship('Timestamp', back_populates='camera_views')
    camera_id = Column(Integer, ForeignKey('cameras.id'), nullable=False)
    # NOTE(review): order_by on this many-to-one side looks unintended -
    # presumably the ordering was meant for Camera.camera_views; confirm.
    camera = relationship('Camera', back_populates='camera_views', order_by=Timestamp.time)

    def __init__(self, ext: str, dataset: Dataset, timestamp: Timestamp, camera: Camera):
        """ Constructor
        Args:
            ext (str): file extension of the image, without the leading dot
            dataset (Dataset): dataset this camera view belongs to
            timestamp (Timestamp): timestamp this camera view was taken
            camera (Camera): camera that took this camera view
        """
        self.ext = ext
        self.dataset = dataset
        self.timestamp = timestamp
        self.camera = camera

    def get_path(self) -> Path:
        """ Get the path to the camera view image file
        Returns:
            Path: Path to the camera view image file
        """
        camera_name = str(self.camera.name)
        file_name = f"{self.camera.name}_{self.timestamp.time}.{self.ext}"
        return Path(str(self.dataset.root_path)) / camera_name / file_name

    @staticmethod
    def from_path(path: PathLike, dataset: Dataset, session: Session) -> Optional['CameraView']:
        """ Creates a camera view from a file path

        The timestamp and camera are looked up among the objects of
        ``dataset`` only; the new view is added to the session and committed.

        Args:
            path (PathLike): Path to the camera view image file
            dataset (Dataset): dataset this camera view belongs to
            session (Session): SQLAlchemy session to use
        Returns:
            Optional[CameraView]: CameraView object if the path is valid, None otherwise
        """
        path = Path(path)
        camview_fn_match = camview_fn_re.match(path.stem)
        if camview_fn_match is None:
            return None

        time = int(camview_fn_match.group('timestamp'))
        cam_name = camview_fn_match.group('cam_name')
        # Scope both lookups to the given dataset: camera names repeat
        # across datasets, so an unscoped query could attach the view to a
        # camera (or timestamp) that belongs to a different dataset.
        timestamp = session.query(Timestamp).filter_by(
            time=time, dataset=dataset).first()
        camera = session.query(Camera).filter_by(
            name=cam_name, dataset=dataset).first()
        if camera is None or timestamp is None:
            return None

        result = CameraView(path.suffix[1:], dataset, timestamp, camera)
        session.add(result)
        session.commit()
        return result