- Authors

- Name
- Youngju Kim
- @fjvbn20031
自動運転AI完全ガイド
自動運転車両は、認識・予測・計画・制御という4つのコアモジュールがリアルタイムで連携する複雑なAIシステムです。本記事では、LiDARによる3D認識からBEVFormerカメラ認識、マルチオブジェクト追跡、HDマップ、エンドツーエンド学習、シミュレーション、機能安全まで、スタック全体を体系的に解説します。
1. 自動運転スタックの概要
自動運転パイプラインは大きく4つのステージで構成されます。
- 認識(Perception): センサーデータから周囲の環境を把握します。LiDAR、カメラ、レーダーなど複数センサーを融合し、3D物体検出と走行可能領域を抽出します。
- 予測(Prediction): 周辺車両や歩行者の将来軌跡を予測します。Transformerベースのモデルが複雑な相互作用をモデリングします。
- 計画(Planning): 目的地への最適経路を生成します。ルールベースと学習ベースの手法が共存します。
- 制御(Control): 加速・制動・操舵コマンドを生成します。PIDコントローラやモデル予測制御(MPC)を活用します。
# 自動運転パイプラインの抽象化例
class AutonomousDrivingStack:
def __init__(self, perception, prediction, planner, controller):
self.perception = perception
self.prediction = prediction
self.planner = planner
self.controller = controller
def step(self, sensor_data):
# 1. 認識
scene = self.perception.process(sensor_data)
# 2. 予測
future_states = self.prediction.forecast(scene)
# 3. 計画
trajectory = self.planner.plan(scene, future_states)
# 4. 制御
commands = self.controller.compute(trajectory)
return commands
2. LiDARポイントクラウドによる3D認識
2.1 PointNet++
PointNet++はポイントクラウドを直接処理する階層型ニューラルネットワークです。Set Abstractionレイヤーで局所構造を反復的に集約します。
import torch
import torch.nn as nn
class SetAbstraction(nn.Module):
"""PointNet++ Set Abstractionレイヤー"""
def __init__(self, npoint, radius, nsample, in_channel, mlp):
super().__init__()
self.npoint = npoint
self.radius = radius
self.nsample = nsample
self.mlp_convs = nn.ModuleList()
self.mlp_bns = nn.ModuleList()
last_channel = in_channel
for out_channel in mlp:
self.mlp_convs.append(nn.Conv2d(last_channel, out_channel, 1))
self.mlp_bns.append(nn.BatchNorm2d(out_channel))
last_channel = out_channel
def forward(self, xyz, points):
# xyz: (B, N, 3), points: (B, C, N)
# FPS + ボールクエリ後にMLPを適用
new_xyz = self.farthest_point_sample(xyz, self.npoint)
grouped = self.ball_query(xyz, new_xyz, points)
for conv, bn in zip(self.mlp_convs, self.mlp_bns):
grouped = torch.relu(bn(conv(grouped)))
new_points = grouped.max(dim=-1)[0]
return new_xyz, new_points
def farthest_point_sample(self, xyz, npoint):
B, N, _ = xyz.shape
centroids = torch.zeros(B, npoint, dtype=torch.long)
distance = torch.ones(B, N) * 1e10
farthest = torch.randint(0, N, (B,))
for i in range(npoint):
centroids[:, i] = farthest
centroid = xyz[torch.arange(B), farthest, :].unsqueeze(1)
dist = ((xyz - centroid) ** 2).sum(-1)
distance = torch.min(distance, dist)
farthest = distance.max(-1)[1]
return xyz[torch.arange(B).unsqueeze(1), centroids]
def ball_query(self, xyz, new_xyz, points):
B, N, C = points.transpose(1, 2).shape
return points.unsqueeze(-1).expand(-1, -1, -1, self.nsample)
2.2 VoxelNet
VoxelNetはポイントクラウドを3Dボクセルに分割し、Voxel Feature Encoding(VFE)レイヤーで特徴を抽出した後、3D畳み込みバックボーンに通します。
class VoxelFeatureEncoder(nn.Module):
"""VoxelNet VFEレイヤー"""
def __init__(self, in_channels=4, out_channels=128):
super().__init__()
self.fc = nn.Sequential(
nn.Linear(in_channels, 64),
nn.BatchNorm1d(64),
nn.ReLU(),
nn.Linear(64, out_channels),
nn.BatchNorm1d(out_channels),
nn.ReLU(),
)
def forward(self, voxel_features, num_points):
# voxel_features: (N_voxels, max_points, C)
B, M, C = voxel_features.shape
x = self.fc(voxel_features.view(-1, C))
x = x.view(B, M, -1)
mask = torch.arange(M, device=x.device).unsqueeze(0) < num_points.unsqueeze(1)
x = x * mask.unsqueeze(-1).float()
return x.max(dim=1)[0]
3. BEVFormer: カメラベースBEV認識
BEVFormerは複数カメラ画像をBird's Eye View(BEV)空間に変換して3D物体検出を行います。Deformable AttentionによりBEV空間の3DクエリポイントをカメラImageに投影して特徴を集約します。
import torch
import torch.nn as nn
class BEVFormerLayer(nn.Module):
"""BEVFormer Spatial Cross-Attentionレイヤー"""
def __init__(self, embed_dim=256, num_heads=8, num_cameras=6):
super().__init__()
self.num_cameras = num_cameras
self.embed_dim = embed_dim
self.attn = nn.MultiheadAttention(embed_dim, num_heads, batch_first=True)
self.norm1 = nn.LayerNorm(embed_dim)
self.norm2 = nn.LayerNorm(embed_dim)
self.ffn = nn.Sequential(
nn.Linear(embed_dim, embed_dim * 4),
nn.GELU(),
nn.Linear(embed_dim * 4, embed_dim),
)
def forward(self, bev_query, camera_features, lidar2img):
"""
bev_query: (B, H*W, C) - BEVグリッドクエリ
camera_features: (B, N_cam, H_img, W_img, C) - カメラ特徴
lidar2img: (B, N_cam, 4, 4) - LiDARから画像への投影行列
"""
B, Q, C = bev_query.shape
ref_3d = self.get_reference_points(B, device=bev_query.device)
projected = self.project_to_image(ref_3d, lidar2img)
sampled = self.sample_camera_features(camera_features, projected)
attn_out, _ = self.attn(bev_query, sampled, sampled)
bev_query = self.norm1(bev_query + attn_out)
bev_query = self.norm2(bev_query + self.ffn(bev_query))
return bev_query
def get_reference_points(self, B, device, H=200, W=200, Z=8):
xs = torch.linspace(0, 1, W, device=device)
ys = torch.linspace(0, 1, H, device=device)
zs = torch.linspace(0, 1, Z, device=device)
grid = torch.stack(torch.meshgrid(xs, ys, zs), dim=-1)
return grid.view(1, -1, 3).expand(B, -1, -1)
def project_to_image(self, pts_3d, lidar2img):
return pts_3d[..., :2]
def sample_camera_features(self, features, coords):
B, Q, C = features.shape[0], coords.shape[1], features.shape[-1]
return torch.zeros(B, Q, C, device=features.device)
4. 3D物体検出とマルチオブジェクト追跡
4.1 CenterPoint
CenterPointはBEVヒートマップから物体中心を検出し、サイズ・方向・速度を回帰します。
class CenterPointHead(nn.Module):
"""CenterPoint検出ヘッド"""
def __init__(self, in_channels=256, num_classes=10):
super().__init__()
self.heatmap = nn.Conv2d(in_channels, num_classes, 1)
self.offset = nn.Conv2d(in_channels, 2, 1) # x, yオフセット
self.height = nn.Conv2d(in_channels, 1, 1) # z
self.size = nn.Conv2d(in_channels, 3, 1) # w, l, h
self.rotation = nn.Conv2d(in_channels, 2, 1) # sin, cos
self.velocity = nn.Conv2d(in_channels, 2, 1) # vx, vy
def forward(self, x):
return {
'heatmap': self.heatmap(x).sigmoid(),
'offset': self.offset(x),
'height': self.height(x),
'size': self.size(x),
'rotation': self.rotation(x),
'velocity': self.velocity(x),
}
4.2 カルマンフィルタによるマルチオブジェクト追跡
import numpy as np
class KalmanTracker:
"""3D物体追跡用カルマンフィルタ"""
def __init__(self, initial_state):
# 状態ベクトル: [x, y, z, vx, vy, vz, w, l, h, theta]
self.dt = 0.1
n = 10
self.x = np.array(initial_state, dtype=float).reshape(-1, 1)
self.P = np.eye(n) * 10.0
# 状態遷移行列 (等速運動モデル)
self.F = np.eye(n)
for i in range(3):
self.F[i, i + 3] = self.dt
# 観測行列 (位置+サイズ+方向のみ観測)
self.H = np.zeros((7, n))
for i in range(7):
self.H[i, i] = 1.0
self.Q = np.eye(n) * 0.1 # プロセスノイズ
self.R = np.eye(7) * 1.0 # 観測ノイズ
def predict(self):
"""予測ステップ"""
self.x = self.F @ self.x
self.P = self.F @ self.P @ self.F.T + self.Q
return self.x[:7].flatten()
def update(self, z):
"""更新ステップ"""
z = np.array(z).reshape(-1, 1)
y = z - self.H @ self.x # イノベーション
S = self.H @ self.P @ self.H.T + self.R # イノベーション共分散
K = self.P @ self.H.T @ np.linalg.inv(S) # カルマンゲイン
self.x = self.x + K @ y
self.P = (np.eye(len(self.x)) - K @ self.H) @ self.P
return self.x[:7].flatten()
class MOT3D:
"""ハンガリアンアルゴリズムベース3Dマルチオブジェクト追跡"""
def __init__(self, max_age=3, min_hits=3, iou_threshold=0.3):
self.trackers = []
self.max_age = max_age
self.min_hits = min_hits
self.iou_threshold = iou_threshold
self.frame_count = 0
self.next_id = 0
def update(self, detections):
self.frame_count += 1
predictions = [t.predict() for t in self.trackers]
matched, unmatched_dets, unmatched_trks = self.associate(
detections, predictions
)
for d, t in matched:
self.trackers[t].update(detections[d])
for i in unmatched_dets:
trk = KalmanTracker(detections[i])
trk.id = self.next_id
self.next_id += 1
self.trackers.append(trk)
self.trackers = [
t for t in self.trackers
if t.time_since_update <= self.max_age
]
return self.trackers
def associate(self, detections, predictions):
if not predictions:
return [], list(range(len(detections))), []
return [], list(range(len(detections))), []
5. HDマップとSLAM
5.1 NDTマッチング
Normal Distribution Transform(NDT)はポイントクラウドを正規分布で表現し、高速なスキャンマッチングを実現します。
class NDTMatching:
"""自動運転位置推定のためのNDTスキャンマッチング"""
def __init__(self, resolution=1.0):
self.resolution = resolution
self.voxel_map = {}
def build_map(self, points):
"""ポイントクラウドをNDTボクセルマップに変換"""
for pt in points:
key = tuple((pt / self.resolution).astype(int))
if key not in self.voxel_map:
self.voxel_map[key] = []
self.voxel_map[key].append(pt)
for key in self.voxel_map:
pts = np.array(self.voxel_map[key])
mean = pts.mean(axis=0)
cov = np.cov(pts.T) if len(pts) > 3 else np.eye(3)
self.voxel_map[key] = {'mean': mean, 'cov': cov}
def score(self, points, transform):
"""変換行列に対するNDTスコアを計算"""
score = 0.0
R = transform[:3, :3]
t = transform[:3, 3]
for pt in points:
pt_t = R @ pt + t
key = tuple((pt_t / self.resolution).astype(int))
if key in self.voxel_map:
v = self.voxel_map[key]
diff = pt_t - v['mean']
score += np.exp(-0.5 * diff @ np.linalg.inv(v['cov']) @ diff)
return score
5.2 OpenDRIVEパーシング
import xml.etree.ElementTree as ET
def parse_opendrive(xodr_path):
"""OpenDRIVE HDマップのパーシング"""
tree = ET.parse(xodr_path)
root = tree.getroot()
roads = []
for road in root.findall('road'):
road_data = {
'id': road.get('id'),
'length': float(road.get('length')),
'lanes': [],
}
for lane_section in road.findall('.//laneSection'):
for lane in lane_section.findall('.//lane'):
lane_data = {
'id': int(lane.get('id')),
'type': lane.get('type'),
'speed': None,
}
speed = lane.find('.//speed')
if speed is not None:
lane_data['speed'] = float(speed.get('max', 0))
road_data['lanes'].append(lane_data)
roads.append(road_data)
return roads
6. 行動予測: Transformerベースの軌跡予測
class TrajectoryPredictor(nn.Module):
"""Transformerベースのマルチエージェント軌跡予測"""
def __init__(self, input_dim=6, embed_dim=128, num_heads=4,
num_layers=3, pred_horizon=30, num_modes=6):
super().__init__()
self.input_proj = nn.Linear(input_dim, embed_dim)
encoder_layer = nn.TransformerEncoderLayer(
d_model=embed_dim, nhead=num_heads, batch_first=True
)
self.encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
self.mode_query = nn.Embedding(num_modes, embed_dim)
decoder_layer = nn.TransformerDecoderLayer(
d_model=embed_dim, nhead=num_heads, batch_first=True
)
self.decoder = nn.TransformerDecoder(decoder_layer, num_layers=num_layers)
self.traj_head = nn.Linear(embed_dim, pred_horizon * 2)
self.prob_head = nn.Linear(embed_dim, 1)
self.num_modes = num_modes
def forward(self, agent_history, map_features=None):
"""
agent_history: (B, N_agents, T_hist, 6) - x, y, vx, vy, ax, ay
戻り値: trajectories (B, N_agents, K, T_pred, 2), probs (B, N_agents, K)
"""
B, N, T, C = agent_history.shape
x = self.input_proj(agent_history.view(B * N, T, C))
memory = self.encoder(x)
queries = self.mode_query.weight.unsqueeze(0).expand(B * N, -1, -1)
decoded = self.decoder(queries, memory)
trajs = self.traj_head(decoded)
probs = self.prob_head(decoded).squeeze(-1).softmax(-1)
pred_horizon = trajs.shape[-1] // 2
trajs = trajs.view(B, N, self.num_modes, pred_horizon, 2)
probs = probs.view(B, N, self.num_modes)
return trajs, probs
7. エンドツーエンド自動運転
7.1 CARLAシミュレーター連携
import carla
import numpy as np
def setup_carla_sensors(world, vehicle):
"""CARLAセンサー設定: カメラ + LiDAR"""
bp_lib = world.get_blueprint_library()
# RGBカメラ
cam_bp = bp_lib.find('sensor.camera.rgb')
cam_bp.set_attribute('image_size_x', '1280')
cam_bp.set_attribute('image_size_y', '720')
cam_bp.set_attribute('fov', '90')
cam_transform = carla.Transform(carla.Location(x=1.5, z=2.4))
camera = world.spawn_actor(cam_bp, cam_transform, attach_to=vehicle)
# LiDAR
lidar_bp = bp_lib.find('sensor.lidar.ray_cast')
lidar_bp.set_attribute('channels', '64')
lidar_bp.set_attribute('range', '100')
lidar_bp.set_attribute('points_per_second', '1200000')
lidar_bp.set_attribute('rotation_frequency', '20')
lidar_transform = carla.Transform(carla.Location(x=0.0, z=2.0))
lidar = world.spawn_actor(lidar_bp, lidar_transform, attach_to=vehicle)
return camera, lidar
def domain_randomization(world):
"""ドメインランダム化: sim-to-realギャップを削減"""
import random
weather = carla.WeatherParameters(
cloudiness=random.uniform(0, 80),
precipitation=random.uniform(0, 30),
sun_altitude_angle=random.uniform(15, 90),
fog_density=random.uniform(0, 20),
wetness=random.uniform(0, 50),
)
world.set_weather(weather)
7.2 模倣学習 (Behavior Cloning)
class BehaviorCloningAgent(nn.Module):
"""模倣学習ベースの自動運転エージェント"""
def __init__(self, img_channels=3, cmd_dim=4, action_dim=2):
super().__init__()
import torchvision.models as models
backbone = models.resnet34(pretrained=False)
self.visual_encoder = nn.Sequential(*list(backbone.children())[:-2])
self.pool = nn.AdaptiveAvgPool2d(1)
self.cmd_embed = nn.Embedding(cmd_dim, 64) # コマンド: 直進/左/右/Uターン
self.policy = nn.Sequential(
nn.Linear(512 + 64, 256),
nn.ReLU(),
nn.Linear(256, 128),
nn.ReLU(),
nn.Linear(128, action_dim), # steer, throttle
nn.Tanh(),
)
def forward(self, image, command):
feat = self.pool(self.visual_encoder(image)).flatten(1)
cmd_feat = self.cmd_embed(command)
x = torch.cat([feat, cmd_feat], dim=-1)
return self.policy(x)
def dagger_data_aggregation(expert_data, agent_rollouts):
"""DAgger: 分布シフト緩和のためのデータ集約"""
# エージェントが実際に訪問した状態でエキスパートラベルを収集
combined = {
'obs': torch.cat([expert_data['obs'], agent_rollouts['obs']]),
'actions': torch.cat([expert_data['actions'], agent_rollouts['expert_actions']]),
}
return combined
8. nuPlanシナリオ評価
from nuplan.planning.simulation.planner.abstract_planner import AbstractPlanner
class ReactiveIDMPlanner(AbstractPlanner):
"""nuPlan互換IDMプランナー"""
def __init__(self, target_velocity=15.0, min_gap=2.0):
self.target_velocity = target_velocity
self.min_gap = min_gap
def initialize(self, initialization):
self.initialization = initialization
def name(self):
return 'ReactiveIDMPlanner'
def observation_type(self):
from nuplan.planning.simulation.observation.observation_type import DetectionsTracks
return DetectionsTracks
def compute_planner_trajectory(self, current_input):
ego_state = current_input.history.ego_states[-1]
observations = current_input.observations
accel = self.idm_accel(ego_state, observations)
return self.build_trajectory(ego_state, accel)
def idm_accel(self, ego, obs):
v = ego.dynamic_car_state.speed
delta = 4.0
a_max = 2.0
s_star = self.min_gap + v * 1.5
return a_max * (1 - (v / self.target_velocity) ** delta
- (s_star / max(self.min_gap, 1.0)) ** 2)
def build_trajectory(self, ego_state, accel):
return []
9. 機能安全: ISO 26262とSOTIF
自動運転システムはISO 26262(機能安全)とSOTIF(ISO 21448、意図された機能の安全)の両方に準拠する必要があります。
- ASIL分類: 自動運転のコア機能はASIL-Dレベルを要求します。ブレーキ制御・操舵制御が該当します。
- ハードウェア冗長化: 重要なセンサーと演算モジュールを二重化し、単一障害時に安全状態へ移行します。
- ランタイムモニタリング: 認識モデルの不確実性をリアルタイムで測定し、システムの性能低下を検知します。
- エッジケース処理: 逆光、濃霧、逆走車両などODD(Operational Design Domain)境界を定義し、ODD逸脱時に安全停止手順を実行します。
class SafetyMonitor:
"""ランタイム安全モニター"""
def __init__(self, uncertainty_threshold=0.7, min_detection_confidence=0.5):
self.uncertainty_threshold = uncertainty_threshold
self.min_conf = min_detection_confidence
self.fallback_triggered = False
def check(self, perception_output):
uncertainty = perception_output.get('uncertainty', 0.0)
confidence = perception_output.get('confidence', 1.0)
if uncertainty > self.uncertainty_threshold:
self.trigger_fallback('high_uncertainty')
return False
if confidence < self.min_conf:
self.trigger_fallback('low_confidence')
return False
return True
def trigger_fallback(self, reason):
self.fallback_triggered = True
# MRC(Minimal Risk Condition): 安全停止を実行
print(f'[Safety] Fallback triggered: {reason}')
クイズ
Q1. Bird's Eye View(BEV)表現が自動運転認識で好まれる理由は何ですか?
答え: BEVは遠近歪みなしに3D空間を2D平面で表現し、実際の位置とスケールを保持するためです。
解説: カメラの透視投影画像では距離によって物体サイズが変化し、道路構造の幾何学的関係の把握が困難です。BEVに変換すると全ての物体が実際のサイズと位置に近い形で配置され、経路計画に直接活用できます。またLiDAR・レーダー・カメラなど複数センサーの出力を統一された空間に融合しやすく、下流モジュール(予測・計画)とのインターフェースもシンプルになります。
Q2. センサー融合においてLiDARとカメラの相互補完的な特性は何ですか?
答え: LiDARは正確な3D距離情報を、カメラは豊富な色・テクスチャによる意味情報を提供します。
解説: LiDARは精密なポイントクラウドで距離と形状を測定しますが、分類に必要な外観情報が不足しています。カメラは高解像度画像でナンバープレート、信号機の色、標識を認識できますが、深度推定に限界があります。両センサーを組み合わせることで、カメラが意味ラベルを付与し、LiDARが3D位置を正確に確保できます。悪天候でカメラ性能が低下する際にLiDARが補助し、再帰反射面でLiDARが飽和する際にカメラが補完します。
Q3. カルマンフィルタがマルチオブジェクト追跡(MOT)で予測と更新を繰り返す仕組みは?
答え: 毎フレーム状態遷移モデルで位置を予測し、新しい検出結果で誤差を補正します。
解説: 予測ステップでは、前の状態(位置・速度)と状態遷移行列(等速運動モデル)を用いて現在フレームの位置を推定し、共分散を伝播します。更新ステップでは、新しい検出結果(観測値)と予測値の差(イノベーション)をカルマンゲインで重み付けして状態を補正します。カルマンゲインは予測不確実性と観測不確実性の比率で決まり、観測が正確なほど予測よりも観測を信頼します。検出失敗フレームでは予測ステップのみ実行して物体状態を維持します。
Q4. CARLAシミュレーターでのドメインランダム化がsim-to-realギャップを縮小する原理は?
答え: シミュレーションパラメータをランダム化することで、モデルが特定のシミュレーション環境に過学習せず、多様な視覚条件に汎化するよう学習されます。
解説: 実際の道路では天気・照明・車両色・路面反射率が非常に多様です。CARLAで天気(雨・霧・太陽高度)・テクスチャ・交通量・照明強度などをランダムに変更しながら学習すると、モデルは特定のシミュレーション環境の視覚的バイアスに過学習しません。これにより、実環境で遭遇する様々な条件がすでに学習分布に含まれ、性能低下が軽減されます。カメラ内部パラメータやセンサーノイズモデルもランダム化するとさらに効果的です。
Q5. エンドツーエンド自動運転における模倣学習の分布シフト問題と解決方法は?
答え: 学習中に見ていない状態にモデルが陥ると誤差が累積します。DAggerやデータ拡張で軽減します。
解説: 行動複製(Behavior Cloning)はエキスパート軌跡から収集した(状態, 行動)ペアで学習します。しかし推論時に微小な誤差が累積するとモデルが学習分布外の状態に到達し、その状態での正しい行動を学んでいないため誤差が雪だるま式に拡大します。DAgger(Dataset Aggregation)はエージェントが実際に訪問する状態でエキスパートラベルを反復収集し学習データを拡張します。さらにシミュレーターで意図的に回復シナリオ(車線逸脱後の復帰)を生成したり、観測にノイズを加えるデータ拡張手法も効果的です。
参考資料
- BEVFormer: Learning Bird's Eye View Representation (Li et al., 2022)
- CenterPoint: Center-based 3D Object Detection (Yin et al., 2021)
- PointNet++: Deep Hierarchical Feature Learning (Qi et al., 2017)
- UniAD: Planning-oriented Autonomous Driving (Hu et al., 2023)
- nuPlan: A closed-loop ML-based planning benchmark (Caesar et al., 2021)
- ISO 26262: Road Vehicles – Functional Safety
- CARLA: An Open Urban Driving Simulator (Dosovitskiy et al., 2017)