Python 解析 ROS2 mcap

1. 环境设置

1.1. Python 依赖

pip install mcap-ros2-support numpy pillow

1.2. 安装 foxglove

下载链接:https://foxglove.dev/download


2. 检查 mcap 里有哪些 topic

inspect_mcap.py:

from collections import defaultdict
from mcap_ros2.reader import read_ros2_messages


def inspect_mcap(mcap_path: str, max_msgs: int = 200000):
    topic_info = defaultdict(lambda: {"count": 0, "msg_type": None})
    for i, msg in enumerate(read_ros2_messages(mcap_path)):
        topic = msg.channel.topic
        tname = type(msg.ros_msg).__name__
        topic_info[topic]["count"] += 1
        topic_info[topic]["msg_type"] = tname
        if i + 1 >= max_msgs:
            break

    print("Detected topics:")
    for topic, info in sorted(topic_info.items()):
        print(
            f"- {topic:40s} type={info['msg_type']:25s} count={info['count']}")


if __name__ == "__main__":
    import argparse
    ap = argparse.ArgumentParser()
    ap.add_argument("mcap_path")
    args = ap.parse_args()
    inspect_mcap(args.mcap_path)

运行示例:

python inspect_mcap.py ~/Desktop/workspace/rosbag2_2026_03_24-18_35_20/rosbag2_2026_03_24-18_35_20_0.mcap 

输出示例:

Detected topics:
- /robot/topic/rdap/front_color_sensor_camera type=Image                     count=566
- /robot/topic/rdap/raw_front_left_sensor_camera type=Image                     count=372
- /robot/topic/rdap/raw_front_right_sensor_camera type=Image                     count=360
- /teleop/sync_state                       type=TeleopState               count=631

2. 获取关节状态

extract_joint_states.py:

import argparse
import json
from pathlib import Path
from typing import Dict, List, Optional

from mcap_ros2.reader import read_ros2_messages


def get_header_stamp_ns(msg) -> Optional[int]:
    header = getattr(msg, "header", None)
    if header is None:
        return None
    stamp = getattr(header, "stamp", None)
    if stamp is None:
        return None
    sec = getattr(stamp, "sec", None)
    nanosec = getattr(stamp, "nanosec", None)
    if sec is None or nanosec is None:
        return None
    return int(sec) * 1_000_000_000 + int(nanosec)


def get_log_time_ns(rec) -> int:
    log_time_ns = getattr(rec, "log_time_ns", None)
    if log_time_ns is not None:
        return int(log_time_ns)

    # Compatibility with reader versions that expose datetime only.
    log_time = getattr(rec, "log_time", None)
    if log_time is not None and hasattr(log_time, "timestamp"):
        return int(log_time.timestamp() * 1_000_000_000)

    raise ValueError("cannot parse record log time")


def parse_motor_states(msg, key: str) -> Optional[List[object]]:
    part = getattr(msg, key, None)
    if part is None:
        return None
    states = getattr(part, "states", None)
    if states is None or not isinstance(states, list) or len(states) == 0:
        return None
    return states


def extract_values(states: List[object], field: str) -> List[float]:
    values = []
    for s in states:
        v = getattr(s, field, 0.0)
        values.append(float(v))
    return values


def extract_joint_rows(
    mcap_path: str,
    topic: str,
    motor_key: str,
    max_msgs: int = 0,
) -> List[Dict[str, object]]:
    rows: List[Dict[str, object]] = []

    for rec in read_ros2_messages(mcap_path):
        if rec.channel.topic != topic:
            continue

        msg = rec.ros_msg
        states = parse_motor_states(msg, motor_key)
        if states is None:
            continue

        header_stamp_ns = get_header_stamp_ns(msg)
        log_time_ns = get_log_time_ns(rec)
        timestamp_ns = header_stamp_ns if header_stamp_ns is not None else log_time_ns

        q = extract_values(states, "q")
        dq = extract_values(states, "dq")
        tau_est = extract_values(states, "tau_est")

        row = {
            "timestamp_ns": timestamp_ns,
            "timestamp_sec": f"{timestamp_ns / 1e9:.9f}",
            "header_stamp_ns": header_stamp_ns if header_stamp_ns is not None else "",
            "log_time_ns": log_time_ns,
            "joint_count": len(states),
            "q_rad": q,
            "dq_rad_s": dq,
            "tau_est": tau_est,
        }
        rows.append(row)

        if max_msgs > 0 and len(rows) >= max_msgs:
            break

    return rows


def write_jsonl(path: Path, rows: List[Dict[str, object]]) -> None:
    with path.open("w", encoding="utf-8") as f:
        for row in rows:
            f.write(json.dumps(row, ensure_ascii=False) + "\n")


def main():
    ap = argparse.ArgumentParser(
        description="Extract joint position/velocity/torque from MCAP ROS2 messages."
    )
    ap.add_argument("--mcap", required=True, help="input .mcap path")
    ap.add_argument("--topic", default="/teleop/sync_state", help="source topic")
    ap.add_argument(
        "--motor-key",
        default="upper_states",
        help="field in message that contains MotorStates (e.g. upper_states)",
    )
    ap.add_argument(
        "--output-dir",
        default="joint_states_20260324",
        help="output directory",
    )
    ap.add_argument(
        "--max-msgs",
        type=int,
        default=0,
        help="optional limit for debugging, 0 means no limit",
    )
    args = ap.parse_args()

    rows = extract_joint_rows(
        mcap_path=args.mcap,
        topic=args.topic,
        motor_key=args.motor_key,
        max_msgs=args.max_msgs,
    )

    out_dir = Path(args.output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    jsonl_path = out_dir / "joint_states.jsonl"

    write_jsonl(jsonl_path, rows)

    print(f"Extracted frames: {len(rows)}")
    if rows:
        print(f"Joint count: {rows[0]['joint_count']}")
        print(f"Time range: {rows[0]['timestamp_ns']} -> {rows[-1]['timestamp_ns']}")
    print(f"JSONL: {jsonl_path}")


if __name__ == "__main__":
    main()

运行示例:

python extract_joint_states.py   --mcap "$HOME/Desktop/workspace/rosbag2_2026_03_24-18_35_20/rosbag2_2026_03_24-18_35_20_0.mcap"   --topic /teleop/sync_state   --motor-key upper_states   --output-dir joint_states_20260324

输出示例:

{"timestamp_ns": 1774348525932101741, "timestamp_sec": "1774348525.932101727", "header_stamp_ns": 1774348525932101741, "log_time_ns": 1774348525933098485, "joint_count": 17, "q_rad": [0.7381181716918945, 0.3676636219024658, -0.02226141095161438, 0.9963552951812744, 0.24104848504066467, 0.364896684885025, 0.413713663816452, -0.004354061093181372, -0.18004970252513885, 0.0005450062453746796, -0.13481418788433075, -0.11687090247869492, -0.08058905601501465, -1.5736966133117676, -0.12144656479358673, 0.030538318678736687, 0.1959267556667328], "dq_rad_s": [0.0005629734951071441, -0.03234103322029114, 0.0011678705923259258, 0.1537696272134781, 0.0009043510654009879, 0.2532781958580017, 0.012301569804549217, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0005629734951071441, 0.0, 0.0, 0.0, 0.0], "tau_est": [6.510000228881836, 1.3860000371932983, 1.2600001096725464, -0.42500001192092896, 0.04200000315904617, 0.37800002098083496, 0.04200000315904617, 0.0, 0.0, 0.0, -0.5040000081062317, -2.2260000705718994, 0.8400000333786011, 0.2750000059604645, 0.12600000202655792, -0.37800002098083496, 0.1469999998807907]}

3. 获取 Camera Frame

extract_camera_frames.py:

import argparse
import json
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Tuple

import numpy as np
from PIL import Image
from mcap_ros2.reader import read_ros2_messages


def sanitize_topic(topic: str) -> str:
    """将 ROS topic 转成可作为目录名的安全字符串。"""
    return topic.strip("/").replace("/", "__")


def get_header_stamp_ns(ros_msg) -> Optional[int]:
    """从 ROS 消息头中读取时间戳(纳秒)。

    若消息没有 header/stamp 或字段不完整,返回 None。
    """
    header = getattr(ros_msg, "header", None)
    if header is None:
        return None
    stamp = getattr(header, "stamp", None)
    if stamp is None:
        return None
    sec = getattr(stamp, "sec", None)
    nanosec = getattr(stamp, "nanosec", None)
    if sec is None or nanosec is None:
        return None
    return int(sec) * 1_000_000_000 + int(nanosec)


def get_log_time_ns(rec) -> int:
    """读取 MCAP 记录的写入时间(纳秒)。

    优先使用新版本 reader 提供的 `log_time_ns`;
    若不可用,回退到旧版本的 datetime `log_time`。
    """
    log_time_ns = getattr(rec, "log_time_ns", None)
    if log_time_ns is not None:
        return int(log_time_ns)

    # 兼容旧版本:只有 datetime,没有原生纳秒字段。
    log_time = getattr(rec, "log_time", None)
    if log_time is not None and hasattr(log_time, "timestamp"):
        return int(log_time.timestamp() * 1_000_000_000)

    raise ValueError("cannot parse record log time")


def is_image_message(ros_msg) -> bool:
    """判断消息是否为图像消息。

    优先按消息类型名判断(与 inspect_mcap.py 同思路);
    若类型名不稳定,再回退到字段结构判断。
    """
    if type(ros_msg).__name__ == "Image":
        return True

    # 兜底:有些环境下类型名可能不一致,保留字段判断提高兼容性。
    required = ("height", "width", "encoding", "step", "data")
    return all(hasattr(ros_msg, key) for key in required)


def _reshape_rows_u8(data: bytes, height: int, row_bytes: int) -> np.ndarray:
    """将 uint8 字节流按“行”重排,便于后续按编码切片。

    示例:
    - 若 `height=2`、`row_bytes=6`,且 `data` 总长度为 12 字节,
      则返回数组形状为 `(2, 6)`。
    - 之后可按真实像素宽度裁剪,比如 `rows[:, : width * 3]`(rgb8)。
    """
    arr = np.frombuffer(data, dtype=np.uint8)
    return arr.reshape(height, row_bytes)


def decode_ros_image_to_pil(ros_msg) -> Image.Image:
    """将 ROS Image 消息解码为 PIL 图像。

    已支持常见编码:
    - rgb8 / bgr8
    - rgba8 / bgra8
    - mono8 / 8uc1
    - mono16 / 16uc1
    """
    height = int(ros_msg.height)
    width = int(ros_msg.width)
    step = int(ros_msg.step)
    encoding = str(ros_msg.encoding).lower()
    raw = bytes(ros_msg.data)

    if height <= 0 or width <= 0:
        raise ValueError(f"invalid image shape: h={height}, w={width}")

    if encoding == "rgb8":
        # step 表示每行实际字节数,可能大于 width*3(行尾 padding)。
        # 先按 (height, step) 拆成二维行视图,再截取前 width*3 字节去掉 padding,
        # 最后 reshape 为标准 RGB 形状 (height, width, 3)。
        rows = _reshape_rows_u8(raw, height, step)
        arr = rows[:, : width * 3].reshape(height, width, 3)
        return Image.fromarray(arr, mode="RGB")

    if encoding == "bgr8":
        rows = _reshape_rows_u8(raw, height, step)
        arr = rows[:, : width * 3].reshape(height, width, 3)
        arr = arr[:, :, ::-1]
        return Image.fromarray(arr, mode="RGB")

    if encoding == "rgba8":
        rows = _reshape_rows_u8(raw, height, step)
        arr = rows[:, : width * 4].reshape(height, width, 4)
        return Image.fromarray(arr, mode="RGBA")

    if encoding == "bgra8":
        rows = _reshape_rows_u8(raw, height, step)
        arr = rows[:, : width * 4].reshape(height, width, 4)
        arr = arr[:, :, [2, 1, 0, 3]]
        return Image.fromarray(arr, mode="RGBA")

    if encoding in ("mono8", "8uc1"):
        # 单通道 8bit 灰度图。
        rows = _reshape_rows_u8(raw, height, step)
        arr = rows[:, :width]
        return Image.fromarray(arr, mode="L")

    if encoding in ("mono16", "16uc1"):
        # 单通道 16bit,step 以字节计,因此每行元素个数是 step//2。
        row_values = step // 2
        arr = np.frombuffer(raw, dtype=np.uint16).reshape(height, row_values)
        arr = arr[:, :width]
        return Image.fromarray(arr, mode="I;16")

    raise ValueError(f"unsupported image encoding: {ros_msg.encoding}")


def ensure_dir(path: Path) -> None:
    """确保目录存在(不存在则递归创建)。"""
    path.mkdir(parents=True, exist_ok=True)


def frame_filename(timestamp_ns: int, index: int) -> str:
    """生成稳定可排序的帧文件名:时间戳 + topic 内序号。"""
    return f"{timestamp_ns:019d}_{index:06d}.png"


def write_metadata_jsonl(jsonl_path: Path, rows: Iterable[Dict[str, object]]) -> None:
    """把帧元信息按 JSONL 写入磁盘。"""
    with jsonl_path.open("w", encoding="utf-8") as f:
        for row in rows:
            f.write(json.dumps(row, ensure_ascii=False) + "\n")


def extract_frames(
    mcap_path: str,
    output_dir: str,
) -> Tuple[int, Dict[str, int]]:
    """提取 MCAP 中所有图像消息并落盘。

    返回:
    - 导出的总帧数
    - 各 topic 的帧计数
    """
    out_root = Path(output_dir)
    ensure_dir(out_root)

    # topic_index: 每个 topic 下“下一帧”的序号(用于文件命名)
    # topic_counts: 每个 topic 实际导出数量(用于最终统计打印)
    topic_index: Dict[str, int] = {}
    topic_counts: Dict[str, int] = {}
    metadata_rows: List[Dict[str, object]] = []
    error_count = 0

    # 逐条遍历 MCAP 记录:不做 topic 过滤,只要是图像消息就导出。
    for rec in read_ros2_messages(mcap_path):
        topic = rec.channel.topic

        ros_msg = rec.ros_msg
        if not is_image_message(ros_msg):
            continue

        try:
            pil_img = decode_ros_image_to_pil(ros_msg)
        except Exception as exc:
            error_count += 1
            print(f"[WARN] skip frame topic={topic}: {exc}")
            continue

        # 优先使用消息头时间;若缺失则退回 MCAP 记录时间。
        header_stamp_ns = get_header_stamp_ns(ros_msg)
        log_time_ns = get_log_time_ns(rec)
        timestamp_ns = header_stamp_ns if header_stamp_ns is not None else log_time_ns
        frame_idx = topic_index.get(topic, 0)
        topic_index[topic] = frame_idx + 1
        topic_counts[topic] = topic_counts.get(topic, 0) + 1

        topic_dir = out_root / sanitize_topic(topic)
        ensure_dir(topic_dir)
        fname = frame_filename(timestamp_ns, frame_idx)
        frame_path = topic_dir / fname
        pil_img.save(frame_path)

        # 记录每一帧的关键元信息,便于后续对齐/回放/检索。
        metadata_rows.append(
            {
                "topic": topic,
                "frame_index": frame_idx,
                "file_path": str(frame_path),
                "timestamp_ns": timestamp_ns,
                "timestamp_sec": f"{timestamp_ns / 1e9:.9f}",
                "header_stamp_ns": header_stamp_ns if header_stamp_ns is not None else "",
                "log_time_ns": log_time_ns,
                "encoding": str(ros_msg.encoding),
                "width": int(ros_msg.width),
                "height": int(ros_msg.height),
            }
        )

    # 全量覆盖写入索引文件。
    write_metadata_jsonl(out_root / "frames_index.jsonl", metadata_rows)

    if error_count > 0:
        print(f"[INFO] skipped {error_count} frames due to decode errors")

    return len(metadata_rows), topic_counts


def main():
    """命令行入口。"""
    ap = argparse.ArgumentParser(
        description="从 MCAP 中提取 ROS2 图像帧,并保存逐帧时间戳索引。"
    )
    ap.add_argument("--mcap", required=True, help="输入 .mcap 文件路径")
    ap.add_argument(
        "--output",
        default="camera_frames",
        help="输出目录(保存图片和 frames_index.jsonl)",
    )
    args = ap.parse_args()

    total, counts = extract_frames(
        mcap_path=args.mcap,
        output_dir=args.output,
    )

    print(f"\nExported total frames: {total}")
    for topic, count in sorted(counts.items()):
        print(f"- {topic}: {count}")
    print(f"Metadata JSONL: {Path(args.output) / 'frames_index.jsonl'}")


if __name__ == "__main__":
    main()

运行示例:

python "extract_camera_frames.py" --mcap "$HOME/Desktop/workspace/rosbag2_2026_03_24-18_35_20/rosbag2_2026_03_24-18_35_20_0.mcap" --output "camera_frames_20260324"

输出示例:

Exported total frames: 1298
- /robot/topic/rdap/front_color_sensor_camera: 566
- /robot/topic/rdap/raw_front_left_sensor_camera: 372
- /robot/topic/rdap/raw_front_right_sensor_camera: 360
Metadata JSONL: camera_frames_20260324/frames_index.jsonl