Skip to content
Published on

[심층 강화학습] 19. 심층 강화학습의 실전 응용 사례

Authors

개요

심층 강화학습은 Atari 게임과 바둑을 넘어 다양한 실전 영역에서 활용되고 있다. 이 글에서는 로봇 제어, 자율주행, 자원 관리, 추천 시스템, 자연어 처리, 게임 AI 등에서의 응용 사례와 실전 적용 시 고려사항을 정리한다.


로봇 제어 (Robotics)

시뮬레이션에서 현실로 (Sim-to-Real)

로봇 강화학습의 가장 큰 도전은 시뮬레이션과 현실의 간극(sim-to-real gap)이다. 시뮬레이터에서 학습한 정책이 실제 로봇에서는 작동하지 않을 수 있다.

import torch
import torch.nn as nn
import numpy as np

class DomainRandomization:
    """도메인 랜덤화: 시뮬레이션 파라미터를 무작위로 변경하여
    현실 환경에 대한 일반화 능력 향상"""

    def __init__(self, base_env):
        self.base_env = base_env

    def randomize(self):
        """물리 파라미터를 랜덤화"""
        params = {
            'friction': np.random.uniform(0.5, 1.5),
            'mass_scale': np.random.uniform(0.8, 1.2),
            'gravity': np.random.uniform(9.5, 10.1),
            'actuator_noise': np.random.uniform(0.0, 0.05),
            'sensor_noise': np.random.uniform(0.0, 0.02),
        }
        self.base_env.set_physics_params(params)
        return params

    def step_with_randomization(self, action):
        """노이즈가 추가된 환경 스텝"""
        # 행동에 노이즈 추가
        noisy_action = action + np.random.normal(
            0, self.current_params['actuator_noise'],
            size=action.shape
        )
        obs, reward, done, info = self.base_env.step(noisy_action)

        # 관측에 노이즈 추가
        noisy_obs = obs + np.random.normal(
            0, self.current_params['sensor_noise'],
            size=obs.shape
        )
        return noisy_obs, reward, done, info

로봇 매니퓰레이션 학습

class RobotGraspingPolicy(nn.Module):
    """로봇 파지(grasping) 정책"""

    def __init__(self, image_size=84, proprioception_size=7,
                 action_size=4):
        super().__init__()
        # 카메라 이미지 처리
        self.vision = nn.Sequential(
            nn.Conv2d(3, 32, 8, stride=4),
            nn.ReLU(),
            nn.Conv2d(32, 64, 4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, 3, stride=1),
            nn.ReLU(),
            nn.Flatten(),
        )

        # 비전 특징 크기 계산
        with torch.no_grad():
            dummy = torch.zeros(1, 3, image_size, image_size)
            vision_size = self.vision(dummy).shape[1]

        # 고유 감각(관절 각도 등) + 비전 결합
        self.policy = nn.Sequential(
            nn.Linear(vision_size + proprioception_size, 256),
            nn.ReLU(),
            nn.Linear(256, 128),
            nn.ReLU(),
        )

        self.mu = nn.Linear(128, action_size)
        self.log_std = nn.Parameter(torch.zeros(action_size))

    def forward(self, image, proprioception):
        vision_features = self.vision(image)
        combined = torch.cat([vision_features, proprioception], dim=-1)
        features = self.policy(combined)
        mu = self.mu(features)
        std = self.log_std.exp()
        return mu, std

핵심 도전과제

  • 안전성: 실제 로봇은 위험한 행동을 시도할 수 없다
  • 샘플 효율성: 실제 환경에서의 데이터 수집이 느리고 비싸다
  • 보상 설계: 복잡한 조작 작업의 보상을 정의하기 어렵다

자율주행 (Autonomous Driving)

RL 기반 자율주행 아키텍처

class DrivingPolicy(nn.Module):
    """자율주행 정책 네트워크"""

    def __init__(self):
        super().__init__()

        # 센서 퓨전: 카메라 + 라이다 + 레이더
        self.camera_encoder = nn.Sequential(
            nn.Conv2d(3, 32, 5, stride=2),
            nn.ReLU(),
            nn.Conv2d(32, 64, 3, stride=2),
            nn.ReLU(),
            nn.Flatten(),
        )

        self.lidar_encoder = nn.Sequential(
            nn.Linear(360, 128),  # 360도 라이다 포인트
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(),
        )

        # 주행 상태 (속도, 가속도, 조향 각도 등)
        self.state_encoder = nn.Sequential(
            nn.Linear(10, 32),
            nn.ReLU(),
        )

        # 통합 후 행동 결정
        # 특징 크기는 실제 입력에 따라 조정
        self.decision = nn.Sequential(
            nn.Linear(256 + 64 + 32, 256),
            nn.ReLU(),
            nn.Linear(256, 128),
            nn.ReLU(),
        )

        # 연속 행동: 조향, 가속, 제동
        self.steering = nn.Linear(128, 1)
        self.throttle = nn.Linear(128, 1)
        self.brake = nn.Linear(128, 1)

    def forward(self, camera, lidar, state):
        cam_feat = self.camera_encoder(camera)
        lid_feat = self.lidar_encoder(lidar)
        state_feat = self.state_encoder(state)

        combined = torch.cat([cam_feat, lid_feat, state_feat], dim=-1)
        features = self.decision(combined)

        steering = torch.tanh(self.steering(features))
        throttle = torch.sigmoid(self.throttle(features))
        brake = torch.sigmoid(self.brake(features))

        return steering, throttle, brake

보상 설계

def driving_reward(state, action, next_state):
    """자율주행 보상 함수"""
    reward = 0.0

    # 진행 보상: 목적지 방향으로 이동
    progress = next_state['distance_to_goal'] - state['distance_to_goal']
    reward += -progress * 10.0  # 가까워지면 양수

    # 차선 유지 보상
    lane_deviation = abs(next_state['lane_offset'])
    reward -= lane_deviation * 2.0

    # 속도 보상: 적정 속도 유지
    speed = next_state['speed']
    target_speed = next_state['speed_limit']
    speed_diff = abs(speed - target_speed) / target_speed
    reward -= speed_diff * 1.0

    # 안전 페널티
    if next_state['collision']:
        reward -= 100.0
    if next_state['traffic_violation']:
        reward -= 50.0

    # 편안함: 급가속/급감속 페널티
    jerk = abs(action['acceleration_change'])
    reward -= jerk * 0.5

    return reward

자원 관리 (Resource Management)

클라우드 자원 스케줄링

class ResourceScheduler(nn.Module):
    """클라우드 자원 스케줄링 RL 에이전트"""

    def __init__(self, num_servers, num_job_types, num_actions):
        super().__init__()

        # 상태: 서버 부하 + 대기열 + 현재 시간 특성
        state_size = num_servers * 3 + num_job_types * 2 + 4

        self.net = nn.Sequential(
            nn.Linear(state_size, 256),
            nn.ReLU(),
            nn.Linear(256, 128),
            nn.ReLU(),
        )
        self.policy = nn.Linear(128, num_actions)
        self.value = nn.Linear(128, 1)

    def forward(self, state):
        features = self.net(state)
        return self.policy(features), self.value(features)

def resource_reward(state, action, next_state, config):
    """자원 관리 보상"""
    reward = 0.0

    # 처리량 최대화
    throughput = next_state['completed_jobs']
    reward += throughput * config['throughput_weight']

    # 지연 시간 최소화
    avg_latency = next_state['avg_latency']
    reward -= avg_latency * config['latency_weight']

    # 자원 비용 최소화
    cost = next_state['resource_cost']
    reward -= cost * config['cost_weight']

    # SLA 위반 페널티
    sla_violations = next_state['sla_violations']
    reward -= sla_violations * config['sla_penalty']

    return reward

네트워크 라우팅 최적화

class NetworkRouter(nn.Module):
    """네트워크 트래픽 라우팅 에이전트"""

    def __init__(self, num_nodes, num_links):
        super().__init__()
        state_size = num_nodes + num_links * 2  # 노드 수요 + 링크 용량/부하

        self.encoder = nn.Sequential(
            nn.Linear(state_size, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU(),
        )

        # 각 링크의 트래픽 비율 결정
        self.router = nn.Sequential(
            nn.Linear(128, num_links),
            nn.Softmax(dim=-1),  # 트래픽 분배 비율
        )

    def forward(self, network_state):
        features = self.encoder(network_state)
        routing_weights = self.router(features)
        return routing_weights

추천 시스템 (Recommendation Systems)

순차적 추천에 RL 적용

추천 시스템을 강화학습으로 모델링하면 장기적인 사용자 만족도를 최적화할 수 있다.

class RecommendationAgent(nn.Module):
    """RL 기반 추천 에이전트"""

    def __init__(self, num_items, embed_dim=64, hidden_size=128):
        super().__init__()

        # 아이템 임베딩
        self.item_embedding = nn.Embedding(num_items, embed_dim)

        # 사용자 히스토리 인코딩 (GRU)
        self.history_encoder = nn.GRU(
            embed_dim, hidden_size, batch_first=True
        )

        # 사용자 특성 인코딩
        self.user_encoder = nn.Linear(20, 32)  # 사용자 프로파일

        # Q-네트워크: 각 아이템의 가치 예측
        self.q_network = nn.Sequential(
            nn.Linear(hidden_size + 32, 256),
            nn.ReLU(),
            nn.Linear(256, num_items),
        )

    def forward(self, user_history, user_features):
        # 히스토리 인코딩
        history_embedded = self.item_embedding(user_history)
        _, hidden = self.history_encoder(history_embedded)
        history_feature = hidden.squeeze(0)

        # 사용자 특성 인코딩
        user_feature = self.user_encoder(user_features)

        # Q 값 계산
        combined = torch.cat([history_feature, user_feature], dim=-1)
        q_values = self.q_network(combined)

        return q_values

def recommendation_reward(user_response, item_type):
    """추천 보상: 단기 + 장기"""
    immediate = 0.0

    if user_response == 'click':
        immediate += 1.0
    elif user_response == 'purchase':
        immediate += 5.0
    elif user_response == 'skip':
        immediate -= 0.1

    # 다양성 보너스: 다양한 카테고리 추천 장려
    # (장기적 사용자 만족을 위해)

    return immediate

자연어 처리 (NLP)

RLHF: 인간 피드백 기반 강화학습

현대 대규모 언어모델(LLM)의 핵심 학습 기법이다:

class RLHFTrainer:
    """RLHF 학습 개념적 구현"""

    def __init__(self, policy_model, reward_model, ref_model,
                 kl_coef=0.1):
        self.policy = policy_model      # 학습 대상 LLM
        self.reward_model = reward_model # 인간 선호도 모델
        self.ref_model = ref_model       # 기준 모델 (고정)
        self.kl_coef = kl_coef

        self.optimizer = torch.optim.Adam(
            self.policy.parameters(), lr=1e-5
        )

    def compute_reward(self, prompt, response):
        """보상 = 보상 모델 점수 - KL 페널티"""
        # 보상 모델 점수
        reward_score = self.reward_model.score(prompt, response)

        # KL 발산 페널티 (기준 모델과의 차이)
        policy_logprobs = self.policy.log_prob(prompt, response)
        ref_logprobs = self.ref_model.log_prob(prompt, response)
        kl_penalty = policy_logprobs - ref_logprobs

        return reward_score - self.kl_coef * kl_penalty

    def train_step(self, prompts):
        """PPO 기반 RLHF 학습 스텝"""
        # 1. 현재 정책으로 응답 생성
        responses = self.policy.generate(prompts)

        # 2. 보상 계산
        rewards = [
            self.compute_reward(p, r)
            for p, r in zip(prompts, responses)
        ]

        # 3. PPO 업데이트
        self._ppo_update(prompts, responses, rewards)

텍스트 요약에 RL 적용

class SummarizationRL:
    """강화학습 기반 텍스트 요약"""

    def __init__(self, model, rouge_weight=1.0, length_weight=0.1):
        self.model = model
        self.rouge_weight = rouge_weight
        self.length_weight = length_weight

    def reward_function(self, source, generated_summary, reference):
        """ROUGE 점수 기반 보상"""
        rouge_score = compute_rouge(generated_summary, reference)

        # 길이 페널티: 너무 길거나 짧은 요약 방지
        target_ratio = 0.3  # 원문의 30%
        actual_ratio = len(generated_summary) / max(len(source), 1)
        length_penalty = -abs(actual_ratio - target_ratio)

        reward = (self.rouge_weight * rouge_score
                  + self.length_weight * length_penalty)
        return reward

게임 AI (Beyond Atari)

OpenAI Five (Dota 2)

OpenAI Five는 5명이 한 팀인 Dota 2에서 세계 챔피언 팀을 이겼다. 핵심 도전:

  • 부분 관측: 전장 안개(fog of war)로 적 위치를 모름
  • 장기 계획: 45분 이상의 긴 경기
  • 팀 협력: 5명의 에이전트가 협력해야 함
  • 거대한 행동 공간: 수천 개의 가능한 행동

멀티 에이전트 협력 학습

class MultiAgentPolicy(nn.Module):
    """멀티 에이전트 협력 정책 (파라미터 공유)"""

    def __init__(self, obs_size, act_size, num_agents,
                 comm_size=32):
        super().__init__()
        self.num_agents = num_agents

        # 개별 관측 인코더
        self.obs_encoder = nn.Sequential(
            nn.Linear(obs_size, 128),
            nn.ReLU(),
        )

        # 통신 채널
        self.comm_encoder = nn.Sequential(
            nn.Linear(128, comm_size),
            nn.ReLU(),
        )

        # 수신 메시지 통합
        self.message_integrator = nn.Sequential(
            nn.Linear(comm_size * (num_agents - 1), 64),
            nn.ReLU(),
        )

        # 최종 정책
        self.policy = nn.Sequential(
            nn.Linear(128 + 64, 128),
            nn.ReLU(),
            nn.Linear(128, act_size),
        )

    def forward(self, observations):
        """
        observations: (batch, num_agents, obs_size)
        """
        batch_size = observations.shape[0]

        # 각 에이전트의 관측 인코딩
        obs_flat = observations.view(-1, observations.shape[-1])
        encoded = self.obs_encoder(obs_flat)
        encoded = encoded.view(batch_size, self.num_agents, -1)

        # 통신 메시지 생성
        messages = self.comm_encoder(
            encoded.view(-1, encoded.shape[-1])
        ).view(batch_size, self.num_agents, -1)

        # 각 에이전트별 타 에이전트 메시지 수집 및 정책 계산
        all_policies = []
        for i in range(self.num_agents):
            other_msgs = torch.cat(
                [messages[:, j] for j in range(self.num_agents) if j != i],
                dim=-1
            )
            integrated = self.message_integrator(other_msgs)
            combined = torch.cat([encoded[:, i], integrated], dim=-1)
            policy_logits = self.policy(combined)
            all_policies.append(policy_logits)

        return torch.stack(all_policies, dim=1)

실전 적용 시 고려사항

공통 도전과제

  1. 보상 설계: 잘못된 보상은 의도하지 않은 행동을 유발한다 (reward hacking)
# 나쁜 예: 점수만 최대화하면 비윤리적 전략 학습 가능
reward = game_score

# 좋은 예: 다양한 목표를 균형 있게 반영
reward = (score_weight * game_score
          - safety_weight * violations
          + fairness_weight * equity_metric)
  1. 샘플 효율성: 실제 환경에서의 데이터 수집이 비싸다

  2. 안전성: 학습 중 위험한 행동을 방지해야 한다

  3. 평가: 시뮬레이터 성능이 반드시 실전 성능을 보장하지 않는다

실전 적용 체크리스트

항목확인 내용
문제 정의상태/행동/보상이 명확하게 정의되었는가
시뮬레이터충분히 현실적인 시뮬레이터가 있는가
기준선간단한 규칙 기반 방법과 비교했는가
안전성학습 중 안전 제약이 보장되는가
평가다양한 시나리오에서 테스트했는가
배포실시간 추론 성능이 충분한가

핵심 요약

  • 강화학습은 로봇 제어, 자율주행, 자원 관리, 추천, NLP, 게임 등 폭넓게 활용된다
  • Sim-to-Real, RLHF, 멀티 에이전트 등 도메인별 특화 기법이 중요하다
  • 보상 설계, 안전성, 샘플 효율성이 실전 적용의 핵심 과제이다
  • 실전 적용 시 간단한 기준선부터 시작하여 점진적으로 복잡도를 높이는 것이 좋다

다음 글에서는 이 시리즈의 마지막으로 심층 강화학습 알고리즘의 총정리와 선택 가이드를 다룬다.