Python 解析 ROS2 mcap
1. 环境设置
1.1. Python 依赖
pip install mcap-ros2-support numpy pillow1.2. 安装 foxglove
2. 检查 mcap 里有哪些 topic
inspect_mcap.py:
from collections import defaultdict
from mcap_ros2.reader import read_ros2_messages
def inspect_mcap(mcap_path: str, max_msgs: int = 200000):
topic_info = defaultdict(lambda: {"count": 0, "msg_type": None})
for i, msg in enumerate(read_ros2_messages(mcap_path)):
topic = msg.channel.topic
tname = type(msg.ros_msg).__name__
topic_info[topic]["count"] += 1
topic_info[topic]["msg_type"] = tname
if i + 1 >= max_msgs:
break
print("Detected topics:")
for topic, info in sorted(topic_info.items()):
print(
f"- {topic:40s} type={info['msg_type']:25s} count={info['count']}")
if __name__ == "__main__":
import argparse
ap = argparse.ArgumentParser()
ap.add_argument("mcap_path")
args = ap.parse_args()
inspect_mcap(args.mcap_path)
运行示例:
python inspect_mcap.py ~/Desktop/workspace/rosbag2_2026_03_24-18_35_20/rosbag2_2026_03_24-18_35_20_0.mcap 输出示例:
Detected topics:
- /robot/topic/rdap/front_color_sensor_camera type=Image count=566
- /robot/topic/rdap/raw_front_left_sensor_camera type=Image count=372
- /robot/topic/rdap/raw_front_right_sensor_camera type=Image count=360
- /teleop/sync_state type=TeleopState count=6312. 获取关节状态
extract_joint_states.py:
import argparse
import json
from pathlib import Path
from typing import Dict, List, Optional
from mcap_ros2.reader import read_ros2_messages
def get_header_stamp_ns(msg) -> Optional[int]:
header = getattr(msg, "header", None)
if header is None:
return None
stamp = getattr(header, "stamp", None)
if stamp is None:
return None
sec = getattr(stamp, "sec", None)
nanosec = getattr(stamp, "nanosec", None)
if sec is None or nanosec is None:
return None
return int(sec) * 1_000_000_000 + int(nanosec)
def get_log_time_ns(rec) -> int:
log_time_ns = getattr(rec, "log_time_ns", None)
if log_time_ns is not None:
return int(log_time_ns)
# Compatibility with reader versions that expose datetime only.
log_time = getattr(rec, "log_time", None)
if log_time is not None and hasattr(log_time, "timestamp"):
return int(log_time.timestamp() * 1_000_000_000)
raise ValueError("cannot parse record log time")
def parse_motor_states(msg, key: str) -> Optional[List[object]]:
part = getattr(msg, key, None)
if part is None:
return None
states = getattr(part, "states", None)
if states is None or not isinstance(states, list) or len(states) == 0:
return None
return states
def extract_values(states: List[object], field: str) -> List[float]:
values = []
for s in states:
v = getattr(s, field, 0.0)
values.append(float(v))
return values
def extract_joint_rows(
mcap_path: str,
topic: str,
motor_key: str,
max_msgs: int = 0,
) -> List[Dict[str, object]]:
rows: List[Dict[str, object]] = []
for rec in read_ros2_messages(mcap_path):
if rec.channel.topic != topic:
continue
msg = rec.ros_msg
states = parse_motor_states(msg, motor_key)
if states is None:
continue
header_stamp_ns = get_header_stamp_ns(msg)
log_time_ns = get_log_time_ns(rec)
timestamp_ns = header_stamp_ns if header_stamp_ns is not None else log_time_ns
q = extract_values(states, "q")
dq = extract_values(states, "dq")
tau_est = extract_values(states, "tau_est")
row = {
"timestamp_ns": timestamp_ns,
"timestamp_sec": f"{timestamp_ns / 1e9:.9f}",
"header_stamp_ns": header_stamp_ns if header_stamp_ns is not None else "",
"log_time_ns": log_time_ns,
"joint_count": len(states),
"q_rad": q,
"dq_rad_s": dq,
"tau_est": tau_est,
}
rows.append(row)
if max_msgs > 0 and len(rows) >= max_msgs:
break
return rows
def write_jsonl(path: Path, rows: List[Dict[str, object]]) -> None:
with path.open("w", encoding="utf-8") as f:
for row in rows:
f.write(json.dumps(row, ensure_ascii=False) + "\n")
def main():
ap = argparse.ArgumentParser(
description="Extract joint position/velocity/torque from MCAP ROS2 messages."
)
ap.add_argument("--mcap", required=True, help="input .mcap path")
ap.add_argument("--topic", default="/teleop/sync_state", help="source topic")
ap.add_argument(
"--motor-key",
default="upper_states",
help="field in message that contains MotorStates (e.g. upper_states)",
)
ap.add_argument(
"--output-dir",
default="joint_states_20260324",
help="output directory",
)
ap.add_argument(
"--max-msgs",
type=int,
default=0,
help="optional limit for debugging, 0 means no limit",
)
args = ap.parse_args()
rows = extract_joint_rows(
mcap_path=args.mcap,
topic=args.topic,
motor_key=args.motor_key,
max_msgs=args.max_msgs,
)
out_dir = Path(args.output_dir)
out_dir.mkdir(parents=True, exist_ok=True)
jsonl_path = out_dir / "joint_states.jsonl"
write_jsonl(jsonl_path, rows)
print(f"Extracted frames: {len(rows)}")
if rows:
print(f"Joint count: {rows[0]['joint_count']}")
print(f"Time range: {rows[0]['timestamp_ns']} -> {rows[-1]['timestamp_ns']}")
print(f"JSONL: {jsonl_path}")
if __name__ == "__main__":
main()运行示例:
python extract_joint_states.py --mcap "$HOME/Desktop/workspace/rosbag2_2026_03_24-18_35_20/rosbag2_2026_03_24-18_35_20_0.mcap" --topic /teleop/sync_state --motor-key upper_states --output-dir joint_states_20260324输出示例:
{"timestamp_ns": 1774348525932101741, "timestamp_sec": "1774348525.932101727", "header_stamp_ns": 1774348525932101741, "log_time_ns": 1774348525933098485, "joint_count": 17, "q_rad": [0.7381181716918945, 0.3676636219024658, -0.02226141095161438, 0.9963552951812744, 0.24104848504066467, 0.364896684885025, 0.413713663816452, -0.004354061093181372, -0.18004970252513885, 0.0005450062453746796, -0.13481418788433075, -0.11687090247869492, -0.08058905601501465, -1.5736966133117676, -0.12144656479358673, 0.030538318678736687, 0.1959267556667328], "dq_rad_s": [0.0005629734951071441, -0.03234103322029114, 0.0011678705923259258, 0.1537696272134781, 0.0009043510654009879, 0.2532781958580017, 0.012301569804549217, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0005629734951071441, 0.0, 0.0, 0.0, 0.0], "tau_est": [6.510000228881836, 1.3860000371932983, 1.2600001096725464, -0.42500001192092896, 0.04200000315904617, 0.37800002098083496, 0.04200000315904617, 0.0, 0.0, 0.0, -0.5040000081062317, -2.2260000705718994, 0.8400000333786011, 0.2750000059604645, 0.12600000202655792, -0.37800002098083496, 0.1469999998807907]}3. 获取 Camera Frame
extract_camera_frames.py:
import argparse
import json
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Tuple
import numpy as np
from PIL import Image
from mcap_ros2.reader import read_ros2_messages
def sanitize_topic(topic: str) -> str:
"""将 ROS topic 转成可作为目录名的安全字符串。"""
return topic.strip("/").replace("/", "__")
def get_header_stamp_ns(ros_msg) -> Optional[int]:
"""从 ROS 消息头中读取时间戳(纳秒)。
若消息没有 header/stamp 或字段不完整,返回 None。
"""
header = getattr(ros_msg, "header", None)
if header is None:
return None
stamp = getattr(header, "stamp", None)
if stamp is None:
return None
sec = getattr(stamp, "sec", None)
nanosec = getattr(stamp, "nanosec", None)
if sec is None or nanosec is None:
return None
return int(sec) * 1_000_000_000 + int(nanosec)
def get_log_time_ns(rec) -> int:
"""读取 MCAP 记录的写入时间(纳秒)。
优先使用新版本 reader 提供的 `log_time_ns`;
若不可用,回退到旧版本的 datetime `log_time`。
"""
log_time_ns = getattr(rec, "log_time_ns", None)
if log_time_ns is not None:
return int(log_time_ns)
# 兼容旧版本:只有 datetime,没有原生纳秒字段。
log_time = getattr(rec, "log_time", None)
if log_time is not None and hasattr(log_time, "timestamp"):
return int(log_time.timestamp() * 1_000_000_000)
raise ValueError("cannot parse record log time")
def is_image_message(ros_msg) -> bool:
"""判断消息是否为图像消息。
优先按消息类型名判断(与 inspect_mcap.py 同思路);
若类型名不稳定,再回退到字段结构判断。
"""
if type(ros_msg).__name__ == "Image":
return True
# 兜底:有些环境下类型名可能不一致,保留字段判断提高兼容性。
required = ("height", "width", "encoding", "step", "data")
return all(hasattr(ros_msg, key) for key in required)
def _reshape_rows_u8(data: bytes, height: int, row_bytes: int) -> np.ndarray:
"""将 uint8 字节流按“行”重排,便于后续按编码切片。
示例:
- 若 `height=2`、`row_bytes=6`,且 `data` 总长度为 12 字节,
则返回数组形状为 `(2, 6)`。
- 之后可按真实像素宽度裁剪,比如 `rows[:, : width * 3]`(rgb8)。
"""
arr = np.frombuffer(data, dtype=np.uint8)
return arr.reshape(height, row_bytes)
def decode_ros_image_to_pil(ros_msg) -> Image.Image:
"""将 ROS Image 消息解码为 PIL 图像。
已支持常见编码:
- rgb8 / bgr8
- rgba8 / bgra8
- mono8 / 8uc1
- mono16 / 16uc1
"""
height = int(ros_msg.height)
width = int(ros_msg.width)
step = int(ros_msg.step)
encoding = str(ros_msg.encoding).lower()
raw = bytes(ros_msg.data)
if height <= 0 or width <= 0:
raise ValueError(f"invalid image shape: h={height}, w={width}")
if encoding == "rgb8":
# step 表示每行实际字节数,可能大于 width*3(行尾 padding)。
# 先按 (height, step) 拆成二维行视图,再截取前 width*3 字节去掉 padding,
# 最后 reshape 为标准 RGB 形状 (height, width, 3)。
rows = _reshape_rows_u8(raw, height, step)
arr = rows[:, : width * 3].reshape(height, width, 3)
return Image.fromarray(arr, mode="RGB")
if encoding == "bgr8":
rows = _reshape_rows_u8(raw, height, step)
arr = rows[:, : width * 3].reshape(height, width, 3)
arr = arr[:, :, ::-1]
return Image.fromarray(arr, mode="RGB")
if encoding == "rgba8":
rows = _reshape_rows_u8(raw, height, step)
arr = rows[:, : width * 4].reshape(height, width, 4)
return Image.fromarray(arr, mode="RGBA")
if encoding == "bgra8":
rows = _reshape_rows_u8(raw, height, step)
arr = rows[:, : width * 4].reshape(height, width, 4)
arr = arr[:, :, [2, 1, 0, 3]]
return Image.fromarray(arr, mode="RGBA")
if encoding in ("mono8", "8uc1"):
# 单通道 8bit 灰度图。
rows = _reshape_rows_u8(raw, height, step)
arr = rows[:, :width]
return Image.fromarray(arr, mode="L")
if encoding in ("mono16", "16uc1"):
# 单通道 16bit,step 以字节计,因此每行元素个数是 step//2。
row_values = step // 2
arr = np.frombuffer(raw, dtype=np.uint16).reshape(height, row_values)
arr = arr[:, :width]
return Image.fromarray(arr, mode="I;16")
raise ValueError(f"unsupported image encoding: {ros_msg.encoding}")
def ensure_dir(path: Path) -> None:
"""确保目录存在(不存在则递归创建)。"""
path.mkdir(parents=True, exist_ok=True)
def frame_filename(timestamp_ns: int, index: int) -> str:
"""生成稳定可排序的帧文件名:时间戳 + topic 内序号。"""
return f"{timestamp_ns:019d}_{index:06d}.png"
def write_metadata_jsonl(jsonl_path: Path, rows: Iterable[Dict[str, object]]) -> None:
"""把帧元信息按 JSONL 写入磁盘。"""
with jsonl_path.open("w", encoding="utf-8") as f:
for row in rows:
f.write(json.dumps(row, ensure_ascii=False) + "\n")
def extract_frames(
mcap_path: str,
output_dir: str,
) -> Tuple[int, Dict[str, int]]:
"""提取 MCAP 中所有图像消息并落盘。
返回:
- 导出的总帧数
- 各 topic 的帧计数
"""
out_root = Path(output_dir)
ensure_dir(out_root)
# topic_index: 每个 topic 下“下一帧”的序号(用于文件命名)
# topic_counts: 每个 topic 实际导出数量(用于最终统计打印)
topic_index: Dict[str, int] = {}
topic_counts: Dict[str, int] = {}
metadata_rows: List[Dict[str, object]] = []
error_count = 0
# 逐条遍历 MCAP 记录:不做 topic 过滤,只要是图像消息就导出。
for rec in read_ros2_messages(mcap_path):
topic = rec.channel.topic
ros_msg = rec.ros_msg
if not is_image_message(ros_msg):
continue
try:
pil_img = decode_ros_image_to_pil(ros_msg)
except Exception as exc:
error_count += 1
print(f"[WARN] skip frame topic={topic}: {exc}")
continue
# 优先使用消息头时间;若缺失则退回 MCAP 记录时间。
header_stamp_ns = get_header_stamp_ns(ros_msg)
log_time_ns = get_log_time_ns(rec)
timestamp_ns = header_stamp_ns if header_stamp_ns is not None else log_time_ns
frame_idx = topic_index.get(topic, 0)
topic_index[topic] = frame_idx + 1
topic_counts[topic] = topic_counts.get(topic, 0) + 1
topic_dir = out_root / sanitize_topic(topic)
ensure_dir(topic_dir)
fname = frame_filename(timestamp_ns, frame_idx)
frame_path = topic_dir / fname
pil_img.save(frame_path)
# 记录每一帧的关键元信息,便于后续对齐/回放/检索。
metadata_rows.append(
{
"topic": topic,
"frame_index": frame_idx,
"file_path": str(frame_path),
"timestamp_ns": timestamp_ns,
"timestamp_sec": f"{timestamp_ns / 1e9:.9f}",
"header_stamp_ns": header_stamp_ns if header_stamp_ns is not None else "",
"log_time_ns": log_time_ns,
"encoding": str(ros_msg.encoding),
"width": int(ros_msg.width),
"height": int(ros_msg.height),
}
)
# 全量覆盖写入索引文件。
write_metadata_jsonl(out_root / "frames_index.jsonl", metadata_rows)
if error_count > 0:
print(f"[INFO] skipped {error_count} frames due to decode errors")
return len(metadata_rows), topic_counts
def main():
"""命令行入口。"""
ap = argparse.ArgumentParser(
description="从 MCAP 中提取 ROS2 图像帧,并保存逐帧时间戳索引。"
)
ap.add_argument("--mcap", required=True, help="输入 .mcap 文件路径")
ap.add_argument(
"--output",
default="camera_frames",
help="输出目录(保存图片和 frames_index.jsonl)",
)
args = ap.parse_args()
total, counts = extract_frames(
mcap_path=args.mcap,
output_dir=args.output,
)
print(f"\nExported total frames: {total}")
for topic, count in sorted(counts.items()):
print(f"- {topic}: {count}")
print(f"Metadata JSONL: {Path(args.output) / 'frames_index.jsonl'}")
if __name__ == "__main__":
main()
运行示例:
python "extract_camera_frames.py" --mcap "$HOME/Desktop/workspace/rosbag2_2026_03_24-18_35_20/rosbag2_2026_03_24-18_35_20_0.mcap" --output "camera_frames_20260324"输出示例:
Exported total frames: 1298
- /robot/topic/rdap/front_color_sensor_camera: 566
- /robot/topic/rdap/raw_front_left_sensor_camera: 372
- /robot/topic/rdap/raw_front_right_sensor_camera: 360
Metadata JSONL: camera_frames_20260324/frames_index.jsonl