Skip to content
Published on

OpenTelemetry Collectorパイプライン設計実践ガイド — Receiver、Processor、Exporter

Authors
  • Name
    Twitter
OpenTelemetry Collector Pipeline

はじめに

マイクロサービス環境においてTraces、Metrics、Logsを統合的に収集・処理することはObservabilityの核心です。OpenTelemetry Collectorはベンダー中立なテレメトリパイプラインで、さまざまなソースからデータを収集し、任意のバックエンドに送信します。

この記事では、OTel Collectorのアーキテクチャを理解し、プロダクション環境でのパイプライン設計を解説します。

OTel Collectorアーキテクチャ

パイプライン構造

# データフロー
# ReceiverProcessorExporter
#
# Receiver: データ収集(OTLP、Jaeger、Prometheus、Fluentdなど)
# Processor: データ加工(フィルタリング、変換、バッチング、サンプリング)
# Exporter: データ送信(OTLP、Jaeger、Prometheus、Lokiなど)
#
# 1つのCollectorに複数のパイプラインを構成可能:
# - traces pipeline
# - metrics pipeline
# - logs pipeline

Collectorデプロイパターン

# パターン1: Agent(サイドカー/DaemonSet)
# 各ノード/Podに配置、ローカル収集

# パターン2: Gateway(中央集中)
# クラスター内の独立サービスとして配置、トラフィック集中

# パターン3: Agent + Gateway(推奨)
# Agentがローカル収集Gatewayが中央処理/ルーティング

インストール

Kubernetes(Helm)

# OpenTelemetry Operatorインストール
helm repo add open-telemetry https://open-telemetry.github.io/opentelemetry-helm-charts
helm repo update

# Collectorインストール(DaemonSetモード)
helm install otel-collector open-telemetry/opentelemetry-collector \
  --namespace observability \
  --create-namespace \
  --values collector-values.yaml

Docker

docker run -d --name otel-collector \
  -p 4317:4317 \
  -p 4318:4318 \
  -p 8888:8888 \
  -v $(pwd)/otel-collector-config.yaml:/etc/otelcol/config.yaml \
  otel/opentelemetry-collector-contrib:0.96.0

パイプライン設定

基本設定構造

# otel-collector-config.yaml
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317
      http:
        endpoint: 0.0.0.0:4318

processors:
  batch:
    timeout: 5s
    send_batch_size: 1000
    send_batch_max_size: 1500

exporters:
  debug:
    verbosity: detailed

service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [batch]
      exporters: [debug]
    metrics:
      receivers: [otlp]
      processors: [batch]
      exporters: [debug]
    logs:
      receivers: [otlp]
      processors: [batch]
      exporters: [debug]

プロダクションパイプライン

# production-config.yaml
receivers:
  # OTLP(アプリケーションSDKから送信)
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317
        max_recv_msg_size_mib: 4
      http:
        endpoint: 0.0.0.0:4318
        cors:
          allowed_origins: ['*']

  # Prometheusスクレイピング
  prometheus:
    config:
      scrape_configs:
        - job_name: 'kubernetes-pods'
          kubernetes_sd_configs:
            - role: pod
          relabel_configs:
            - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_scrape]
              action: keep
              regex: true

  # ホストメトリクス
  hostmetrics:
    collection_interval: 30s
    scrapers:
      cpu: {}
      memory: {}
      disk: {}
      network: {}
      load: {}

  # Kubernetesイベント
  k8s_events:
    namespaces: [default, production]

processors:
  # バッチング
  batch:
    timeout: 5s
    send_batch_size: 1000

  # メモリ制限
  memory_limiter:
    check_interval: 1s
    limit_mib: 1500
    spike_limit_mib: 512

  # リソース情報の付加
  resourcedetection:
    detectors: [env, system, docker, gcp, aws, azure]
    timeout: 5s

  # K8sメタデータの付加
  k8sattributes:
    auth_type: serviceAccount
    extract:
      metadata:
        - k8s.namespace.name
        - k8s.deployment.name
        - k8s.pod.name
        - k8s.node.name

  # 不要な属性の削除
  attributes:
    actions:
      - key: http.request.header.authorization
        action: delete
      - key: db.statement
        action: hash # 機密クエリのハッシュ化

  # テールサンプリング(tracesのみ)
  tail_sampling:
    decision_wait: 10s
    num_traces: 100000
    policies:
      - name: error-policy
        type: status_code
        status_code:
          status_codes: [ERROR]
      - name: slow-policy
        type: latency
        latency:
          threshold_ms: 1000
      - name: probabilistic-policy
        type: probabilistic
        probabilistic:
          sampling_percentage: 10

  # フィルタリング
  filter:
    metrics:
      exclude:
        match_type: regexp
        metric_names:
          - 'go_.*'
          - 'process_.*'

exporters:
  # Traces → Tempo
  otlp/tempo:
    endpoint: tempo.observability.svc:4317
    tls:
      insecure: true

  # Metrics → Prometheus/Mimir
  prometheusremotewrite:
    endpoint: http://mimir.observability.svc:9009/api/v1/push
    tls:
      insecure: true
    resource_to_telemetry_conversion:
      enabled: true

  # Logs → Loki
  loki:
    endpoint: http://loki.observability.svc:3100/loki/api/v1/push
    default_labels_enabled:
      exporter: true
      job: true

  # デバッグ(トラブルシューティング用)
  debug:
    verbosity: basic

extensions:
  # ヘルスチェック
  health_check:
    endpoint: 0.0.0.0:13133

  # 自己メトリクス
  zpages:
    endpoint: 0.0.0.0:55679

  # pprof(プロファイリング)
  pprof:
    endpoint: 0.0.0.0:1777

service:
  extensions: [health_check, zpages, pprof]

  pipelines:
    traces:
      receivers: [otlp]
      processors:
        [memory_limiter, resourcedetection, k8sattributes, attributes, tail_sampling, batch]
      exporters: [otlp/tempo]

    metrics:
      receivers: [otlp, prometheus, hostmetrics]
      processors: [memory_limiter, resourcedetection, k8sattributes, filter, batch]
      exporters: [prometheusremotewrite]

    logs:
      receivers: [otlp, k8s_events]
      processors: [memory_limiter, resourcedetection, k8sattributes, attributes, batch]
      exporters: [loki]

  telemetry:
    logs:
      level: info
    metrics:
      address: 0.0.0.0:8888

Receiver詳細

OTLP Receiver

receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317
        max_recv_msg_size_mib: 4
        keepalive:
          server_parameters:
            max_connection_idle: 11s
            max_connection_age: 30s
      http:
        endpoint: 0.0.0.0:4318

Filelog Receiver(ログファイル収集)

receivers:
  filelog:
    include:
      - /var/log/pods/*/*/*.log
    exclude:
      - /var/log/pods/*/otel-collector*/*.log
    start_at: beginning
    include_file_path: true
    operators:
      - type: router
        routes:
          - output: parse_json
            expr: 'body matches "^\\{"'
          - output: parse_plain
            expr: 'body matches "^[^{]"'
      - id: parse_json
        type: json_parser
        timestamp:
          parse_from: attributes.timestamp
          layout: '%Y-%m-%dT%H:%M:%S.%fZ'
      - id: parse_plain
        type: regex_parser
        regex: '^(?P<timestamp>\S+) (?P<level>\S+) (?P<message>.*)'

Processor詳細

テールサンプリング(重要!)

processors:
  tail_sampling:
    decision_wait: 10s
    num_traces: 100000
    expected_new_traces_per_sec: 1000
    policies:
      # エラーは100%収集
      - name: errors
        type: status_code
        status_code:
          status_codes: [ERROR]

      # 1秒以上遅いリクエストは100%収集
      - name: slow-traces
        type: latency
        latency:
          threshold_ms: 1000

      # 特定サービスは100%収集
      - name: critical-services
        type: string_attribute
        string_attribute:
          key: service.name
          values: [payment-service, auth-service]

      # 残りは5%のみ収集
      - name: probabilistic
        type: probabilistic
        probabilistic:
          sampling_percentage: 5

      # 複合ポリシー
      - name: composite-policy
        type: composite
        composite:
          max_total_spans_per_second: 1000
          policy_order: [errors, slow-traces, critical-services, probabilistic]
          rate_allocation:
            - policy: errors
              percent: 30
            - policy: slow-traces
              percent: 30
            - policy: critical-services
              percent: 20
            - policy: probabilistic
              percent: 20

Transform Processor

processors:
  transform:
    trace_statements:
      - context: span
        statements:
          # 属性の追加
          - set(attributes["deployment.environment"], "production")
          # 属性の変換
          - replace_pattern(attributes["http.url"], "password=\\w+", "password=***")
          # 条件付き処理
          - set(attributes["error.category"], "timeout") where attributes["error.type"] == "DeadlineExceeded"

    metric_statements:
      - context: datapoint
        statements:
          - set(attributes["env"], "prod")

    log_statements:
      - context: log
        statements:
          # ログ本文から情報を抽出
          - set(attributes["user_id"], ExtractPatterns(body, "user_id=(?P<user_id>\\w+)"))

Kubernetesデプロイ

Agent(DaemonSet)+ Gatewayパターン

# agent-config.yaml(DaemonSet)
apiVersion: opentelemetry.io/v1beta1
kind: OpenTelemetryCollector
metadata:
  name: otel-agent
  namespace: observability
spec:
  mode: daemonset
  config:
    receivers:
      otlp:
        protocols:
          grpc:
            endpoint: 0.0.0.0:4317
      hostmetrics:
        collection_interval: 30s
        scrapers:
          cpu: {}
          memory: {}

    processors:
      memory_limiter:
        limit_mib: 512
      batch:
        timeout: 5s

    exporters:
      # Gatewayへ送信
      otlp:
        endpoint: otel-gateway.observability.svc:4317
        tls:
          insecure: true

    service:
      pipelines:
        traces:
          receivers: [otlp]
          processors: [memory_limiter, batch]
          exporters: [otlp]
        metrics:
          receivers: [otlp, hostmetrics]
          processors: [memory_limiter, batch]
          exporters: [otlp]
---
# gateway-config.yaml(Deployment)
apiVersion: opentelemetry.io/v1beta1
kind: OpenTelemetryCollector
metadata:
  name: otel-gateway
  namespace: observability
spec:
  mode: deployment
  replicas: 3
  config:
    receivers:
      otlp:
        protocols:
          grpc:
            endpoint: 0.0.0.0:4317

    processors:
      memory_limiter:
        limit_mib: 2048
      tail_sampling:
        decision_wait: 10s
        policies:
          - name: errors
            type: status_code
            status_code:
              status_codes: [ERROR]
          - name: probabilistic
            type: probabilistic
            probabilistic:
              sampling_percentage: 10
      batch:
        timeout: 10s
        send_batch_size: 5000

    exporters:
      otlp/tempo:
        endpoint: tempo.observability.svc:4317
        tls:
          insecure: true
      prometheusremotewrite:
        endpoint: http://mimir.observability.svc:9009/api/v1/push
      loki:
        endpoint: http://loki.observability.svc:3100/loki/api/v1/push

    service:
      pipelines:
        traces:
          receivers: [otlp]
          processors: [memory_limiter, tail_sampling, batch]
          exporters: [otlp/tempo]
        metrics:
          receivers: [otlp]
          processors: [memory_limiter, batch]
          exporters: [prometheusremotewrite]
        logs:
          receivers: [otlp]
          processors: [memory_limiter, batch]
          exporters: [loki]

トラブルシューティング

自己メトリクスの確認

# Collector自己メトリクス(ポート8888)
curl http://localhost:8888/metrics | grep otelcol

# 主要メトリクス:
# otelcol_receiver_accepted_spans - 受信されたspan数
# otelcol_receiver_refused_spans - 拒否されたspan数
# otelcol_processor_dropped_spans - ドロップされたspan数
# otelcol_exporter_sent_spans - 送信されたspan数
# otelcol_exporter_send_failed_spans - 送信失敗span数

zPagesによるデバッグ

# http://localhost:55679/debug/tracez — 最近のtrace確認
# http://localhost:55679/debug/pipelinez — パイプラインステータス

まとめ

OpenTelemetry Collectorパイプライン設計の要点:

  1. Agent + Gatewayパターン:ローカル収集+中央処理で効率的な運用
  2. テールサンプリング:エラー/遅いリクエストは100%、残りは確率的サンプリングでコスト削減
  3. Memory Limiterは必須:OOM防止のためのメモリ制限
  4. Processorの順序が重要:memory_limiter → sampling → batch の順序を推奨
  5. ベンダー中立:バックエンド変更時はExporterのみ交換

クイズ(6問)

Q1. OTel Collectorパイプラインの3つの構成要素は? Receiver、Processor、Exporter

Q2. Agent + Gatewayパターンでのそれぞれの役割は? Agent:各ノードでのローカル収集、Gateway:中央での処理/ルーティング/送信

Q3. テールサンプリングがヘッドサンプリングより優れている理由は? トレース全体を確認してからサンプリング判断するため、エラー/遅いリクエストを見逃さない

Q4. memory_limiter Processorをパイプラインの最初に配置する理由は? 受信データが多い場合にOOMを防止するため、最初にメモリをチェック

Q5. Collectorの自己メトリクスを確認する方法は? ポート8888の/metricsエンドポイントまたはzPages(ポート55679)

Q6. batch Processorのtimeoutとsend_batch_sizeの関係は? timeout時間に到達するかsend_batch_sizeに到達した場合にバッチを送信(先に発生した条件)