Skip to content
Published on

오픈소스 실시간 대화형 음성 챗봇 구축 가이드: Barge-In(응답 중단) 지원 아키텍처와 구현

Authors
  • Name
    Twitter

들어가며

음성 AI 어시스턴트를 써본 사람이라면 한 번쯤 경험했을 것이다. AI가 장황하게 대답하고 있는데, 끝까지 들어야만 다음 말을 할 수 있는 답답함. 사람과 사람의 대화에서는 상대방이 말하는 도중에 끼어들 수 있고(barge-in), 상대방도 자연스럽게 말을 멈추고 내 말을 듣는다. 실시간 음성 챗봇에서 이 barge-in을 구현하는 것은 단순히 "TTS를 멈추면 되는 것 아닌가?" 수준의 문제가 아니다. 마이크 입력과 스피커 출력이 동시에 돌아가는 full-duplex 환경에서 에코를 제거하고, VAD(Voice Activity Detection)로 사용자 발화를 감지하며, 진행 중인 LLM 생성과 TTS 합성을 즉시 취소하고, 새로운 사용자 입력을 처리하는 파이프라인 전체를 제어해야 한다.

이 글에서는 100% 오픈소스로 구성할 수 있는 실시간 음성 챗봇 아키텍처를 설계하고, barge-in을 포함한 핵심 기능을 Python으로 구현하는 방법을 다룬다. 상용 API 없이 로컬 GPU 하나(또는 CPU만)로 동작하는 시스템을 목표로 한다.

오픈소스 스택 비교 및 선택

VAD(Voice Activity Detection) 비교

사용자가 말하고 있는지 침묵인지를 판별하는 VAD는 barge-in의 핵심 트리거다. 실시간 시스템에서는 30ms 이내에 판별을 완료해야 한다.

항목Silero VADwebrtcvadPicovoice Cobra
방식DNN (PyTorch/ONNX)GMM 기반 신호처리DNN (독점 엔진)
정확도 (TPR@5%FPR)87.7%50%98.5%
모델 크기1.8 MB~100 KB~5 MB
처리 시간 (30ms 청크)~1ms0.1ms 미만~0.5ms
언어 지원6000+ 언어언어 무관다국어
라이선스MITBSD상용(제한 무료)
스트리밍OOO

추천: 오픈소스 구성에서는 Silero VAD가 정확도와 라이선스 면에서 최적이다. webrtcvad는 너무 많은 false negative(발화 절반 누락)를 보이며, Picovoice Cobra는 정확도가 높지만 상용 라이선스다.

웨이크워드(Wake Word) 옵션

항상 마이크를 켜두는 대신 특정 호출어로 챗봇을 깨우려면 웨이크워드 감지가 필요하다.

항목openWakeWordPorcupineSnowboy (레거시)
커스텀 워드O (학습 필요)O (콘솔에서 생성)O
내장 VADSilero VAD 통합XX
정확도중상
라이선스Apache 2.0상용(제한 무료)Apache 2.0 (지원 종료)

추천: 오픈소스 완전 구성 시 openWakeWord + Silero VAD 조합. 상용 혼합 가능 시 Porcupine이 정확도 면에서 우세.

STT(Speech-to-Text) 비교

항목faster-whisperwhisper.cppVosk
기반CTranslate2 (Whisper)ggml (Whisper)Kaldi/독자 모델
한국어 WER~12% (large-v3)~12% (large-v3)~25%
GPU 가속CUDA (CTranslate2)Metal/CUDA/VulkanX (CPU only)
실시간 스트리밍△ (VAD 기반 청크)△ (청크 단위)O (네이티브)
메모리~3 GB (large-v3)~3 GB (large-v3)~50 MB (small)
처리 속도 (GPU)~15x RT~10x RT~1x RT (CPU)
라이선스MITMITApache 2.0

추천: GPU 환경에서는 faster-whisper (large-v3 또는 medium)가 정확도·속도 모두 최적. CPU 전용 경량 환경에서는 Vosk가 실시간 스트리밍 네이티브 지원으로 유리.

LLM 로컬 서빙 비교

항목OllamavLLMLocalAI
설치 난이도★☆☆ (원커맨드)★★☆★★☆
OpenAI 호환 APIOOO
동시 사용자 처리약함 (1~2명)강함 (PagedAttention)중간
GPU 메모리 효율중간높음 (KV 캐시 최적화)중간
스트리밍O (SSE)O (SSE)O (SSE)
모델 생태계Ollama Hub (풍부)HuggingFace 직접다수 지원
적합 시나리오개인/프로토타입프로덕션/다중사용자멀티모달 통합

추천: 프로토타입 및 단일 사용자 환경에서는 Ollama가 설치·운영 편의성 최고. 동시 접속 5+ 환경에서는 vLLM이 처리량과 레이턴시 예측성에서 우세. 두 도구 모두 OpenAI 호환 API를 제공하므로 코드 수정 없이 교체 가능하다.

TTS(Text-to-Speech) 비교

항목PiperCoqui XTTS v2StyleTTS 2
아키텍처VITSGPT + VITSDiffusion + Style
한국어O (커뮤니티 모델)O (17개 언어)△ (파인튜닝 필요)
음성 복제XO (6초 샘플)O (파인튜닝)
스트리밍O (청크 단위)O (200ms 미만 지연)X (배치 합성)
GPU 필요X (CPU OK)O (권장)O (필수)
합성 속도~50x RT (CPU)~5x RT (GPU)~3x RT (GPU)
음질 (MOS)3.8~4.14.2~4.54.3~4.5
라이선스MITMPL 2.0MIT
유지보수활발⚠️ Coqui 사업 종료커뮤니티

추천: barge-in 시스템에서는 Piper가 최적이다. CPU만으로 50배 실시간 속도를 달성하여 TTS 지연이 거의 없고, 스트리밍 합성을 지원하며, 즉시 중단이 용이하다. 음질이 더 중요한 경우 XTTS v2를 사용하되, Coqui AI 사업 종료(2025.12) 이후 커뮤니티 포크(coqui-tts)를 활용해야 한다.

실시간 오디오 전송 비교

항목WebSocket (raw)LiveKitaiortc
프로토콜WS over TCPWebRTC (SFU)WebRTC (P2P/SFU)
지연시간50~200ms50ms 미만50ms 미만
에코 제거직접 구현내장 AEC직접 구현
NAT 통과추가 설정내장 TURN/STUNICE 지원
확장성직접 구현SFU 자동 확장제한적
Python SDKwebsocketslivekit-agentsaiortc
구현 복잡도낮음중간높음

추천: 로컬 단독 실행이면 WebSocket으로 충분. 브라우저 클라이언트가 필요하거나 네트워크 품질이 가변적이면 LiveKit이 에코 제거·NAT 통과·SFU 확장을 모두 제공한다. LiveKit의 Agents 프레임워크는 STT/LLM/TTS 파이프라인과 barge-in 턴 감지를 내장하고 있어 프로덕션 환경에서 유리하다.

아키텍처 설계

전체 파이프라인 아키텍처

┌─────────────────────────────────────────────────────────────┐
Client Device│  ┌──────────┐    ┌──────────┐    ┌──────────┐               │
│  │   Mic    │───▶│  Audio   │───▶│ WebSocket│──── network ──┤
│  │  Input   │    │  Capture │    │  Client  │               │
│  └──────────┘    └──────────┘    └──────────┘               │
│  ┌──────────┐    ┌──────────┐    ┌──────────┐               │
│  │ Speaker  │◀───│  Audio   │◀───│ WebSocket│◀── network ──┤
│  │  Output  │    │ Playback │    │  Client  │               │
│  └──────────┘    └──────────┘    └──────────┘               │
└─────────────────────────────────────────────────────────────┘
                           │ ▲
                     audio │ │ audio
                     bytes │ │ bytes
                           ▼ │
┌─────────────────────────────────────────────────────────────┐
Voice Pipeline Server│                                                             │
│  ┌───────────┐   ┌───────────┐   ┌───────────┐             │
│  │ Silero    │──▶│ faster-   │──▶│  Ollama   │             │
│  │ VAD       │   │ whisper   │     (LLM)    │             │
│  │           │    (STT)     │   │           │             │
│  └───────────┘   └───────────┘   └─────┬─────┘             │
│       │                                 │                   │
│       │  barge-in                token   │                   │
│       │  signal                 stream  │                   │
│       ▼                                 ▼                   │
│  ┌───────────┐               ┌───────────┐                  │
│  │  State    │◀─────────────▶│  Piper    │                  │
│  │  Machine  │   cancel/       (TTS)    │                  │
│  │           │   resume      │           │                  │
│  └───────────┘               └───────────┘                  │
│                                    │                        │
│                              audio │ chunks                 │
│                                    ▼                        │
│                             ┌───────────┐                   │
│                             │  Audio    │                   │
│                             │  Output   │                   │
│                             │  Queue    │                   │
│                             └───────────┘                   │
└─────────────────────────────────────────────────────────────┘

핵심 설계 원칙

  1. 비동기 파이프라인: 모든 컴포넌트는 asyncio 기반으로 동작하며, asyncio.Queue로 연결
  2. 즉시 취소 가능: LLM 스트리밍과 TTS 합성은 asyncio.Task로 관리하여 .cancel() 호출로 즉시 중단
  3. Full-Duplex: 마이크 입력과 스피커 출력이 동시에 동작하며, 에코 제거(AEC)로 자기 출력을 필터링
  4. 상태 기반 제어: 상태머신이 전체 파이프라인의 동작 흐름을 결정

상태머신 설계

barge-in을 안정적으로 구현하려면 명확한 상태 전이(state transition)가 필수다. 다음 5개 상태로 설계한다.

                    ┌──────────────────────────────────┐
                    │                                  │
                    ▼                                  │
              ┌──────────┐                             │
              │          │  voice_detected             │
         ┌───▶│   IDLE   │─────────────────┐           │
         │    │          │                 │           │
         │    └──────────┘                 ▼           │
         │                          ┌──────────┐       │
         │                          │LISTENING │       │
         │                          │          │       │
         │                          └────┬─────┘       │
         │                               │             │
         │                    silence_   │             │
         │                    detected   │             │
         │                               ▼             │
         │                          ┌──────────┐       │
         │         timeout/THINKING  │       │
         │         error            │          │       │
         │           ┌──────────────┴────┬─────┘       │
         │           │                   │             │
         │           │        first_     │             │
         │           │        audio_     │             │
         │           │        chunk      │             │
         │           ▼                   ▼             │
         │    ┌──────────┐         ┌──────────┐        │
         │    │  ERROR   │         │ SPEAKING │        │
         │    │          │         │          │───┐    │
         │    └──────────┘         └────┬─────┘   │    │
         │                              │         │    │
         │                   voice_     │   barge_│    │
         │                   end        │   in    │    │
         │                              │         │    │
         │                              ▼         ▼    │
         │                                  ┌──────────┐
         └──────────────────────────────────│INTERRUPTED                                            └──────────┘

상태 전이 규칙

from enum import Enum, auto

class State(Enum):
    IDLE = auto()        # 대기 중 (웨이크워드 또는 음성 감지 대기)
    LISTENING = auto()   # 사용자 발화 녹음 중
    THINKING = auto()    # STT → LLM 처리 중
    SPEAKING = auto()    # TTS 음성 재생 중
    INTERRUPTED = auto() # barge-in 발생, 정리 후 LISTENING 전환

TRANSITIONS = {
    State.IDLE:        {Event.VOICE_DETECTED: State.LISTENING},
    State.LISTENING:   {Event.SILENCE_DETECTED: State.THINKING,
                        Event.TIMEOUT: State.IDLE},
    State.THINKING:    {Event.FIRST_AUDIO_CHUNK: State.SPEAKING,
                        Event.ERROR: State.IDLE,
                        Event.BARGE_IN: State.INTERRUPTED},
    State.SPEAKING:    {Event.PLAYBACK_DONE: State.IDLE,
                        Event.BARGE_IN: State.INTERRUPTED},
    State.INTERRUPTED: {Event.CLEANUP_DONE: State.LISTENING},
}

Barge-In 발생 시 처리 순서

  1. VAD가 SPEAKING/THINKING 중 사용자 음성 감지
  2. 상태를 INTERRUPTED로 전환
  3. 즉시 실행: TTS 오디오 출력 큐 클리어 + 스피커 재생 중단
  4. 즉시 실행: LLM 스트리밍 Task 취소 (task.cancel())
  5. 즉시 실행: TTS 합성 Task 취소
  6. 부분 응답 히스토리 보존 (컨텍스트 유지)
  7. CLEANUP_DONE 이벤트 발행 → LISTENING 전환
  8. 새로운 사용자 발화 녹음 시작

Python 최소 동작 예시

아래는 마이크 입력 → VAD → STT → LLM → TTS → 스피커 출력의 전체 파이프라인을 barge-in 지원과 함께 구현한 최소 동작 코드다.

의존성 설치

pip install silero-vad faster-whisper openai-whisper piper-tts \
            sounddevice numpy httpx asyncio
# Ollama 설치 (macOS/Linux)
curl -fsSL https://ollama.com/install.sh | sh
ollama pull gemma3:4b  # 또는 원하는 모델

핵심 코드

import asyncio
import numpy as np
import sounddevice as sd
import httpx
import io
import wave
from enum import Enum, auto
from collections import deque
from dataclasses import dataclass, field
from typing import Optional

# ──────────────────────────────────────────────
# 1. 상태머신 정의
# ──────────────────────────────────────────────
class State(Enum):
    IDLE = auto()
    LISTENING = auto()
    THINKING = auto()
    SPEAKING = auto()
    INTERRUPTED = auto()

class Event(Enum):
    VOICE_DETECTED = auto()
    SILENCE_DETECTED = auto()
    FIRST_AUDIO_CHUNK = auto()
    PLAYBACK_DONE = auto()
    BARGE_IN = auto()
    CLEANUP_DONE = auto()
    TIMEOUT = auto()
    ERROR = auto()

TRANSITIONS = {
    State.IDLE:        {Event.VOICE_DETECTED: State.LISTENING},
    State.LISTENING:   {Event.SILENCE_DETECTED: State.THINKING,
                        Event.TIMEOUT: State.IDLE},
    State.THINKING:    {Event.FIRST_AUDIO_CHUNK: State.SPEAKING,
                        Event.ERROR: State.IDLE,
                        Event.BARGE_IN: State.INTERRUPTED},
    State.SPEAKING:    {Event.PLAYBACK_DONE: State.IDLE,
                        Event.BARGE_IN: State.INTERRUPTED},
    State.INTERRUPTED: {Event.CLEANUP_DONE: State.LISTENING},
}

@dataclass
class PipelineContext:
    state: State = State.IDLE
    audio_buffer: bytearray = field(default_factory=bytearray)
    conversation: list = field(default_factory=list)
    llm_task: Optional[asyncio.Task] = None
    tts_task: Optional[asyncio.Task] = None
    playback_queue: asyncio.Queue = field(default_factory=asyncio.Queue)
    interrupt_event: asyncio.Event = field(default_factory=asyncio.Event)

    def transition(self, event: Event) -> bool:
        allowed = TRANSITIONS.get(self.state, {})
        if event in allowed:
            old = self.state
            self.state = allowed[event]
            print(f"[FSM] {old.name} --{event.name}--> {self.state.name}")
            return True
        print(f"[FSM] {self.state.name}: {event.name} ignored")
        return False

# ──────────────────────────────────────────────
# 2. VAD 모듈 (Silero VAD)
# ──────────────────────────────────────────────
import torch

class VoiceActivityDetector:
    def __init__(self, threshold: float = 0.5):
        self.model, self.utils = torch.hub.load(
            'snakers4/silero-vad', 'silero_vad', onnx=True
        )
        self.threshold = threshold
        self.sample_rate = 16000
        self._silence_frames = 0
        self.silence_limit = 30  # 30 frames × 30ms = 900ms

    def process_chunk(self, audio_chunk: np.ndarray) -> str:
        """30ms 오디오 청크를 받아 'speech'/'silence'/'end' 반환."""
        tensor = torch.from_numpy(audio_chunk).float()
        prob = self.model(tensor, self.sample_rate).item()

        if prob >= self.threshold:
            self._silence_frames = 0
            return "speech"
        else:
            self._silence_frames += 1
            if self._silence_frames >= self.silence_limit:
                self._silence_frames = 0
                return "end"
            return "silence"

    def reset(self):
        self.model.reset_states()
        self._silence_frames = 0

# ──────────────────────────────────────────────
# 3. STT 모듈 (faster-whisper)
# ──────────────────────────────────────────────
from faster_whisper import WhisperModel

class SpeechToText:
    def __init__(self, model_size: str = "medium", device: str = "cuda"):
        self.model = WhisperModel(
            model_size, device=device, compute_type="float16"
        )

    def transcribe(self, audio: np.ndarray) -> str:
        segments, _ = self.model.transcribe(
            audio, language="ko", beam_size=5,
            vad_filter=True
        )
        return " ".join(seg.text for seg in segments).strip()

# ──────────────────────────────────────────────
# 4. LLM 모듈 (Ollama OpenAI 호환 API)
# ──────────────────────────────────────────────
async def stream_llm_response(
    messages: list[dict],
    model: str = "gemma3:4b",
    base_url: str = "http://localhost:11434/v1",
) -> asyncio.AsyncGenerator:
    """Ollama에 스트리밍 요청, 토큰 단위로 yield."""
    async with httpx.AsyncClient(timeout=30.0) as client:
        async with client.stream(
            "POST",
            f"{base_url}/chat/completions",
            json={
                "model": model,
                "messages": messages,
                "stream": True,
                "max_tokens": 512,
            },
        ) as resp:
            async for line in resp.aiter_lines():
                if not line.startswith("data: "):
                    continue
                data = line[6:]
                if data == "[DONE]":
                    break
                import json
                chunk = json.loads(data)
                delta = chunk["choices"][0].get("delta", {})
                if content := delta.get("content"):
                    yield content

# ──────────────────────────────────────────────
# 5. TTS 모듈 (Piper)
# ──────────────────────────────────────────────
import subprocess

class TextToSpeech:
    def __init__(self, model_path: str, config_path: str):
        self.model_path = model_path
        self.config_path = config_path

    async def synthesize(self, text: str) -> bytes:
        """텍스트를 WAV 바이트로 변환."""
        proc = await asyncio.create_subprocess_exec(
            "piper",
            "--model", self.model_path,
            "--config", self.config_path,
            "--output-raw",
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, _ = await proc.communicate(text.encode("utf-8"))
        return stdout

    async def synthesize_streaming(
        self, text_chunks: asyncio.Queue, audio_queue: asyncio.Queue
    ):
        """문장 단위로 TTS 합성 후 오디오 큐에 삽입."""
        buffer = ""
        sentence_delimiters = {".", "!", "?", "。", "!", "?", "\n"}

        while True:
            chunk = await text_chunks.get()
            if chunk is None:  # 종료 시그널
                if buffer.strip():
                    audio = await self.synthesize(buffer)
                    await audio_queue.put(audio)
                await audio_queue.put(None)  # 재생 종료 시그널
                break

            buffer += chunk
            # 문장 완성 시 즉시 합성
            for delim in sentence_delimiters:
                if delim in buffer:
                    parts = buffer.split(delim, 1)
                    sentence = parts[0] + delim
                    buffer = parts[1] if len(parts) > 1 else ""
                    if sentence.strip():
                        audio = await self.synthesize(sentence.strip())
                        await audio_queue.put(audio)
                    break

# ──────────────────────────────────────────────
# 6. 오디오 재생 (barge-in 대응)
# ──────────────────────────────────────────────
async def play_audio(ctx: PipelineContext, sample_rate: int = 22050):
    """오디오 큐에서 청크를 꺼내 재생. interrupt 시 즉시 중단."""
    while True:
        audio_bytes = await ctx.playback_queue.get()
        if audio_bytes is None:
            ctx.transition(Event.PLAYBACK_DONE)
            break

        audio_array = np.frombuffer(audio_bytes, dtype=np.int16)
        chunk_size = sample_rate // 10  # 100ms 단위 재생

        for i in range(0, len(audio_array), chunk_size):
            if ctx.interrupt_event.is_set():
                # barge-in! 즉시 재생 중단
                # 큐 비우기
                while not ctx.playback_queue.empty():
                    try:
                        ctx.playback_queue.get_nowait()
                    except asyncio.QueueEmpty:
                        break
                return

            segment = audio_array[i:i + chunk_size]
            sd.play(segment, samplerate=sample_rate, blocking=True)

# ──────────────────────────────────────────────
# 7. 메인 파이프라인 (Barge-In 지원)
# ──────────────────────────────────────────────
async def run_pipeline():
    ctx = PipelineContext()
    vad = VoiceActivityDetector(threshold=0.5)
    stt = SpeechToText(model_size="medium", device="cuda")
    tts = TextToSpeech(
        model_path="ko_KR-kss-medium.onnx",
        config_path="ko_KR-kss-medium.onnx.json",
    )

    SAMPLE_RATE = 16000
    CHUNK_MS = 30
    CHUNK_SAMPLES = int(SAMPLE_RATE * CHUNK_MS / 1000)

    print("[Pipeline] 시작. 말씀하세요...")

    while True:
        ctx.interrupt_event.clear()

        # ── IDLE → LISTENING ──
        if ctx.state == State.IDLE:
            vad.reset()
            ctx.audio_buffer = bytearray()

            # 마이크에서 30ms 청크 읽기
            audio_chunk = sd.rec(
                CHUNK_SAMPLES, samplerate=SAMPLE_RATE,
                channels=1, dtype="float32"
            )
            sd.wait()
            audio_np = audio_chunk.flatten()

            result = vad.process_chunk(audio_np)
            if result == "speech":
                ctx.transition(Event.VOICE_DETECTED)
                ctx.audio_buffer.extend(
                    (audio_np * 32767).astype(np.int16).tobytes()
                )

        # ── LISTENING ──
        elif ctx.state == State.LISTENING:
            audio_chunk = sd.rec(
                CHUNK_SAMPLES, samplerate=SAMPLE_RATE,
                channels=1, dtype="float32"
            )
            sd.wait()
            audio_np = audio_chunk.flatten()
            ctx.audio_buffer.extend(
                (audio_np * 32767).astype(np.int16).tobytes()
            )

            result = vad.process_chunk(audio_np)
            if result == "end":
                ctx.transition(Event.SILENCE_DETECTED)

        # ── THINKING ──
        elif ctx.state == State.THINKING:
            # STT 변환
            audio_data = np.frombuffer(
                ctx.audio_buffer, dtype=np.int16
            ).astype(np.float32) / 32767.0
            user_text = stt.transcribe(audio_data)
            print(f"[User] {user_text}")

            if not user_text.strip():
                ctx.transition(Event.ERROR)
                continue

            ctx.conversation.append({"role": "user", "content": user_text})

            # LLM 스트리밍 + TTS 동시 실행
            text_queue: asyncio.Queue = asyncio.Queue()
            full_response = []

            async def llm_to_tts():
                try:
                    async for token in stream_llm_response(ctx.conversation):
                        if ctx.interrupt_event.is_set():
                            break
                        full_response.append(token)
                        await text_queue.put(token)
                    await text_queue.put(None)
                except asyncio.CancelledError:
                    await text_queue.put(None)

            async def tts_worker():
                await tts.synthesize_streaming(text_queue, ctx.playback_queue)

            async def vad_monitor():
                """THINKING/SPEAKING 중 barge-in 감지."""
                while ctx.state in (State.THINKING, State.SPEAKING):
                    chunk = sd.rec(
                        CHUNK_SAMPLES, samplerate=SAMPLE_RATE,
                        channels=1, dtype="float32"
                    )
                    sd.wait()
                    r = vad.process_chunk(chunk.flatten())
                    if r == "speech" and ctx.state == State.SPEAKING:
                        print("[Barge-In] 사용자 발화 감지! 응답 중단")
                        ctx.interrupt_event.set()
                        ctx.transition(Event.BARGE_IN)
                        return
                    await asyncio.sleep(0.01)

            # 태스크 실행
            ctx.llm_task = asyncio.create_task(llm_to_tts())
            ctx.tts_task = asyncio.create_task(tts_worker())
            playback_task = asyncio.create_task(play_audio(ctx))
            monitor_task = asyncio.create_task(vad_monitor())

            ctx.transition(Event.FIRST_AUDIO_CHUNK)

            # 완료 또는 인터럽트 대기
            done, pending = await asyncio.wait(
                [ctx.llm_task, ctx.tts_task, playback_task, monitor_task],
                return_when=asyncio.FIRST_COMPLETED,
            )

            # barge-in 발생 시 정리
            if ctx.interrupt_event.is_set():
                for task in pending:
                    task.cancel()
                    try:
                        await task
                    except asyncio.CancelledError:
                        pass

                # 부분 응답 보존
                partial = "".join(full_response)
                if partial:
                    ctx.conversation.append(
                        {"role": "assistant", "content": partial + " [중단됨]"}
                    )
                ctx.transition(Event.CLEANUP_DONE)
                ctx.audio_buffer = bytearray()
                vad.reset()
            else:
                # 정상 완료
                for task in pending:
                    await task
                response_text = "".join(full_response)
                ctx.conversation.append(
                    {"role": "assistant", "content": response_text}
                )
                print(f"[Assistant] {response_text}")

        # ── INTERRUPTED → LISTENING ──
        elif ctx.state == State.INTERRUPTED:
            ctx.transition(Event.CLEANUP_DONE)

        await asyncio.sleep(0.001)

if __name__ == "__main__":
    asyncio.run(run_pipeline())

코드 핵심 포인트

  1. interrupt_event: asyncio.Event로 barge-in 시그널을 전파. 모든 워커가 이 이벤트를 체크
  2. asyncio.wait(return_when=FIRST_COMPLETED): VAD 모니터가 barge-in을 감지하면 나머지 태스크를 즉시 취소
  3. 부분 응답 보존: 중단된 응답도 [중단됨] 태그와 함께 대화 히스토리에 보존하여 컨텍스트 유실 방지
  4. 문장 단위 TTS: LLM이 토큰을 스트리밍하는 동안 문장 구분자(., ?, ! 등)를 만나면 즉시 TTS 합성 시작

오류 처리 및 큐 설계

재시도 전략

import asyncio
from dataclasses import dataclass

@dataclass
class RetryConfig:
    max_retries: int = 3
    base_delay: float = 0.5
    max_delay: float = 5.0

async def with_retry(coro_fn, config: RetryConfig = RetryConfig()):
    """지수 백오프 재시도 래퍼."""
    for attempt in range(config.max_retries):
        try:
            return await coro_fn()
        except Exception as e:
            if attempt == config.max_retries - 1:
                raise
            delay = min(
                config.base_delay * (2 ** attempt),
                config.max_delay
            )
            print(f"[Retry] {attempt+1}/{config.max_retries}: {e}, "
                  f"{delay}s 후 재시도")
            await asyncio.sleep(delay)

큐 관리 패턴

class AudioPipelineQueues:
    """파이프라인 큐 관리. 각 단계 간 비동기 큐로 연결."""

    def __init__(self, maxsize: int = 100):
        self.vad_to_stt: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
        self.stt_to_llm: asyncio.Queue = asyncio.Queue(maxsize=10)
        self.llm_to_tts: asyncio.Queue = asyncio.Queue(maxsize=50)
        self.tts_to_speaker: asyncio.Queue = asyncio.Queue(maxsize=20)

    async def flush_all(self):
        """barge-in 시 모든 큐를 비움."""
        for q in [self.vad_to_stt, self.stt_to_llm,
                   self.llm_to_tts, self.tts_to_speaker]:
            while not q.empty():
                try:
                    q.get_nowait()
                except asyncio.QueueEmpty:
                    break

    async def flush_downstream(self):
        """STT 이후 큐만 비움 (barge-in 시 새 입력은 보존)."""
        for q in [self.llm_to_tts, self.tts_to_speaker]:
            while not q.empty():
                try:
                    q.get_nowait()
                except asyncio.QueueEmpty:
                    break

에러 복구 매트릭스

에러 유형발생 단계복구 전략사용자 피드백
마이크 접근 실패Audio Input디바이스 재탐색, 3회 재시도"마이크를 확인해주세요" TTS
VAD 모델 로드 실패초기화ONNX → PyTorch 폴백로그 출력
STT 타임아웃THINKING짧은 오디오로 재시도무응답 → IDLE 복귀
LLM 서버 연결 실패THINKING3회 재시도 + 지수 백오프"잠시 후 다시 시도합니다"
LLM 응답 없음THINKING타임아웃 15초 → IDLE"다시 말씀해주세요"
TTS 합성 실패SPEAKING해당 문장 스킵, 다음 문장일부 누락 가능
오디오 출력 실패SPEAKING출력 디바이스 재탐색로그 경고

실무 구성 가이드

로컬 단독 구성 vs 서버-클라이언트 구성

로컬 단독 (All-in-One)

┌──────────────────────────────────┐
Single MachineMicVADSTTLLMTTS│         → Speaker│                                  │
GPU: RTX 3060+ (12GB VRAM)│  또는 CPU-only (Vosk + Piper)└──────────────────────────────────┘

장점: 네트워크 지연 0ms, 프라이버시 완벽, 인터넷 불필요 단점: 하드웨어 제약, 모델 크기 제한

서버-클라이언트 구성

┌──────────┐     WebSocket/     ┌────────────────┐
Client  │◀────WebRTC────────▶│   GPU Server (라즈베리파이│    audio stream   │ STT + LLM + TTS/ 브라우저) │                  │                │
└──────────┘                    └────────────────┘

장점: 클라이언트 경량화, 강력한 모델 사용 가능, 다중 클라이언트 단점: 네트워크 지연 추가(20~100ms), 서버 비용

하이브리드 구성 (추천)

┌──────────────────┐          ┌────────────────┐
Client       │  text    │   GPU ServerMicVADSTT │─────────▶│  LLM (vLLM)  (Vosk, 로컬)    │  text    │               │
SpeakerTTS   │◀─────────│               │
  (Piper, 로컬)   │          └────────────────┘
└──────────────────┘

VAD/STT/TTS는 로컬(지연 최소화), LLM만 서버에서 처리. 텍스트만 네트워크를 통과하므로 대역폭 부담 최소.

GPU 없는 환경 최적화 팁

GPU가 없는 환경(라즈베리파이, 구형 노트북 등)에서도 실시간 음성 챗봇을 구동할 수 있다.

컴포넌트GPU 구성CPU-only 구성비고
VADSilero VAD (ONNX)Silero VAD (ONNX)동일 — ONNX Runtime은 CPU에서도 1ms
STTfaster-whisper large-v3Vosk (small-ko)Vosk는 50MB 모델로 실시간 스트리밍
LLMOllama (gemma3:4b)Ollama (gemma3:1b, q4_0)또는 원격 서버에 위임
TTSPiper (medium)Piper (low quality)CPU에서도 50x 실시간

추가 최적화 팁:

  1. ONNX Runtime 사용: Silero VAD와 Piper 모두 ONNX 모델 지원. onnxruntime으로 CPU 추론 최적화
  2. INT8 양자화: faster-whisper의 compute_type="int8"로 CPU 성능 2~3배 향상
  3. 오디오 청크 크기 조절: 30ms → 60ms로 늘리면 VAD 호출 횟수 절반 (정확도 트레이드오프)
  4. LLM 양자화: q4_0 또는 q4_K_M 양자화로 RAM 사용량 60% 절감
  5. 배치 처리 회피: CPU 환경에서는 스트리밍 우선. 배치 크기 1로 고정

한국어 품질 개선 팁

한국어 음성 인식과 합성은 영어 대비 추가적인 최적화가 필요하다.

STT 한국어 최적화:

  • faster-whisper 사용 시 language="ko" 명시 (자동 감지 대비 WER 3~5% 개선)
  • initial_prompt="음성 인식 결과입니다." 설정으로 한국어 컨텍스트 힌트 제공
  • beam_size=5 이상 권장 (한국어는 동음이의어가 많아 beam search 효과 큼)
  • VAD 필터 활성화 (vad_filter=True)로 무음 구간 제거

TTS 한국어 최적화:

  • Piper 한국어 모델: ko_KR-kss-medium (KSS 데이터셋 기반)
  • XTTS v2는 한국어 zero-shot 지원하나, 파인튜닝 시 자연스러움 대폭 향상
  • 숫자/영문 혼합 텍스트 전처리: "3시 30분" → "세시 삼십분" 변환 규칙 적용
  • 문장 단위 합성 시 마침표 기준 분할 (한국어는 쉼표 후에도 억양 변화가 적어 문장 단위가 적절)

LLM 한국어 최적화:

  • 시스템 프롬프트에 "한국어로 간결하게 답변하세요. 2~3문장 이내로." 명시
  • Ollama 모델 선택: gemma3:4b(한국어 품질 양호) 또는 EXAONE-3.5-2.4B(LG 한국어 특화)
  • 응답 길이 제한 (max_tokens: 256)으로 barge-in 전 응답 완료 확률 향상

지연시간 벤치마크 기준

실시간 음성 대화에서 사용자가 자연스러움을 느끼는 지연시간 기준은 다음과 같다.

구간별 지연시간 목표

구간목표허용 한계측정 방법
VAD 판별5ms 이내30ms 이내청크 입력 → 결과 반환
STT (발화 종료→텍스트)500ms 이내1500ms 이내silence_end → text_ready
LLM TTFT (첫 토큰)300ms 이내1000ms 이내request → first_token
LLM 전체 응답2000ms 이내5000ms 이내request → last_token
TTS (텍스트→첫 오디오)100ms 이내300ms 이내text_ready → first_audio
총 응답 지연800ms 이내2000ms 이내silence_end → first_audio_out
Barge-in 반응100ms 이내200ms 이내voice_detected → playback_stop

환경별 달성 가능 지연시간

환경총 응답 지연Barge-in 반응비고
RTX 4090 + SSD~500ms~50ms최적 구성
RTX 3060 (12GB)~800ms~60ms권장 최소 GPU
M2 MacBook Pro~900ms~70msMetal 가속
CPU only (i7-12700)~2500ms~80msVosk + Piper 조합
Raspberry Pi 5~4000ms~100ms원격 LLM 필수

구현 체크리스트

프로젝트를 시작하기 전에 아래 체크리스트를 확인하자.

환경 준비

  • Python 3.10+ 설치
  • CUDA 12.x + cuDNN (GPU 사용 시)
  • portaudio 시스템 라이브러리 설치 (brew install portaudio / apt install portaudio19-dev)
  • 마이크 및 스피커 동작 확인
  • Ollama 설치 및 모델 다운로드 (ollama pull gemma3:4b)

모델 다운로드

  • Silero VAD ONNX 모델 (자동 다운로드)
  • faster-whisper 모델 (medium 또는 large-v3)
  • Piper 한국어 모델 (ko_KR-kss-medium)
  • (선택) openWakeWord 모델

파이프라인 검증

  • VAD 단독 테스트: 마이크 → VAD → 콘솔 출력
  • STT 단독 테스트: WAV 파일 → faster-whisper → 텍스트
  • LLM 단독 테스트: curl로 Ollama 스트리밍 응답 확인
  • TTS 단독 테스트: 텍스트 → Piper → WAV 파일 재생
  • 통합 테스트: 전체 파이프라인 동작 확인
  • barge-in 테스트: 응답 중 발화 시 즉시 중단 확인

프로덕션 배포

  • 에러 로깅 설정 (구조화된 로그)
  • 메트릭 수집 (구간별 지연시간)
  • 메모리 누수 테스트 (장시간 구동)
  • 동시 접속 테스트 (서버-클라이언트 구성 시)

트러블슈팅 가이드

증상원인해결 방법
VAD가 항상 speech로 판별threshold 너무 낮음 / 배경 소음threshold를 0.6~0.8로 올림. 소음 환경에서는 0.7 이상 권장
VAD가 발화를 놓침threshold 너무 높음 / 마이크 게인 낮음threshold를 0.3~0.4로 낮춤. 시스템 마이크 볼륨 확인
STT에서 자기 목소리 인식 (에코)스피커 → 마이크 피드백헤드셋 사용 또는 소프트웨어 AEC 적용 (speexdsp 등)
STT 한국어 인식률 낮음잘못된 모델 또는 설정language="ko" 명시, beam_size=5, large-v3 모델 사용
LLM 첫 토큰 지연 >2초모델 콜드 스타트 / GPU 메모리 부족ollama run으로 미리 로드. GPU 메모리 확인
TTS 합성 후 끊김/노이즈샘플레이트 불일치Piper 출력 샘플레이트(보통 22050)와 sd.play 일치 확인
barge-in 반응 느림 (>500ms)VAD 청크 크기 큼 / 모니터 루프 지연청크를 30ms로 줄이고, asyncio.sleep 값 최소화
barge-in이 너무 자주 발동에코 제거 미흡 / VAD 민감도 높음AEC 적용, SPEAKING 중 VAD threshold를 0.8로 높임
메모리 계속 증가오디오 버퍼/큐 미정리flush_all() 호출 확인. 대화 히스토리 길이 제한
asyncio 이벤트 루프 블로킹sd.rec(blocking=True) 사용sd.InputStream 콜백 기반으로 변경

고급 주제: LiveKit Agents로 프로덕션 구축

로컬 프로토타입을 넘어 프로덕션 환경으로 가려면, LiveKit Agents 프레임워크를 고려하자. WebRTC 기반 SFU 서버에 STT/LLM/TTS 파이프라인과 턴 감지를 내장하고 있으며, 브라우저·모바일·IoT 클라이언트를 모두 지원한다.

# LiveKit Agents 기반 barge-in 음성 챗봇 (간략 예시)
from livekit.agents import AutoSubscribe, JobContext, WorkerOptions, cli
from livekit.agents.voice import Agent, AgentSession
from livekit.plugins import silero, openai, deepgram

async def entrypoint(ctx: JobContext):
    await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)

    agent = Agent(
        vad=silero.VAD.load(),
        stt=deepgram.STT(language="ko"),
        llm=openai.LLM(
            base_url="http://localhost:11434/v1",  # Ollama
            model="gemma3:4b",
        ),
        tts=openai.TTS(),  # 또는 커스텀 Piper TTS 플러그인
        # barge-in은 기본 활성화 — VAD가 사용자 발화 감지 시
        # 자동으로 LLM/TTS를 중단하고 새 턴 시작
    )
    session = AgentSession()
    await session.start(agent=agent, room=ctx.room)

if __name__ == "__main__":
    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))

LiveKit Agents는 barge-in을 턴 감지 트랜스포머 모델로 처리하여, 단순 VAD 기반보다 정교한 중단 판단(기침·배경소음 구분)을 제공한다.

추가 오픈소스 프레임워크

프레임워크설명barge-in한국어GitHub Stars
Pipecat멀티모달 대화 AI 프레임워크OO5k+
Vocode음성 에이전트 빌더O2.5k+
RealtimeSTT실시간 STT 라이브러리 (Silero VAD 내장)O3k+
ShuoSub-500ms 전화 에이전트OX신규

마치며

오픈소스만으로 실시간 barge-in 음성 챗봇을 구축하는 것은 충분히 가능하다. 핵심은 세 가지다:

  1. 비동기 파이프라인: 모든 컴포넌트를 asyncio로 연결하고, Task.cancel()로 즉시 중단 가능하게 설계
  2. 상태머신 기반 제어: 5개 상태(IDLE/LISTENING/THINKING/SPEAKING/INTERRUPTED)로 명확한 전이 규칙 정의
  3. 지연시간 예산 관리: 총 800ms 이내를 목표로, 각 단계별 예산을 할당하고 측정

가장 실용적인 시작점은 Silero VAD + faster-whisper + Ollama + Piper 조합이다. GPU가 있다면 800ms 이내 응답이 가능하고, CPU만으로도 Vosk + Piper 조합에 원격 LLM을 연결하면 2초대 응답을 달성할 수 있다.

한국어 환경에서는 STT의 language="ko" 명시, LLM의 간결한 응답 유도, TTS의 숫자/영문 전처리가 품질을 좌우한다. 이 글의 코드를 기반으로 각 모듈을 하나씩 교체·개선해가며 자신만의 음성 AI 어시스턴트를 만들어보자.

참고자료

  1. Silero VAD — GitHub — MIT 라이선스 VAD, ONNX/PyTorch 지원
  2. faster-whisper — GitHub — CTranslate2 기반 Whisper 고속 추론
  3. whisper.cpp — GitHub — ggml 기반 Whisper C++ 구현, Metal/CUDA/Vulkan
  4. Vosk — GitHub — 경량 오프라인 STT, 스트리밍 네이티브 지원
  5. Piper TTS — GitHub — VITS 기반 경량 TTS, 다국어 지원
  6. Ollama — 공식 사이트 — 로컬 LLM 원커맨드 서빙
  7. LiveKit Agents — GitHub — 실시간 음성 AI 에이전트 프레임워크
  8. Pipecat — GitHub — 오픈소스 멀티모달 대화 AI 프레임워크
  9. openWakeWord — GitHub — 오픈소스 웨이크워드 감지
  10. RealtimeSTT — GitHub — 실시간 STT with VAD and wake word
  11. vLLM — GitHub — 고성능 LLM 서빙 엔진
  12. Coqui TTS (커뮤니티 포크) — PyPI — XTTS v2 음성 복제 TTS