Autonomous Vehicle AI: 3D Perception, BEVFormer, and End-to-End Learning
Complete Guide to Autonomous Vehicle AI
An autonomous vehicle is a complex AI system in which four core modules (perception, prediction, planning, and control) interact in real time. This guide covers the full stack, from LiDAR-based 3D perception through BEVFormer camera perception, multi-object tracking, HD maps, end-to-end learning, and simulation to functional safety.
1. The Autonomous Driving Stack
The autonomous driving pipeline consists of four core stages.
- Perception: interprets the surrounding environment from sensor data, fusing LiDAR, cameras, and radar for 3D object detection and drivable-area extraction.
- Prediction: forecasts the future trajectories of nearby vehicles and pedestrians; Transformer-based models capture complex multi-agent interactions.
- Planning: generates an optimal path to the destination; rule-based and learning-based methods coexist.
- Control: produces acceleration, braking, and steering commands, typically via PID controllers or Model Predictive Control (MPC).
# Autonomous driving pipeline abstraction example
class AutonomousDrivingStack:
    def __init__(self, perception, prediction, planner, controller):
        self.perception = perception
        self.prediction = prediction
        self.planner = planner
        self.controller = controller

    def step(self, sensor_data):
        # 1. Perceive
        scene = self.perception.process(sensor_data)
        # 2. Predict
        future_states = self.prediction.forecast(scene)
        # 3. Plan
        trajectory = self.planner.plan(scene, future_states)
        # 4. Control
        commands = self.controller.compute(trajectory)
        return commands
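To make the data flow concrete, here is a self-contained sketch that wires four dummy stage implementations through the same process → forecast → plan → compute call sequence. The stub classes and their fixed outputs are illustrative assumptions, not real models:

```python
# Illustrative stubs only: each stage returns a minimal, fixed result
class DummyPerception:
    def process(self, sensor_data):
        return {'objects': sensor_data.get('detections', [])}

class DummyPrediction:
    def forecast(self, scene):
        # Assume every object stays where it is
        return [{'future': obj} for obj in scene['objects']]

class DummyPlanner:
    def plan(self, scene, future_states):
        return {'waypoints': [(0.0, 0.0), (1.0, 0.0)]}

class DummyController:
    def compute(self, trajectory):
        return {'steer': 0.0, 'throttle': 0.3, 'brake': 0.0}

def run_step(perception, prediction, planner, controller, sensor_data):
    # Same call sequence as AutonomousDrivingStack.step
    scene = perception.process(sensor_data)
    future_states = prediction.forecast(scene)
    trajectory = planner.plan(scene, future_states)
    return controller.compute(trajectory)

commands = run_step(DummyPerception(), DummyPrediction(),
                    DummyPlanner(), DummyController(),
                    {'detections': [(10.0, 2.0)]})
```

Swapping any stub for a real model keeps the interface unchanged, which is the point of the modular stack.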
2. LiDAR Point Cloud 3D Perception
2.1 PointNet++
PointNet++ is a hierarchical neural network that processes point clouds directly, using Set Abstraction layers to repeatedly aggregate local geometric structure.
import torch
import torch.nn as nn

class SetAbstraction(nn.Module):
    """PointNet++ Set Abstraction layer"""
    def __init__(self, npoint, radius, nsample, in_channel, mlp):
        super().__init__()
        self.npoint = npoint
        self.radius = radius
        self.nsample = nsample
        self.mlp_convs = nn.ModuleList()
        self.mlp_bns = nn.ModuleList()
        last_channel = in_channel
        for out_channel in mlp:
            self.mlp_convs.append(nn.Conv2d(last_channel, out_channel, 1))
            self.mlp_bns.append(nn.BatchNorm2d(out_channel))
            last_channel = out_channel

    def forward(self, xyz, points):
        # xyz: (B, N, 3), points: (B, C, N)
        # Farthest point sampling + ball query, then a shared MLP
        new_xyz = self.farthest_point_sample(xyz, self.npoint)
        grouped = self.ball_query(xyz, new_xyz, points)
        for conv, bn in zip(self.mlp_convs, self.mlp_bns):
            grouped = torch.relu(bn(conv(grouped)))
        new_points = grouped.max(dim=-1)[0]
        return new_xyz, new_points

    def farthest_point_sample(self, xyz, npoint):
        # Iterative FPS (simplified reference implementation)
        B, N, _ = xyz.shape
        device = xyz.device
        centroids = torch.zeros(B, npoint, dtype=torch.long, device=device)
        distance = torch.full((B, N), 1e10, device=device)
        farthest = torch.randint(0, N, (B,), device=device)
        for i in range(npoint):
            centroids[:, i] = farthest
            centroid = xyz[torch.arange(B, device=device), farthest, :].unsqueeze(1)
            dist = ((xyz - centroid) ** 2).sum(-1)
            distance = torch.min(distance, dist)
            farthest = distance.max(-1)[1]
        return xyz[torch.arange(B, device=device).unsqueeze(1), centroids]

    def ball_query(self, xyz, new_xyz, points):
        # Placeholder grouping: replicates each point feature nsample times.
        # A full implementation gathers the nsample neighbors that lie
        # within self.radius of each sampled center.
        return points.unsqueeze(-1).expand(-1, -1, -1, self.nsample)
2.2 VoxelNet
VoxelNet partitions the point cloud into 3D voxels, extracts features with Voxel Feature Encoding (VFE) layers, and passes the result through a 3D convolutional backbone.
class VoxelFeatureEncoder(nn.Module):
    """VoxelNet VFE layer"""
    def __init__(self, in_channels=4, out_channels=128):
        super().__init__()
        self.fc = nn.Sequential(
            nn.Linear(in_channels, 64),
            nn.BatchNorm1d(64),
            nn.ReLU(),
            nn.Linear(64, out_channels),
            nn.BatchNorm1d(out_channels),
            nn.ReLU(),
        )

    def forward(self, voxel_features, num_points):
        # voxel_features: (N_voxels, max_points, C)
        B, M, C = voxel_features.shape
        x = self.fc(voxel_features.view(-1, C))
        x = x.view(B, M, -1)
        # Mask out padding points, then max-pool over each voxel
        mask = torch.arange(M, device=x.device).unsqueeze(0) < num_points.unsqueeze(1)
        x = x * mask.unsqueeze(-1).float()
        return x.max(dim=1)[0]
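The voxelization step that precedes the VFE layer can be sketched in plain NumPy. `voxelize` below is a hypothetical helper (not part of VoxelNet's reference code) that groups points into fixed-capacity voxels and returns the per-voxel counts used for masking:

```python
import numpy as np

def voxelize(points, voxel_size=0.5, max_points=8):
    """Group an (N, 3) point cloud into voxels of fixed capacity.
    Returns voxel grid coords, padded per-voxel points, and valid counts."""
    coords = np.floor(points / voxel_size).astype(np.int64)
    uniq, inverse = np.unique(coords, axis=0, return_inverse=True)
    voxels = np.zeros((len(uniq), max_points, 3), dtype=points.dtype)
    counts = np.zeros(len(uniq), dtype=np.int64)
    for pt, v in zip(points, inverse):
        # Drop points beyond the voxel capacity (random sampling in practice)
        if counts[v] < max_points:
            voxels[v, counts[v]] = pt
            counts[v] += 1
    return uniq, voxels, counts
```

The `counts` array plays the role of the `num_points` argument in the VFE forward pass above.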
3. BEVFormer: Camera-Based BEV Perception
BEVFormer transforms multi-camera images into Bird's Eye View (BEV) space to perform 3D object detection. Deformable attention projects 3D query points onto the image planes to aggregate features.
import torch
import torch.nn as nn

class BEVFormerLayer(nn.Module):
    """BEVFormer Spatial Cross-Attention layer"""
    def __init__(self, embed_dim=256, num_heads=8, num_cameras=6):
        super().__init__()
        self.num_cameras = num_cameras
        self.embed_dim = embed_dim
        self.attn = nn.MultiheadAttention(embed_dim, num_heads, batch_first=True)
        self.norm1 = nn.LayerNorm(embed_dim)
        self.norm2 = nn.LayerNorm(embed_dim)
        self.ffn = nn.Sequential(
            nn.Linear(embed_dim, embed_dim * 4),
            nn.GELU(),
            nn.Linear(embed_dim * 4, embed_dim),
        )

    def forward(self, bev_query, camera_features, lidar2img):
        """
        bev_query: (B, H*W, C) - BEV grid queries
        camera_features: (B, N_cam, H_img, W_img, C) - camera features
        lidar2img: (B, N_cam, 4, 4) - LiDAR-to-image projection matrices
        """
        B, Q, C = bev_query.shape
        # Lift BEV queries to 3D reference points, then project to each camera
        ref_3d = self.get_reference_points(B, device=bev_query.device)
        projected = self.project_to_image(ref_3d, lidar2img)  # (B, Q, N_cam, 2)
        # Sample and aggregate the camera features
        sampled = self.sample_camera_features(camera_features, projected)
        # Cross-attention
        attn_out, _ = self.attn(bev_query, sampled, sampled)
        bev_query = self.norm1(bev_query + attn_out)
        bev_query = self.norm2(bev_query + self.ffn(bev_query))
        return bev_query

    def get_reference_points(self, B, device, H=200, W=200, Z=8):
        xs = torch.linspace(0, 1, W, device=device)
        ys = torch.linspace(0, 1, H, device=device)
        zs = torch.linspace(0, 1, Z, device=device)
        grid = torch.stack(torch.meshgrid(xs, ys, zs, indexing='ij'), dim=-1)
        return grid.view(1, -1, 3).expand(B, -1, -1)

    def project_to_image(self, pts_3d, lidar2img):
        # Project 3D points to image coordinates (simplified placeholder)
        return pts_3d[..., :2]

    def sample_camera_features(self, features, coords):
        # Feature sampling via grid_sample (simplified placeholder)
        B, Q, C = features.shape[0], coords.shape[1], features.shape[-1]
        return torch.zeros(B, Q, C, device=features.device)
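The `project_to_image` stub above elides the actual geometry. A minimal NumPy sketch of that projection, assuming `lidar2img` already folds camera intrinsics and extrinsics into a single 4x4 matrix with the camera looking along +z (an assumption, not BEVFormer's exact code):

```python
import numpy as np

def project_lidar_to_image(pts, lidar2img, img_w=1280, img_h=720):
    """Project (N, 3) LiDAR points with a 4x4 lidar2img matrix.
    Returns pixel coordinates (N, 2) and a validity mask."""
    homo = np.concatenate([pts, np.ones((len(pts), 1))], axis=1)  # (N, 4)
    cam = homo @ lidar2img.T                                      # (N, 4)
    depth = cam[:, 2]
    valid = depth > 1e-3                       # keep points in front of the camera
    uv = cam[:, :2] / np.maximum(depth[:, None], 1e-3)  # perspective divide
    valid &= (uv[:, 0] >= 0) & (uv[:, 0] < img_w)
    valid &= (uv[:, 1] >= 0) & (uv[:, 1] < img_h)
    return uv, valid
```

Only queries whose reference points land inside an image (the `valid` mask) contribute to the cross-attention for that camera.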
4. 3D Object Detection and Multi-Object Tracking
4.1 CenterPoint
CenterPoint detects object centers as peaks on a BEV heatmap and regresses size, orientation, and velocity.
class CenterPointHead(nn.Module):
    """CenterPoint detection head"""
    def __init__(self, in_channels=256, num_classes=10):
        super().__init__()
        self.heatmap = nn.Conv2d(in_channels, num_classes, 1)
        self.offset = nn.Conv2d(in_channels, 2, 1)    # x, y offset
        self.height = nn.Conv2d(in_channels, 1, 1)    # z
        self.size = nn.Conv2d(in_channels, 3, 1)      # w, l, h
        self.rotation = nn.Conv2d(in_channels, 2, 1)  # sin, cos
        self.velocity = nn.Conv2d(in_channels, 2, 1)  # vx, vy

    def forward(self, x):
        return {
            'heatmap': self.heatmap(x).sigmoid(),
            'offset': self.offset(x),
            'height': self.height(x),
            'size': self.size(x),
            'rotation': self.rotation(x),
            'velocity': self.velocity(x),
        }
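At inference time, boxes are read off these head outputs by finding local maxima in the class heatmap. Below is a NumPy sketch of CenterNet-style peak decoding, a simplified stand-in for the real decoder: a 3x3 local-maximum filter replaces box NMS:

```python
import numpy as np

def decode_heatmap(heatmap, k=5):
    """Pick the top-k 3x3-local maxima from a (C, H, W) heatmap.
    Returns a list of (score, class, y, x) tuples."""
    C, H, W = heatmap.shape
    padded = np.pad(heatmap, ((0, 0), (1, 1), (1, 1)), constant_values=-np.inf)
    # 3x3 max filter built from the nine shifted views of the padded map
    neigh = np.stack([
        padded[:, dy:dy + H, dx:dx + W]
        for dy in range(3) for dx in range(3)
    ])
    keep = heatmap >= neigh.max(axis=0)   # local maxima only
    scores = np.where(keep, heatmap, 0.0).reshape(-1)
    order = np.argsort(scores)[::-1][:k]
    results = []
    for idx in order:
        c, rem = divmod(int(idx), H * W)
        y, x = divmod(rem, W)
        results.append((float(scores[idx]), c, y, x))
    return results
```

Each surviving peak is then combined with the `offset`, `height`, `size`, and `rotation` regressions at the same grid cell to form a 3D box.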
4.2 Kalman Filter-Based Multi-Object Tracking
import numpy as np

class KalmanTracker:
    """Kalman filter for 3D object tracking"""
    def __init__(self, detection):
        # State vector: [x, y, z, vx, vy, vz, w, l, h, theta]
        # Detection (observation): [x, y, z, w, l, h, theta]
        self.dt = 0.1
        n = 10
        self.obs_idx = [0, 1, 2, 6, 7, 8, 9]
        det = np.asarray(detection, dtype=float).flatten()
        self.x = np.zeros((n, 1))
        self.x[self.obs_idx, 0] = det[:7]  # velocities start at zero
        self.P = np.eye(n) * 10.0  # Initial covariance
        # State transition matrix (constant-velocity model)
        self.F = np.eye(n)
        for i in range(3):
            self.F[i, i + 3] = self.dt
        # Observation matrix (position, size, and heading are observed)
        self.H = np.zeros((7, n))
        for row, col in enumerate(self.obs_idx):
            self.H[row, col] = 1.0
        self.Q = np.eye(n) * 0.1  # Process noise
        self.R = np.eye(7) * 1.0  # Measurement noise
        self.time_since_update = 0

    def predict(self):
        """Prediction step"""
        self.x = self.F @ self.x
        self.P = self.F @ self.P @ self.F.T + self.Q
        self.time_since_update += 1
        return (self.H @ self.x).flatten()

    def update(self, z):
        """Update step"""
        z = np.asarray(z, dtype=float).reshape(-1, 1)
        y = z - self.H @ self.x  # Innovation
        S = self.H @ self.P @ self.H.T + self.R  # Innovation covariance
        K = self.P @ self.H.T @ np.linalg.inv(S)  # Kalman gain
        self.x = self.x + K @ y
        self.P = (np.eye(len(self.x)) - K @ self.H) @ self.P
        self.time_since_update = 0
        return (self.H @ self.x).flatten()
class MOT3D:
    """3D multi-object tracking with detection-to-track association"""
    def __init__(self, max_age=3, min_hits=3, dist_threshold=2.0):
        self.trackers = []
        self.max_age = max_age
        self.min_hits = min_hits
        self.dist_threshold = dist_threshold
        self.frame_count = 0
        self.next_id = 0

    def update(self, detections):
        self.frame_count += 1
        predictions = [t.predict() for t in self.trackers]
        matched, unmatched_dets, unmatched_trks = self.associate(
            detections, predictions
        )
        for d, t in matched:
            self.trackers[t].update(detections[d])
        for i in unmatched_dets:
            trk = KalmanTracker(detections[i])
            trk.id = self.next_id
            self.next_id += 1
            self.trackers.append(trk)
        # Drop tracks that have gone unmatched for too long
        self.trackers = [
            t for t in self.trackers
            if t.time_since_update <= self.max_age
        ]
        return self.trackers

    def associate(self, detections, predictions):
        # Greedy center-distance matching, a simplified stand-in for
        # IoU-based Hungarian assignment
        if not predictions or len(detections) == 0:
            return [], list(range(len(detections))), list(range(len(predictions)))
        dets = np.asarray([d[:3] for d in detections], dtype=float)
        preds = np.asarray([p[:3] for p in predictions], dtype=float)
        cost = np.linalg.norm(dets[:, None, :] - preds[None, :, :], axis=-1)
        matched, used_d, used_t = [], set(), set()
        for d, t in sorted(np.ndindex(cost.shape), key=lambda p: cost[p]):
            if d not in used_d and t not in used_t and cost[d, t] < self.dist_threshold:
                matched.append((d, t))
                used_d.add(d)
                used_t.add(t)
        unmatched_dets = [d for d in range(len(dets)) if d not in used_d]
        unmatched_trks = [t for t in range(len(preds)) if t not in used_t]
        return matched, unmatched_dets, unmatched_trks
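In production trackers the association cost is usually a BEV box IoU rather than plain center distance. A minimal axis-aligned BEV IoU sketch (heading is ignored here, which is an assumption; rotated-box IoU requires polygon clipping):

```python
def bev_iou(box_a, box_b):
    """Axis-aligned BEV IoU for boxes given as (cx, cy, w, l)."""
    ax1, ay1 = box_a[0] - box_a[2] / 2, box_a[1] - box_a[3] / 2
    ax2, ay2 = box_a[0] + box_a[2] / 2, box_a[1] + box_a[3] / 2
    bx1, by1 = box_b[0] - box_b[2] / 2, box_b[1] - box_b[3] / 2
    bx2, by2 = box_b[0] + box_b[2] / 2, box_b[1] + box_b[3] / 2
    # Intersection rectangle (clamped to zero when boxes are disjoint)
    iw = max(0.0, min(ax2, bx2) - max(ax1, bx1))
    ih = max(0.0, min(ay2, by2) - max(ay1, by1))
    inter = iw * ih
    union = box_a[2] * box_a[3] + box_b[2] * box_b[3] - inter
    return inter / union if union > 0 else 0.0
```

Using 1 - IoU as the cost matrix entry slots directly into a Hungarian or greedy matcher.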
5. HD Maps and SLAM
5.1 NDT Matching
The Normal Distributions Transform (NDT) represents a point cloud as a set of normal distributions, enabling fast scan matching.
class NDTMatching:
    """NDT scan matching for ego-vehicle localization"""
    def __init__(self, resolution=1.0):
        self.resolution = resolution
        self.voxel_map = {}

    def build_map(self, points):
        """Convert a point cloud into an NDT voxel map"""
        for pt in points:
            key = tuple((pt / self.resolution).astype(int))
            self.voxel_map.setdefault(key, []).append(pt)
        for key in self.voxel_map:
            pts = np.array(self.voxel_map[key])
            mean = pts.mean(axis=0)
            cov = np.cov(pts.T) if len(pts) > 3 else np.eye(3)
            # Regularize so the covariance stays invertible
            cov = cov + np.eye(3) * 1e-6
            self.voxel_map[key] = {'mean': mean, 'cov': cov}

    def score(self, points, transform):
        """Evaluate the NDT score of a 4x4 pose hypothesis"""
        score = 0.0
        R = transform[:3, :3]
        t = transform[:3, 3]
        for pt in points:
            pt_transformed = R @ pt + t
            key = tuple((pt_transformed / self.resolution).astype(int))
            if key in self.voxel_map:
                voxel = self.voxel_map[key]
                diff = pt_transformed - voxel['mean']
                score += np.exp(-0.5 * diff @ np.linalg.inv(voxel['cov']) @ diff)
        return score
5.2 Parsing OpenDRIVE
import xml.etree.ElementTree as ET

def parse_opendrive(xodr_path):
    """Parse an OpenDRIVE HD map file"""
    tree = ET.parse(xodr_path)
    root = tree.getroot()
    roads = []
    for road in root.findall('road'):
        road_data = {
            'id': road.get('id'),
            'length': float(road.get('length')),
            'lanes': [],
        }
        for lane_section in road.findall('.//laneSection'):
            for lane in lane_section.findall('.//lane'):
                lane_data = {
                    'id': int(lane.get('id')),
                    'type': lane.get('type'),
                    'speed': None,
                }
                speed = lane.find('.//speed')
                if speed is not None:
                    lane_data['speed'] = float(speed.get('max', 0))
                road_data['lanes'].append(lane_data)
        roads.append(road_data)
    return roads
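The parser can be exercised without a full map file. The snippet below builds a tiny synthetic OpenDRIVE fragment in memory (the road and lane values are made up for illustration) and reads it back with ElementTree:

```python
import xml.etree.ElementTree as ET

# Hypothetical minimal OpenDRIVE fragment, not a real map
XODR_SNIPPET = """<OpenDRIVE>
  <road id="1" length="100.0">
    <lanes>
      <laneSection s="0.0">
        <lane id="-1" type="driving">
          <speed max="13.9"/>
        </lane>
      </laneSection>
    </lanes>
  </road>
</OpenDRIVE>"""

root = ET.fromstring(XODR_SNIPPET)
road = root.find('road')
lane = road.find('.//lane')
parsed = {
    'road_id': road.get('id'),
    'length': float(road.get('length')),
    'lane_id': int(lane.get('id')),
    'lane_type': lane.get('type'),
    'speed_max': float(lane.find('speed').get('max')),
}
```

Writing the snippet to a temporary `.xodr` file and calling `parse_opendrive` on it yields the same structure.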
6. Behavior Prediction: Transformer-Based Trajectory Forecasting
class TrajectoryPredictor(nn.Module):
    """Transformer-based multi-agent trajectory prediction"""
    def __init__(self, input_dim=6, embed_dim=128, num_heads=4,
                 num_layers=3, pred_horizon=30, num_modes=6):
        super().__init__()
        self.input_proj = nn.Linear(input_dim, embed_dim)
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=embed_dim, nhead=num_heads, batch_first=True
        )
        self.encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.mode_query = nn.Embedding(num_modes, embed_dim)
        decoder_layer = nn.TransformerDecoderLayer(
            d_model=embed_dim, nhead=num_heads, batch_first=True
        )
        self.decoder = nn.TransformerDecoder(decoder_layer, num_layers=num_layers)
        self.traj_head = nn.Linear(embed_dim, pred_horizon * 2)
        self.prob_head = nn.Linear(embed_dim, 1)
        self.num_modes = num_modes

    def forward(self, agent_history, map_features=None):
        """
        agent_history: (B, N_agents, T_hist, 6) - x, y, vx, vy, ax, ay
        Returns: trajectories (B, N_agents, K, T_pred, 2), probs (B, N_agents, K)
        """
        B, N, T, C = agent_history.shape
        x = self.input_proj(agent_history.view(B * N, T, C))
        memory = self.encoder(x)  # (B*N, T, embed_dim)
        queries = self.mode_query.weight.unsqueeze(0).expand(B * N, -1, -1)
        decoded = self.decoder(queries, memory)  # (B*N, K, embed_dim)
        trajs = self.traj_head(decoded)  # (B*N, K, T_pred*2)
        probs = self.prob_head(decoded).squeeze(-1).softmax(-1)
        pred_horizon = trajs.shape[-1] // 2
        trajs = trajs.view(B, N, self.num_modes, pred_horizon, 2)
        probs = probs.view(B, N, self.num_modes)
        return trajs, probs
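Multimodal predictors like this are typically evaluated with minADE/minFDE: the average and final displacement error of the best of the K modes. A NumPy sketch of both metrics:

```python
import numpy as np

def min_ade_fde(pred_trajs, gt_traj):
    """pred_trajs: (K, T, 2) multimodal predictions; gt_traj: (T, 2).
    Returns (minADE, minFDE) over the K modes."""
    dists = np.linalg.norm(pred_trajs - gt_traj[None], axis=-1)  # (K, T)
    ade = dists.mean(axis=1)   # average displacement per mode
    fde = dists[:, -1]         # final displacement per mode
    return float(ade.min()), float(fde.min())
```

Taking the minimum over modes avoids penalizing a predictor for covering alternative futures that did not happen.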
7. End-to-End Autonomous Driving
7.1 CARLA Simulator Integration
import carla
import random

def setup_carla_sensors(world, vehicle):
    """Configure CARLA sensors: camera + LiDAR"""
    bp_lib = world.get_blueprint_library()
    # RGB camera
    cam_bp = bp_lib.find('sensor.camera.rgb')
    cam_bp.set_attribute('image_size_x', '1280')
    cam_bp.set_attribute('image_size_y', '720')
    cam_bp.set_attribute('fov', '90')
    cam_transform = carla.Transform(carla.Location(x=1.5, z=2.4))
    camera = world.spawn_actor(cam_bp, cam_transform, attach_to=vehicle)
    # LiDAR
    lidar_bp = bp_lib.find('sensor.lidar.ray_cast')
    lidar_bp.set_attribute('channels', '64')
    lidar_bp.set_attribute('range', '100')
    lidar_bp.set_attribute('points_per_second', '1200000')
    lidar_bp.set_attribute('rotation_frequency', '20')
    lidar_transform = carla.Transform(carla.Location(x=0.0, z=2.0))
    lidar = world.spawn_actor(lidar_bp, lidar_transform, attach_to=vehicle)
    return camera, lidar

def domain_randomization(world):
    """Domain randomization to reduce the sim-to-real gap"""
    weather = carla.WeatherParameters(
        cloudiness=random.uniform(0, 80),
        precipitation=random.uniform(0, 30),
        sun_altitude_angle=random.uniform(15, 90),
        fog_density=random.uniform(0, 20),
        wetness=random.uniform(0, 50),
    )
    world.set_weather(weather)
7.2 Imitation Learning (Behavior Cloning)
class BehaviorCloningAgent(nn.Module):
    """Behavior-cloning driving agent"""
    def __init__(self, img_channels=3, cmd_dim=4, action_dim=2):
        super().__init__()
        import torchvision.models as models
        backbone = models.resnet34(weights=None)
        self.visual_encoder = nn.Sequential(*list(backbone.children())[:-2])
        self.pool = nn.AdaptiveAvgPool2d(1)
        self.cmd_embed = nn.Embedding(cmd_dim, 64)  # Commands: straight/left/right/u-turn
        self.policy = nn.Sequential(
            nn.Linear(512 + 64, 256),
            nn.ReLU(),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Linear(128, action_dim),  # steer, throttle
            nn.Tanh(),
        )

    def forward(self, image, command):
        feat = self.pool(self.visual_encoder(image)).flatten(1)
        cmd_feat = self.cmd_embed(command)
        x = torch.cat([feat, cmd_feat], dim=-1)
        return self.policy(x)

def dagger_data_aggregation(expert_data, agent_rollouts):
    """DAgger: dataset aggregation to mitigate distributional shift"""
    # Collect expert labels at the states the agent actually visits
    combined = {
        'obs': torch.cat([expert_data['obs'], agent_rollouts['obs']]),
        'actions': torch.cat([expert_data['actions'], agent_rollouts['expert_actions']]),
    }
    return combined
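The aggregation loop itself is easiest to see in a toy setting. The sketch below is a hypothetical 1D lane-offset problem, not CARLA code: it rolls out the current learner, queries a scripted expert at the visited states, aggregates the labels, and refits a linear policy:

```python
import numpy as np

def expert_policy(s):
    # Hypothetical expert: steer proportionally back toward the lane center
    return -0.5 * s

def rollout(policy, s0=2.0, steps=20):
    # Toy 1D dynamics: the lane offset changes by the commanded action
    states, s = [], s0
    for _ in range(steps):
        states.append(s)
        s = s + policy(s)
    return np.array(states)

def fit_linear(states, actions):
    # Least-squares fit of action = k * state
    k = np.dot(states, actions) / np.dot(states, states)
    return lambda s: k * s

# DAgger loop: keep aggregating expert labels at learner-visited states
data_s = rollout(expert_policy)
data_a = expert_policy(data_s)
policy = fit_linear(data_s, data_a)
for _ in range(3):
    visited = rollout(policy)
    data_s = np.concatenate([data_s, visited])
    data_a = np.concatenate([data_a, expert_policy(visited)])
    policy = fit_linear(data_s, data_a)
```

Because labels come from states the learner itself reaches, the training distribution tracks the deployment distribution, which is the core DAgger idea.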
8. nuPlan Scenario Evaluation
from nuplan.planning.simulation.planner.abstract_planner import AbstractPlanner

class ReactiveIDMPlanner(AbstractPlanner):
    """nuPlan-compatible IDM planner"""
    def __init__(self, target_velocity=15.0, min_gap=2.0):
        self.target_velocity = target_velocity
        self.min_gap = min_gap

    def initialize(self, initialization):
        self.initialization = initialization

    def name(self):
        return 'ReactiveIDMPlanner'

    def observation_type(self):
        from nuplan.planning.simulation.observation.observation_type import DetectionsTracks
        return DetectionsTracks

    def compute_planner_trajectory(self, current_input):
        ego_state = current_input.history.ego_states[-1]
        observations = current_input.observations
        # Compute the longitudinal acceleration with the IDM model
        accel = self.idm_accel(ego_state, observations)
        return self.build_trajectory(ego_state, accel)

    def idm_accel(self, ego, obs, gap=50.0):
        # `gap` is the distance to the lead vehicle; extracting it from
        # `obs` is omitted here for brevity
        v = ego.dynamic_car_state.speed
        delta = 4.0  # Acceleration exponent
        a_max = 2.0
        s_star = self.min_gap + v * 1.5
        return a_max * (1 - (v / self.target_velocity) ** delta
                        - (s_star / max(gap, 1.0)) ** 2)

    def build_trajectory(self, ego_state, accel):
        # Trajectory construction omitted (placeholder)
        return []
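The planner's `idm_accel` is a stripped-down version of the Intelligent Driver Model. The full model also includes an approach-rate term in the desired gap; here is a standalone sketch with conventional parameter values (these defaults are typical textbook choices, not nuPlan's):

```python
def idm_acceleration(v, v_lead, gap, v0=15.0, T=1.5, s0=2.0,
                     a_max=2.0, b=3.0, delta=4.0):
    """Full Intelligent Driver Model longitudinal acceleration.
    v: ego speed, v_lead: lead-vehicle speed, gap: bumper-to-bumper distance.
    v0: desired speed, T: time headway, s0: jam distance,
    a_max: max acceleration, b: comfortable deceleration."""
    dv = v - v_lead  # closing speed
    # Desired gap: jam distance + headway term + approach-rate term
    s_star = s0 + max(0.0, v * T + v * dv / (2.0 * (a_max * b) ** 0.5))
    return a_max * (1.0 - (v / v0) ** delta - (s_star / max(gap, 0.1)) ** 2)
```

The approach-rate term makes the model brake harder when closing fast on a lead vehicle, which the simplified planner version omits.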
9. Functional Safety: ISO 26262 and SOTIF
Autonomous driving systems must comply with both ISO 26262 (functional safety) and SOTIF (ISO 21448, safety of the intended functionality).
- ASIL classification: core autonomous driving functions require ASIL-D, including brake control and steering control.
- Hardware redundancy: critical sensors and compute modules are duplicated so that the system can transition to a safe state after a single-point failure.
- Runtime monitoring: the uncertainty of perception models is measured in real time to detect system performance degradation.
- Edge-case handling: beyond standard scenarios, the ODD (Operational Design Domain) boundary is defined for conditions such as glare, fog, and vehicles leaving the road, with a safe-stop procedure when it is exceeded.
class SafetyMonitor:
    """Runtime safety monitor"""
    def __init__(self, uncertainty_threshold=0.7, min_detection_confidence=0.5):
        self.uncertainty_threshold = uncertainty_threshold
        self.min_conf = min_detection_confidence
        self.fallback_triggered = False

    def check(self, perception_output):
        uncertainty = perception_output.get('uncertainty', 0.0)
        confidence = perception_output.get('confidence', 1.0)
        if uncertainty > self.uncertainty_threshold:
            self.trigger_fallback('high_uncertainty')
            return False
        if confidence < self.min_conf:
            self.trigger_fallback('low_confidence')
            return False
        return True

    def trigger_fallback(self, reason):
        self.fallback_triggered = True
        # Execute the MRC (Minimal Risk Condition): a safe stop
        print(f'[Safety] Fallback triggered: {reason}')
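One common source for the `uncertainty` signal fed to such a monitor is the normalized entropy of the perception model's class probabilities. A small sketch (using entropy here is a design assumption, not a requirement of either standard):

```python
import numpy as np

def predictive_entropy(probs):
    """Normalized entropy of a softmax distribution, in [0, 1].
    Higher values indicate a less confident perception output."""
    p = np.clip(np.asarray(probs, dtype=float), 1e-12, 1.0)
    h = -(p * np.log(p)).sum()
    return float(h / np.log(len(p)))  # divide by max entropy to normalize
```

The result can be passed directly as `perception_output['uncertainty']` and compared against `uncertainty_threshold`.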
Quiz
Q1. Why is the Bird's Eye View (BEV) representation preferred for autonomous driving perception?
Answer: BEV represents 3D space on a 2D plane without perspective distortion.
Explanation: In a camera's perspective image, object size varies with distance, and the geometric relationships of lanes and road structure are hard to recover. After conversion to BEV, every object appears close to its true size and position, so the representation feeds directly into path planning. BEV also provides a single unified space for fusing LiDAR, radar, and camera outputs, and it simplifies the interface to downstream modules (prediction, planning).
Q2. How do LiDAR and cameras complement each other in sensor fusion?
Answer: LiDAR provides accurate 3D range information; cameras provide rich color and texture information.
Explanation: LiDAR measures distance and shape with precise point clouds but lacks the appearance information needed for object classification. Cameras recognize license plates, traffic-light colors, and road signs from high-resolution images but are limited in depth estimation. Combining the two lets the camera supply semantic labels while LiDAR pins down accurate 3D positions. When camera performance degrades in bad weather, LiDAR compensates; when LiDAR saturates on retroreflective surfaces, the camera compensates.
Q3. How does a Kalman filter alternate between prediction and update in multi-object tracking (MOT)?
Answer: Each frame, it predicts object positions with a state-transition model, then corrects the error with the new detections.
Explanation: In the prediction step, the previous state (position, velocity) and the state-transition matrix (constant-velocity model) estimate the position in the current frame and propagate the covariance. In the update step, the difference between the new detection (measurement) and the prediction, called the innovation, is weighted by the Kalman gain to correct the state. The Kalman gain is determined by the ratio of prediction uncertainty to measurement uncertainty, so the more accurate the measurements, the more they are trusted over the prediction. On frames with missed detections, only the prediction step runs, keeping the track alive.
Q4. How does domain randomization in the CARLA simulator reduce the sim-to-real gap?
Answer: Randomly varying simulation parameters trains the model to be robust to diverse visual conditions.
Explanation: On real roads, weather, lighting, vehicle colors, and road-surface reflectance vary widely. Training in CARLA while randomizing weather (rain, fog, sun altitude), textures, traffic density, and lighting keeps the model from overfitting to the visual biases of a particular simulated environment. The diverse conditions encountered in the real world are then already covered by the training distribution, reducing performance degradation. Randomizing camera intrinsics and sensor-noise models as well is even more effective.
Q5. What is the distributional shift problem in imitation learning for end-to-end driving, and how is it addressed?
Answer: When the model drifts into states unseen during training, errors accumulate; DAgger and data augmentation mitigate this.
Explanation: Behavior cloning learns from (state, action) pairs collected along expert trajectories. At inference time, however, small errors accumulate until the model reaches states outside the training distribution, where it never learned the correct action, so the errors snowball. DAgger (Dataset Aggregation) repeatedly collects expert labels at the states the agent actually visits, expanding the training data. Additionally, deliberately generating recovery scenarios in a simulator (e.g., returning to the lane after a departure) or adding noise to observations are effective augmentation strategies.
References
- BEVFormer: Learning Bird's Eye View Representation (Li et al., 2022)
- CenterPoint: Center-based 3D Object Detection (Yin et al., 2021)
- PointNet++: Deep Hierarchical Feature Learning (Qi et al., 2017)
- UniAD: Planning-oriented Autonomous Driving (Hu et al., 2023)
- nuPlan: A closed-loop ML-based planning benchmark (Caesar et al., 2021)
- ISO 26262: Road Vehicles – Functional Safety
- CARLA: An Open Urban Driving Simulator (Dosovitskiy et al., 2017)
Autonomous Vehicle AI: 3D Perception, BEVFormer, and End-to-End Learning
Complete Guide to Autonomous Vehicle AI
Autonomous vehicles are complex AI systems where perception, prediction, planning, and control modules operate in real time. This guide covers the full stack: LiDAR-based 3D perception, BEVFormer camera processing, multi-object tracking, HD maps, end-to-end learning, simulation, and functional safety.
1. The Autonomous Driving Stack
The autonomous driving pipeline consists of four core stages.
- Perception: Understands the surrounding environment from sensor data. Fuses multiple sensors — LiDAR, cameras, radar — to perform 3D object detection and drivable area extraction.
- Prediction: Forecasts future trajectories of nearby vehicles and pedestrians. Transformer-based models capture complex multi-agent interactions.
- Planning: Generates an optimal path to the destination. Rule-based and learning-based methods coexist.
- Control: Produces acceleration, braking, and steering commands. Implemented via PID controllers or Model Predictive Control (MPC).
# Autonomous driving pipeline abstraction
class AutonomousDrivingStack:
def __init__(self, perception, prediction, planner, controller):
self.perception = perception
self.prediction = prediction
self.planner = planner
self.controller = controller
def step(self, sensor_data):
# 1. Perceive
scene = self.perception.process(sensor_data)
# 2. Predict
future_states = self.prediction.forecast(scene)
# 3. Plan
trajectory = self.planner.plan(scene, future_states)
# 4. Control
commands = self.controller.compute(trajectory)
return commands
2. LiDAR Point Cloud 3D Perception
2.1 PointNet++
PointNet++ processes point clouds directly through a hierarchical neural network. Set Abstraction layers progressively aggregate local structure.
import torch
import torch.nn as nn
class SetAbstraction(nn.Module):
"""PointNet++ Set Abstraction layer"""
def __init__(self, npoint, radius, nsample, in_channel, mlp):
super().__init__()
self.npoint = npoint
self.radius = radius
self.nsample = nsample
self.mlp_convs = nn.ModuleList()
self.mlp_bns = nn.ModuleList()
last_channel = in_channel
for out_channel in mlp:
self.mlp_convs.append(nn.Conv2d(last_channel, out_channel, 1))
self.mlp_bns.append(nn.BatchNorm2d(out_channel))
last_channel = out_channel
def forward(self, xyz, points):
# xyz: (B, N, 3), points: (B, C, N)
# Apply MLP after farthest point sampling + ball query
new_xyz = self.farthest_point_sample(xyz, self.npoint)
grouped = self.ball_query(xyz, new_xyz, points)
for conv, bn in zip(self.mlp_convs, self.mlp_bns):
grouped = torch.relu(bn(conv(grouped)))
new_points = grouped.max(dim=-1)[0]
return new_xyz, new_points
def farthest_point_sample(self, xyz, npoint):
B, N, _ = xyz.shape
centroids = torch.zeros(B, npoint, dtype=torch.long)
distance = torch.ones(B, N) * 1e10
farthest = torch.randint(0, N, (B,))
for i in range(npoint):
centroids[:, i] = farthest
centroid = xyz[torch.arange(B), farthest, :].unsqueeze(1)
dist = ((xyz - centroid) ** 2).sum(-1)
distance = torch.min(distance, dist)
farthest = distance.max(-1)[1]
return xyz[torch.arange(B).unsqueeze(1), centroids]
def ball_query(self, xyz, new_xyz, points):
B, N, C = points.transpose(1, 2).shape
return points.unsqueeze(-1).expand(-1, -1, -1, self.nsample)
2.2 VoxelNet
VoxelNet voxelizes the point cloud into 3D grids, applies Voxel Feature Encoding (VFE) layers, then passes the result through a 3D convolutional backbone.
class VoxelFeatureEncoder(nn.Module):
"""VoxelNet VFE layer"""
def __init__(self, in_channels=4, out_channels=128):
super().__init__()
self.fc = nn.Sequential(
nn.Linear(in_channels, 64),
nn.BatchNorm1d(64),
nn.ReLU(),
nn.Linear(64, out_channels),
nn.BatchNorm1d(out_channels),
nn.ReLU(),
)
def forward(self, voxel_features, num_points):
# voxel_features: (N_voxels, max_points, C)
B, M, C = voxel_features.shape
x = self.fc(voxel_features.view(-1, C))
x = x.view(B, M, -1)
mask = torch.arange(M, device=x.device).unsqueeze(0) < num_points.unsqueeze(1)
x = x * mask.unsqueeze(-1).float()
return x.max(dim=1)[0]
3. BEVFormer: Camera-Based BEV Perception
BEVFormer transforms multi-camera images into Bird's Eye View (BEV) space for 3D object detection. Deformable Attention projects 3D reference points from BEV space onto image planes to aggregate features.
import torch
import torch.nn as nn
class BEVFormerLayer(nn.Module):
"""BEVFormer Spatial Cross-Attention layer"""
def __init__(self, embed_dim=256, num_heads=8, num_cameras=6):
super().__init__()
self.num_cameras = num_cameras
self.embed_dim = embed_dim
self.attn = nn.MultiheadAttention(embed_dim, num_heads, batch_first=True)
self.norm1 = nn.LayerNorm(embed_dim)
self.norm2 = nn.LayerNorm(embed_dim)
self.ffn = nn.Sequential(
nn.Linear(embed_dim, embed_dim * 4),
nn.GELU(),
nn.Linear(embed_dim * 4, embed_dim),
)
def forward(self, bev_query, camera_features, lidar2img):
"""
bev_query: (B, H*W, C) - BEV grid queries
camera_features: (B, N_cam, H_img, W_img, C) - camera features
lidar2img: (B, N_cam, 4, 4) - LiDAR-to-image projection matrices
"""
B, Q, C = bev_query.shape
ref_3d = self.get_reference_points(B, device=bev_query.device)
projected = self.project_to_image(ref_3d, lidar2img)
sampled = self.sample_camera_features(camera_features, projected)
attn_out, _ = self.attn(bev_query, sampled, sampled)
bev_query = self.norm1(bev_query + attn_out)
bev_query = self.norm2(bev_query + self.ffn(bev_query))
return bev_query
def get_reference_points(self, B, device, H=200, W=200, Z=8):
xs = torch.linspace(0, 1, W, device=device)
ys = torch.linspace(0, 1, H, device=device)
zs = torch.linspace(0, 1, Z, device=device)
grid = torch.stack(torch.meshgrid(xs, ys, zs), dim=-1)
return grid.view(1, -1, 3).expand(B, -1, -1)
def project_to_image(self, pts_3d, lidar2img):
return pts_3d[..., :2]
def sample_camera_features(self, features, coords):
B, Q, C = features.shape[0], coords.shape[1], features.shape[-1]
return torch.zeros(B, Q, C, device=features.device)
4. 3D Object Detection and Multi-Object Tracking
4.1 CenterPoint
CenterPoint detects object centers as heatmap peaks in BEV space and regresses size, orientation, and velocity.
class CenterPointHead(nn.Module):
"""CenterPoint detection head"""
def __init__(self, in_channels=256, num_classes=10):
super().__init__()
self.heatmap = nn.Conv2d(in_channels, num_classes, 1)
self.offset = nn.Conv2d(in_channels, 2, 1) # x, y offset
self.height = nn.Conv2d(in_channels, 1, 1) # z
self.size = nn.Conv2d(in_channels, 3, 1) # w, l, h
self.rotation = nn.Conv2d(in_channels, 2, 1) # sin, cos
self.velocity = nn.Conv2d(in_channels, 2, 1) # vx, vy
def forward(self, x):
return {
'heatmap': self.heatmap(x).sigmoid(),
'offset': self.offset(x),
'height': self.height(x),
'size': self.size(x),
'rotation': self.rotation(x),
'velocity': self.velocity(x),
}
4.2 Kalman Filter for Multi-Object Tracking
import numpy as np
class KalmanTracker:
"""Kalman filter for 3D object tracking"""
def __init__(self, initial_state):
# State vector: [x, y, z, vx, vy, vz, w, l, h, theta]
self.dt = 0.1
n = 10
self.x = np.array(initial_state, dtype=float).reshape(-1, 1)
self.P = np.eye(n) * 10.0
# State transition (constant velocity model)
self.F = np.eye(n)
for i in range(3):
self.F[i, i + 3] = self.dt
# Observation matrix (position + size + heading)
self.H = np.zeros((7, n))
for i in range(7):
self.H[i, i] = 1.0
self.Q = np.eye(n) * 0.1 # Process noise
self.R = np.eye(7) * 1.0 # Measurement noise
def predict(self):
"""Prediction step"""
self.x = self.F @ self.x
self.P = self.F @ self.P @ self.F.T + self.Q
return self.x[:7].flatten()
def update(self, z):
"""Update step"""
z = np.array(z).reshape(-1, 1)
y = z - self.H @ self.x # Innovation
S = self.H @ self.P @ self.H.T + self.R # Innovation covariance
K = self.P @ self.H.T @ np.linalg.inv(S) # Kalman gain
self.x = self.x + K @ y
self.P = (np.eye(len(self.x)) - K @ self.H) @ self.P
return self.x[:7].flatten()
class MOT3D:
"""Hungarian algorithm-based 3D multi-object tracking"""
def __init__(self, max_age=3, min_hits=3, iou_threshold=0.3):
self.trackers = []
self.max_age = max_age
self.min_hits = min_hits
self.iou_threshold = iou_threshold
self.frame_count = 0
self.next_id = 0
def update(self, detections):
self.frame_count += 1
predictions = [t.predict() for t in self.trackers]
matched, unmatched_dets, unmatched_trks = self.associate(
detections, predictions
)
for d, t in matched:
self.trackers[t].update(detections[d])
for i in unmatched_dets:
trk = KalmanTracker(detections[i])
trk.id = self.next_id
self.next_id += 1
self.trackers.append(trk)
self.trackers = [
t for t in self.trackers
if t.time_since_update <= self.max_age
]
return self.trackers
def associate(self, detections, predictions):
if not predictions:
return [], list(range(len(detections))), []
return [], list(range(len(detections))), []
5. HD Maps and SLAM
5.1 NDT Matching
Normal Distribution Transform (NDT) represents point clouds as probability distributions for fast scan matching.
class NDTMatching:
"""NDT scan matching for localization"""
def __init__(self, resolution=1.0):
self.resolution = resolution
self.voxel_map = {}
def build_map(self, points):
"""Convert point cloud to NDT voxel map"""
for pt in points:
key = tuple((pt / self.resolution).astype(int))
if key not in self.voxel_map:
self.voxel_map[key] = []
self.voxel_map[key].append(pt)
for key in self.voxel_map:
pts = np.array(self.voxel_map[key])
mean = pts.mean(axis=0)
cov = np.cov(pts.T) if len(pts) > 3 else np.eye(3)
self.voxel_map[key] = {'mean': mean, 'cov': cov}
def score(self, points, transform):
"""Compute NDT score for a given transform"""
score = 0.0
R = transform[:3, :3]
t = transform[:3, 3]
for pt in points:
pt_t = R @ pt + t
key = tuple((pt_t / self.resolution).astype(int))
if key in self.voxel_map:
v = self.voxel_map[key]
diff = pt_t - v['mean']
score += np.exp(-0.5 * diff @ np.linalg.inv(v['cov']) @ diff)
return score
5.2 OpenDRIVE Parsing
import xml.etree.ElementTree as ET
def parse_opendrive(xodr_path):
"""Parse OpenDRIVE HD map format"""
tree = ET.parse(xodr_path)
root = tree.getroot()
roads = []
for road in root.findall('road'):
road_data = {
'id': road.get('id'),
'length': float(road.get('length')),
'lanes': [],
}
for lane_section in road.findall('.//laneSection'):
for lane in lane_section.findall('.//lane'):
lane_data = {
'id': int(lane.get('id')),
'type': lane.get('type'),
'speed': None,
}
speed = lane.find('.//speed')
if speed is not None:
lane_data['speed'] = float(speed.get('max', 0))
road_data['lanes'].append(lane_data)
roads.append(road_data)
return roads
6. Behavior Prediction: Transformer-Based Trajectory Forecasting
class TrajectoryPredictor(nn.Module):
"""Transformer-based multi-agent trajectory prediction"""
def __init__(self, input_dim=6, embed_dim=128, num_heads=4,
num_layers=3, pred_horizon=30, num_modes=6):
super().__init__()
self.input_proj = nn.Linear(input_dim, embed_dim)
encoder_layer = nn.TransformerEncoderLayer(
d_model=embed_dim, nhead=num_heads, batch_first=True
)
self.encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
self.mode_query = nn.Embedding(num_modes, embed_dim)
decoder_layer = nn.TransformerDecoderLayer(
d_model=embed_dim, nhead=num_heads, batch_first=True
)
self.decoder = nn.TransformerDecoder(decoder_layer, num_layers=num_layers)
self.traj_head = nn.Linear(embed_dim, pred_horizon * 2)
self.prob_head = nn.Linear(embed_dim, 1)
self.num_modes = num_modes
def forward(self, agent_history, map_features=None):
"""
agent_history: (B, N_agents, T_hist, 6) - x, y, vx, vy, ax, ay
Returns: trajectories (B, N_agents, K, T_pred, 2), probs (B, N_agents, K)
"""
B, N, T, C = agent_history.shape
x = self.input_proj(agent_history.view(B * N, T, C))
memory = self.encoder(x)
queries = self.mode_query.weight.unsqueeze(0).expand(B * N, -1, -1)
decoded = self.decoder(queries, memory)
trajs = self.traj_head(decoded)
probs = self.prob_head(decoded).squeeze(-1).softmax(-1)
pred_horizon = trajs.shape[-1] // 2
trajs = trajs.view(B, N, self.num_modes, pred_horizon, 2)
probs = probs.view(B, N, self.num_modes)
return trajs, probs
7. End-to-End Autonomous Driving
7.1 CARLA Simulator Integration
import carla
import numpy as np
def setup_carla_sensors(world, vehicle):
"""Configure CARLA sensors: camera + LiDAR"""
bp_lib = world.get_blueprint_library()
# RGB camera
cam_bp = bp_lib.find('sensor.camera.rgb')
cam_bp.set_attribute('image_size_x', '1280')
cam_bp.set_attribute('image_size_y', '720')
cam_bp.set_attribute('fov', '90')
cam_transform = carla.Transform(carla.Location(x=1.5, z=2.4))
camera = world.spawn_actor(cam_bp, cam_transform, attach_to=vehicle)
# LiDAR
lidar_bp = bp_lib.find('sensor.lidar.ray_cast')
lidar_bp.set_attribute('channels', '64')
lidar_bp.set_attribute('range', '100')
lidar_bp.set_attribute('points_per_second', '1200000')
lidar_bp.set_attribute('rotation_frequency', '20')
lidar_transform = carla.Transform(carla.Location(x=0.0, z=2.0))
lidar = world.spawn_actor(lidar_bp, lidar_transform, attach_to=vehicle)
return camera, lidar
def domain_randomization(world):
"""Domain randomization to reduce sim-to-real gap"""
import random
weather = carla.WeatherParameters(
cloudiness=random.uniform(0, 80),
precipitation=random.uniform(0, 30),
sun_altitude_angle=random.uniform(15, 90),
fog_density=random.uniform(0, 20),
wetness=random.uniform(0, 50),
)
world.set_weather(weather)
7.2 Imitation Learning (Behavior Cloning)
class BehaviorCloningAgent(nn.Module):
    """Behavior cloning autonomous driving agent"""
    def __init__(self, img_channels=3, cmd_dim=4, action_dim=2):
        super().__init__()
        import torchvision.models as models
        backbone = models.resnet34(weights=None)  # train from scratch
        self.visual_encoder = nn.Sequential(*list(backbone.children())[:-2])
        self.pool = nn.AdaptiveAvgPool2d(1)
        self.cmd_embed = nn.Embedding(cmd_dim, 64)  # commands: straight/left/right/u-turn
        self.policy = nn.Sequential(
            nn.Linear(512 + 64, 256),
            nn.ReLU(),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Linear(128, action_dim),  # steer, throttle
            nn.Tanh(),
        )

    def forward(self, image, command):
        feat = self.pool(self.visual_encoder(image)).flatten(1)
        cmd_feat = self.cmd_embed(command)
        x = torch.cat([feat, cmd_feat], dim=-1)
        return self.policy(x)
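Training the agent then reduces to supervised regression on expert actions. A minimal training-step sketch; the batch keys (`image`, `command`, `actions`) and the plain MSE loss are illustrative assumptions, and weighting steer vs. throttle terms is a common refinement.

```python
import torch

def bc_training_step(agent, optimizer, batch):
    """One behavior-cloning step: regress predicted actions onto expert actions."""
    pred = agent(batch['image'], batch['command'])
    loss = torch.nn.functional.mse_loss(pred, batch['actions'])
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    return loss.item()
```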
def dagger_data_aggregation(expert_data, agent_rollouts):
    """DAgger: mitigate distributional shift via iterative data collection"""
    # Aggregate the expert demonstrations with expert labels queried
    # at the states actually visited by the agent
    combined = {
        'obs': torch.cat([expert_data['obs'], agent_rollouts['obs']]),
        'actions': torch.cat([expert_data['actions'], agent_rollouts['expert_actions']]),
    }
    return combined
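Putting the pieces together, the DAgger outer loop alternates policy rollouts, expert relabeling, aggregation, and retraining. A sketch in which `env_rollout`, `expert`, and `train_policy` are caller-supplied placeholders, not part of any library:

```python
import torch

def dagger_loop(policy, expert, env_rollout, train_policy, expert_data, n_iters=3):
    """DAgger outer loop sketch: roll out the current policy, relabel the
    visited states with expert actions, aggregate, and retrain."""
    data = dict(expert_data)
    for _ in range(n_iters):
        obs = env_rollout(policy)  # states the agent actually visits
        data = {
            'obs': torch.cat([data['obs'], obs]),
            'actions': torch.cat([data['actions'], expert(obs)]),
        }
        policy = train_policy(policy, data)  # supervised update on the aggregate
    return policy
```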
8. nuPlan Scenario Evaluation
from nuplan.planning.simulation.planner.abstract_planner import AbstractPlanner

class ReactiveIDMPlanner(AbstractPlanner):
    """nuPlan-compatible IDM planner"""
    def __init__(self, target_velocity=15.0, min_gap=2.0):
        self.target_velocity = target_velocity
        self.min_gap = min_gap

    def initialize(self, initialization):
        self.initialization = initialization

    def name(self):
        return 'ReactiveIDMPlanner'

    def observation_type(self):
        from nuplan.planning.simulation.observation.observation_type import DetectionsTracks
        return DetectionsTracks

    def compute_planner_trajectory(self, current_input):
        ego_state = current_input.history.ego_states[-1]
        observations = current_input.observations
        accel = self.idm_accel(ego_state, observations)
        return self.build_trajectory(ego_state, accel)

    def idm_accel(self, ego, obs):
        # Intelligent Driver Model acceleration. For brevity the gap term uses
        # a fixed placeholder; a full planner would compute the actual distance
        # to the lead agent from obs.
        v = ego.dynamic_car_state.speed
        delta = 4.0                      # acceleration exponent
        a_max = 2.0                      # maximum acceleration (m/s^2)
        s_star = self.min_gap + v * 1.5  # desired gap: minimum gap + time headway
        gap = max(self.min_gap, 1.0)     # placeholder for the measured lead gap
        return a_max * (1 - (v / self.target_velocity) ** delta
                        - (s_star / gap) ** 2)

    def build_trajectory(self, ego_state, accel):
        # Stub: a real planner would roll the ego state forward under accel
        # and return an interpolated trajectory of future states
        return []
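The planner above simplifies IDM by dropping the approach-rate term. For reference, the complete formula is a = a_max · (1 − (v/v0)^δ − (s*/s)²) with s* = s0 + max(0, vT + vΔv / 2√(a_max·b)). A standalone sketch with illustrative default parameters (not tied to nuPlan):

```python
import math

def idm_acceleration(v, v_lead, gap, v0=15.0, s0=2.0, T=1.5,
                     a_max=2.0, b_comfort=3.0, delta=4.0):
    """Full Intelligent Driver Model, including the approach-rate term.
    v: ego speed, v_lead: lead-vehicle speed, gap: bumper-to-bumper distance (m)."""
    dv = v - v_lead  # closing speed toward the lead vehicle
    s_star = s0 + max(0.0, v * T + v * dv / (2 * math.sqrt(a_max * b_comfort)))
    return a_max * (1 - (v / v0) ** delta - (s_star / max(gap, 1e-3)) ** 2)
```

On a free road (huge gap, v = 0) the output approaches a_max; at the target speed it approaches zero; closing fast on a near lead vehicle it goes strongly negative.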
9. Functional Safety: ISO 26262 and SOTIF
Autonomous driving systems must comply with both ISO 26262 (functional safety) and SOTIF (ISO 21448, safety of the intended functionality).
- ASIL Classification: Core autonomous driving functions require ASIL-D, including brake and steering control.
- Hardware Redundancy: Critical sensors and compute modules are duplicated so the system transitions to a safe state on single-point failures.
- Runtime Monitoring: Uncertainty estimates from perception models are measured in real time to detect performance degradation.
- Edge Case Handling: The Operational Design Domain (ODD) boundary is defined for scenarios like backlit roads, dense fog, and wrong-way vehicles, with safe-stop procedures for ODD exits.
class SafetyMonitor:
    """Runtime safety monitor"""
    def __init__(self, uncertainty_threshold=0.7, min_detection_confidence=0.5):
        self.uncertainty_threshold = uncertainty_threshold
        self.min_conf = min_detection_confidence
        self.fallback_triggered = False

    def check(self, perception_output):
        uncertainty = perception_output.get('uncertainty', 0.0)
        confidence = perception_output.get('confidence', 1.0)
        if uncertainty > self.uncertainty_threshold:
            self.trigger_fallback('high_uncertainty')
            return False
        if confidence < self.min_conf:
            self.trigger_fallback('low_confidence')
            return False
        return True

    def trigger_fallback(self, reason):
        self.fallback_triggered = True
        # MRC (Minimal Risk Condition): initiate a safe stop
        print(f'[Safety] Fallback triggered: {reason}')
Quiz
Q1. Why is Bird's Eye View (BEV) representation preferred in autonomous driving perception?
Answer: BEV removes perspective distortion, representing 3D space on a 2D plane that preserves true positions and scales.
Explanation: In perspective camera images, object size varies with distance and the geometric relationships of road structures are difficult to interpret. BEV places all objects at approximately their true size and position, making the representation directly usable for path planning. It also simplifies sensor fusion — LiDAR, radar, and camera outputs can all be projected into a single unified space — and creates a clean interface for downstream prediction and planning modules.
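The projection this answer describes can be sketched as a simple occupancy rasterization of LiDAR points onto a BEV grid; the ranges and cell resolution below are illustrative, not tied to any dataset.

```python
import numpy as np

def points_to_bev(points, x_range=(0.0, 50.0), y_range=(-25.0, 25.0), res=0.5):
    """Rasterize LiDAR points of shape (N, 3) into a BEV occupancy grid."""
    H = int((x_range[1] - x_range[0]) / res)  # cells along the forward (x) axis
    W = int((y_range[1] - y_range[0]) / res)  # cells along the lateral (y) axis
    grid = np.zeros((H, W), dtype=np.float32)
    xs = ((points[:, 0] - x_range[0]) / res).astype(int)
    ys = ((points[:, 1] - y_range[0]) / res).astype(int)
    valid = (xs >= 0) & (xs < H) & (ys >= 0) & (ys < W)  # drop out-of-range points
    grid[xs[valid], ys[valid]] = 1.0
    return grid
```

Because every point lands at its true metric position, the resulting grid can be consumed directly by planning, and LiDAR, radar, and camera features can be fused in the same frame.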
Q2. What are the complementary characteristics of LiDAR and cameras in sensor fusion?
Answer: LiDAR provides accurate 3D distance measurements; cameras provide rich color and texture for semantic understanding.
Explanation: LiDAR produces precise point clouds for distance and shape measurement, but lacks the visual appearance information needed for classification. Cameras capture high-resolution images that enable recognition of license plates, traffic light colors, and road signs, but have inherent limitations in depth estimation. Fusing both allows cameras to provide semantic labels while LiDAR anchors the 3D position. LiDAR compensates when cameras degrade in poor weather, while cameras compensate when LiDAR returns are saturated by retro-reflective surfaces.
Q3. How does the Kalman filter cycle through predict and update steps in multi-object tracking (MOT)?
Answer: Each frame, the state transition model predicts future positions, then new detections correct the estimate via measurement update.
Explanation: In the prediction step, the previous state (position, velocity) and a state transition matrix (constant velocity model) are used to estimate the current position and propagate the error covariance. In the update step, the difference between the new detection (measurement) and the prediction (the innovation) is weighted by the Kalman gain to correct the state. The Kalman gain is determined by the ratio of prediction uncertainty to measurement uncertainty — the more accurate the measurement, the more the filter trusts it over the prediction. In frames where detection fails, only the prediction step runs, maintaining track continuity.
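The predict/update cycle described above fits in a few lines for a 1D constant-velocity model; all noise parameters below are illustrative.

```python
import numpy as np

def kf_predict(x, P, F, Q):
    """Predict: propagate the state and error covariance through the motion model."""
    return F @ x, F @ P @ F.T + Q

def kf_update(x, P, z, H, R):
    """Update: correct the prediction with a measurement via the Kalman gain."""
    y = z - H @ x                       # innovation
    S = H @ P @ H.T + R                 # innovation covariance
    K = P @ H.T @ np.linalg.inv(S)      # Kalman gain
    return x + K @ y, (np.eye(len(x)) - K @ H) @ P

# Constant-velocity example: state = [position, velocity], dt = 1
F = np.array([[1.0, 1.0], [0.0, 1.0]])
H = np.array([[1.0, 0.0]])              # only position is observed
Q = np.eye(2) * 0.01                    # process noise
R = np.array([[0.5]])                   # measurement noise
x, P = np.array([0.0, 1.0]), np.eye(2)
x, P = kf_predict(x, P, F, Q)           # predicted position = 1.0
x, P = kf_update(x, P, np.array([1.2]), H, R)  # estimate nudged toward 1.2
```

When a detection is missed, skipping `kf_update` and running only `kf_predict` is exactly how a tracker keeps the track alive through occlusions.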
Q4. How does domain randomization in CARLA reduce the sim-to-real gap?
Answer: Randomizing simulation parameters during training forces the model to generalize across diverse visual conditions rather than overfitting to a fixed simulation appearance.
Explanation: Real roads feature enormous variation in weather, lighting, vehicle colors, and road surface reflectance. By randomizing parameters such as weather (rain, fog, sun altitude), textures, traffic density, and lighting intensity in CARLA, the trained model does not overfit to the visual biases of any specific simulation configuration. The wide range of conditions encountered in the real world is thus already covered by the training distribution, reducing performance degradation at deployment. Randomizing camera intrinsics and sensor noise models provides additional robustness.
Q5. What is the distributional shift problem in imitation learning for end-to-end driving, and how is it addressed?
Answer: Errors compound as the model visits states unseen during training. DAgger and data augmentation are standard mitigations.
Explanation: Behavior cloning learns from (state, action) pairs collected along expert trajectories. During inference, small errors accumulate, pushing the model into states outside the training distribution where it has never learned the correct action — causing a snowball effect of growing errors. DAgger (Dataset Aggregation) iteratively collects expert labels at states actually visited by the agent, expanding the training distribution to match deployment. Additional strategies include generating recovery scenarios in simulation (e.g., lane departure and recovery), adding noise to observations, and using data augmentation to expose the model to a broader range of inputs.
References
- BEVFormer: Learning Bird's Eye View Representation (Li et al., 2022)
- CenterPoint: Center-based 3D Object Detection (Yin et al., 2021)
- PointNet++: Deep Hierarchical Feature Learning (Qi et al., 2017)
- UniAD: Planning-oriented Autonomous Driving (Hu et al., 2023)
- nuPlan: A closed-loop ML-based planning benchmark (Caesar et al., 2021)
- ISO 26262: Road Vehicles – Functional Safety
- CARLA: An Open Urban Driving Simulator (Dosovitskiy et al., 2017)