Skip to content
Published on

OpenTelemetry Collector本番運用ガイド:パイプラインアーキテクチャ、カスタムプロセッサ開発、スケーリング戦略

Authors
  • Name
    Twitter
OpenTelemetry Collector

はじめに

OpenTelemetry Collectorは、ベンダー非依存の現代オブザーバビリティ基盤の中核です。構成可能なパイプラインモデルを通じて、テレメトリデータ(トレース、メトリクス、ログ)の受信、処理、エクスポートを行います。基本的なCollectorの起動は簡単ですが、数百のマイクロサービスと毎秒数百万のスパンを処理する本番規模での運用には、慎重なアーキテクチャ設計、チューニングされたプロセッサ、および意図的なスケーリング戦略が求められます。

本ガイドでは、本番Collectorデプロイメントのライフサイクル全体をカバーします:パイプラインアーキテクチャの内部構造、Receiver/Processor/Exporter構成、GoによるカスタムProcessor開発、Agent vs Gatewayデプロイメントパターン、Memory LimiterとBatch Processorのチューニング、テールサンプリング戦略、障害復旧手順、および代替テレメトリパイプラインとの詳細比較です。

テレメトリパイプライン比較

Collector自体に深入りする前に、他の一般的なテレメトリツールとの位置づけを理解しておく価値があります。

機能OTel CollectorFluentdVectorTelegraf
言語GoRuby/CRustGo
シグナルタイプトレース、メトリクス、ログログのみログ、メトリクス、トレース主にメトリクス
プロトコルサポートOTLP、Jaeger、Zipkin、PrometheusなどSyslog、HTTP、TCP、ファイルテールSyslog、HTTP、ファイル、KafkaSNMP、StatsD、Prometheusなど
プラグインエコシステムcontribで100以上のReceiver/Exporter800以上のコミュニティプラグイン100以上のSource/Sink300以上のInput/Outputプラグイン
メモリ(10Kイベント/秒)100-200 MB200-400 MB50-100 MB80-150 MB
処理モデルパイプライン(Receiver-Processor-Exporter)Filter/Match/OutputチェーンTransform/Sink DAGInput/Processor/Aggregator/Output
CNCFステータスGraduated(OpenTelemetry)GraduatedSandbox(過去)非CNCF
最適用途フルスタックオブザーバビリティログ集約高性能ログルーティングメトリクス収集
ベンダーロックインなし(設計上ベンダー非依存)低(InfluxDB親和性)

Collectorは、3つすべてのシグナルタイプにわたる統一テレメトリ収集を単一デプロイメントで必要とする場合に最適です。Fluentdは複雑なパースを伴う純粋なログパイプラインに強みがあります。Vectorは最小限のリソース使用量で生のスループットに優れています。Telegrafは特にInfluxDBとの組み合わせでメトリクス重視のワークロードに最適です。

パイプラインアーキテクチャの詳細

Collectorパイプラインは、テレメトリデータが取り込みからエクスポートまでたどるパスを定義します。各パイプラインは1つのシグナルタイプ(トレース、メトリクス、またはログ)で動作します。

コアコンポーネント

Receiverはネットワークポートでリッスンするか、エンドポイントを能動的にスクレイプしてテレメトリデータを取り込みます。単一のReceiverインスタンスは内部のファンアウトコンシューマを通じて複数のパイプラインにデータを送信できます。

Processorはデータを順番に変換します。フィルタリング、エンリッチメント、バッチング、サンプリング、またはレート制限が可能です。Processorの順序は重要です:Memory Limiterを常に最初に配置する必要があります。

Exporterは処理済みデータをバックエンドに送信します。複数のパイプラインが同じExporterインスタンスを共有できます。Exporterはリトライロジックとキューイングを処理します。

Connector(Collector v0.86+で導入)はExporterとReceiverの両方として機能し、パイプライン間のデータフローを可能にします。一般的なユースケースは、トレーススパンからREDメトリクスを生成するspanmetricsコネクタです。

パイプライン構成

# otel-collector-config.yaml
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317
      http:
        endpoint: 0.0.0.0:4318
  prometheus:
    config:
      scrape_configs:
        - job_name: 'kubernetes-pods'
          kubernetes_sd_configs:
            - role: pod
          relabel_configs:
            - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_scrape]
              action: keep
              regex: true
  filelog:
    include:
      - /var/log/pods/*/*/*.log
    operators:
      - type: json_parser
        timestamp:
          parse_from: attributes.time
          layout: '%Y-%m-%dT%H:%M:%S.%LZ'

processors:
  memory_limiter:
    check_interval: 1s
    limit_mib: 1800
    spike_limit_mib: 400
  batch:
    timeout: 1s
    send_batch_size: 1024
    send_batch_max_size: 2048
  attributes:
    actions:
      - key: environment
        value: production
        action: upsert
      - key: collector.version
        value: '0.98.0'
        action: insert
  resource:
    attributes:
      - key: service.namespace
        value: platform
        action: upsert

exporters:
  otlp/tempo:
    endpoint: tempo-distributor.observability:4317
    tls:
      insecure: true
    retry_on_failure:
      enabled: true
      initial_interval: 5s
      max_interval: 30s
      max_elapsed_time: 300s
    sending_queue:
      enabled: true
      num_consumers: 10
      queue_size: 5000
  prometheusremotewrite:
    endpoint: http://mimir-distributor.observability:9009/api/v1/push
    resource_to_telemetry_conversion:
      enabled: true
  otlp/loki:
    endpoint: loki-distributor.observability:3100
    tls:
      insecure: true

connectors:
  spanmetrics:
    histogram:
      explicit:
        buckets: [5ms, 10ms, 25ms, 50ms, 100ms, 250ms, 500ms, 1s, 2.5s, 5s, 10s]
    dimensions:
      - name: http.method
      - name: http.status_code
    metrics_flush_interval: 15s

service:
  telemetry:
    logs:
      level: info
    metrics:
      address: 0.0.0.0:8888
  pipelines:
    traces:
      receivers: [otlp]
      processors: [memory_limiter, batch]
      exporters: [otlp/tempo, spanmetrics]
    metrics:
      receivers: [otlp, prometheus, spanmetrics]
      processors: [memory_limiter, batch]
      exporters: [prometheusremotewrite]
    logs:
      receivers: [otlp, filelog]
      processors: [memory_limiter, attributes, batch]
      exporters: [otlp/loki]

この構成は、spanmetricsコネクタがトレースからメトリクスへの橋渡しをする完全な3シグナルパイプラインを示しています。

Memory LimiterとBatch Processor

この2つのProcessorは本番環境では必須です。すべてのパイプラインに両方を含める必要があります。

Memory Limiter

Memory LimiterはCollectorプロセスのヒープ使用量を監視してOOMキルを防止します。メモリがlimit_mibを超えると、Collectorは受信データの拒否とガベージコレクションのトリガーを開始します。spike_limit_mibは想定される最大スパイクを定義するため、実効ソフトリミットはlimit_mib - spike_limit_mibとなります。

ベストプラクティス:

  • limit_mibをコンテナメモリリミットの70-80%に設定します。2Giポッドの場合、1400-1600 MiBを使用します。
  • spike_limit_miblimit_mibの20-25%に設定します。
  • check_intervalを1sに設定します。より短い間隔はCPU使用量を増加させ、より長い間隔はオーバーシュートのリスクがあります。
  • Memory Limiterを常にProcessorチェーンの最初に配置します。

Batch Processor

Batch Processorはエクスポート前にテレメトリをバッチにグループ化し、リクエストあたりのオーバーヘッドを削減し、圧縮率を向上させます。

processors:
  batch:
    # この数のアイテムが蓄積されたらバッチを送信
    send_batch_size: 1024
    # 過大なバッチを防ぐハードキャップ
    send_batch_max_size: 2048
    # この期間後にバッファされたものを送信
    timeout: 1s

ガイドライン:

  • send_batch_sizeは512-2048がほとんどのワークロードで適切です。
  • timeoutは1-5sでレイテンシとスループットのバランスをとります。リアルタイムアラートパイプラインではより低い値にします。
  • Batch ProcessorはMemory Limiterおよびフィルタリング/サンプリングProcessorの後に配置します。ドロップ前のデータをバッチングするのはCPUの無駄です。

テールサンプリング戦略

テールサンプリングは、トレースのすべてのスパンを収集した後にサンプリング決定を行います。これによりインテリジェントな決定が可能になります:すべてのエラートレースを保持、遅いトレースを保持、正常なトレースを低レートでサンプリング。テールサンプリングProcessorはopentelemetry-collector-contribディストリビューションに含まれています。

重要な制約

同じトレースに属するすべてのスパンは、同じCollectorインスタンスに到着する必要があります。Gatewayデプロイメントでは、AgentティアでトレースIDルーティング付きのロードバランシングExporterを使用します。

# Agentティア構成:トレースIDでGatewayにルーティング
exporters:
  loadbalancing:
    protocol:
      otlp:
        tls:
          insecure: true
    resolver:
      dns:
        hostname: otel-gateway-headless.observability
        port: 4317

# Gatewayティア構成:テールサンプリング
processors:
  tail_sampling:
    decision_wait: 10s
    num_traces: 100000
    expected_new_traces_per_sec: 2000
    policies:
      # ポリシー1:エラートレースを常に保持
      - name: errors-policy
        type: status_code
        status_code:
          status_codes: [ERROR]
      # ポリシー2:遅いトレースを常に保持(2秒超)
      - name: latency-policy
        type: latency
        latency:
          threshold_ms: 2000
      # ポリシー3:重要サービスに関連するトレースを保持
      - name: critical-services
        type: string_attribute
        string_attribute:
          key: service.name
          values: [payment-service, auth-service, order-service]
      # ポリシー4:それ以外の確率的サンプリング
      - name: probabilistic-policy
        type: probabilistic
        probabilistic:
          sampling_percentage: 5
      # 複合ポリシー:上記を組み合わせる
      - name: composite-policy
        type: composite
        composite:
          max_total_spans_per_second: 5000
          policy_order: [errors-policy, latency-policy, critical-services, probabilistic-policy]
          rate_allocation:
            - policy: errors-policy
              percent: 40
            - policy: latency-policy
              percent: 25
            - policy: critical-services
              percent: 25
            - policy: probabilistic-policy
              percent: 10

service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [memory_limiter, tail_sampling, batch]
      exporters: [otlp/tempo]

テールサンプリングのメモリサイジング

decision_waitパラメータは、Collectorがサンプリング決定を行う前にスパンをメモリに保持する時間を決定します。num_traces: 100000で平均スパンサイズが5 KBの場合、トレースバッファだけで約500 MBが必要です。Gatewayポッドのサイズを適切に設定してください:decision_waitが10sで毎秒2000の新しいトレースを受信する場合、同時に20,000トレースを保持しています。トレースあたりの平均スパン数とスパンサイズを掛けてメモリを見積もります。

カスタムProcessor開発

組み込みProcessorが要件を満たさない場合、Collector SDKを使用してGoでカスタムProcessorを作成できます。以下は、すべてのテレメトリにデプロイメントメタデータを追加するProcessorの完全な例です。

プロジェクト構造

# Processorモジュールの作成
mkdir -p processor/deploymetadataprocessor
cd processor/deploymetadataprocessor

# Goモジュールの初期化
go mod init github.com/myorg/otelcol-custom/processor/deploymetadataprocessor
go get go.opentelemetry.io/collector/processor
go get go.opentelemetry.io/collector/pdata

設定

// config.go
package deploymetadataprocessor

type Config struct {
    ClusterName    string `mapstructure:"cluster_name"`
    Region         string `mapstructure:"region"`
    DeploymentEnv  string `mapstructure:"deployment_env"`
    AddBuildInfo   bool   `mapstructure:"add_build_info"`
}

func createDefaultConfig() *Config {
    return &Config{
        ClusterName:   "default",
        Region:        "us-east-1",
        DeploymentEnv: "production",
        AddBuildInfo:  true,
    }
}

ファクトリ

// factory.go
package deploymetadataprocessor

import (
    "context"

    "go.opentelemetry.io/collector/component"
    "go.opentelemetry.io/collector/consumer"
    "go.opentelemetry.io/collector/processor"
)

const (
    typeStr   = "deploy_metadata"
    stability = component.StabilityLevelDevelopment
)

func NewFactory() processor.Factory {
    return processor.NewFactory(
        component.MustNewType(typeStr),
        func() component.Config {
            return createDefaultConfig()
        },
        processor.WithTraces(createTracesProcessor, stability),
        processor.WithMetrics(createMetricsProcessor, stability),
        processor.WithLogs(createLogsProcessor, stability),
    )
}

func createTracesProcessor(
    ctx context.Context,
    set processor.Settings,
    cfg component.Config,
    next consumer.Traces,
) (processor.Traces, error) {
    pCfg := cfg.(*Config)
    return &deployMetadataTracesProcessor{
        config: pCfg,
        next:   next,
        logger: set.Logger,
    }, nil
}

// createMetricsProcessorとcreateLogsProcessorも同じパターンに従います

Processor実装

// processor_traces.go
package deploymetadataprocessor

import (
    "context"

    "go.opentelemetry.io/collector/consumer"
    "go.opentelemetry.io/collector/pdata/ptrace"
    "go.uber.org/zap"
)

type deployMetadataTracesProcessor struct {
    config *Config
    next   consumer.Traces
    logger *zap.Logger
}

func (p *deployMetadataTracesProcessor) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
    rss := td.ResourceSpans()
    for i := 0; i < rss.Len(); i++ {
        rs := rss.At(i)
        attrs := rs.Resource().Attributes()
        attrs.PutStr("deploy.cluster", p.config.ClusterName)
        attrs.PutStr("deploy.region", p.config.Region)
        attrs.PutStr("deploy.env", p.config.DeploymentEnv)
        if p.config.AddBuildInfo {
            attrs.PutStr("deploy.collector_build", "custom-v1.2.0")
        }
    }
    return p.next.ConsumeTraces(ctx, td)
}

func (p *deployMetadataTracesProcessor) Capabilities() consumer.Capabilities {
    return consumer.Capabilities{MutatesData: true}
}

func (p *deployMetadataTracesProcessor) Start(_ context.Context, _ component.Host) error {
    p.logger.Info("deploy_metadata processor started",
        zap.String("cluster", p.config.ClusterName),
        zap.String("region", p.config.Region))
    return nil
}

func (p *deployMetadataTracesProcessor) Shutdown(_ context.Context) error {
    return nil
}

OCBによるビルド

OpenTelemetry Collector Builder(ocb)を使用して、カスタムProcessorを含むカスタムCollectorバイナリをコンパイルします。

# builder-config.yaml
dist:
  name: custom-otelcol
  description: デプロイメタデータProcessor付きカスタムOTel Collector
  output_path: ./dist
  version: 1.0.0

receivers:
  - gomod: go.opentelemetry.io/collector/receiver/otlpreceiver v0.98.0
  - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver v0.98.0

processors:
  - gomod: go.opentelemetry.io/collector/processor/batchprocessor v0.98.0
  - gomod: go.opentelemetry.io/collector/processor/memorylimiterprocessor v0.98.0
  - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor v0.98.0
  - gomod: github.com/myorg/otelcol-custom/processor/deploymetadataprocessor v0.0.1

exporters:
  - gomod: go.opentelemetry.io/collector/exporter/otlpexporter v0.98.0
# カスタムCollectorのビルド
ocb --config builder-config.yaml

# 設定ファイルで実行
./dist/custom-otelcol --config otel-collector-config.yaml

Agent vs Gatewayデプロイメント

本番環境では通常、Collectorを2つのティアでデプロイします:各ノードのAgentと集中処理用のGatewayです。

Agentモード(DaemonSet)

AgentはKubernetes DaemonSetとして実行され、ノードごとに1つのPodがあります。その役割は軽量です:ローカルテレメトリの受信、ノードレベルのメタデータの追加、Gatewayへの転送です。

# agent-daemonset.yaml
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: otel-agent
  namespace: observability
spec:
  selector:
    matchLabels:
      app: otel-agent
  template:
    metadata:
      labels:
        app: otel-agent
    spec:
      containers:
        - name: otel-agent
          image: otel/opentelemetry-collector-contrib:0.98.0
          args: ['--config=/etc/otel/config.yaml']
          resources:
            requests:
              cpu: 100m
              memory: 256Mi
            limits:
              cpu: 500m
              memory: 512Mi
          ports:
            - containerPort: 4317 # OTLP gRPC
            - containerPort: 4318 # OTLP HTTP
          volumeMounts:
            - name: config
              mountPath: /etc/otel
            - name: varlog
              mountPath: /var/log
              readOnly: true
      volumes:
        - name: config
          configMap:
            name: otel-agent-config
        - name: varlog
          hostPath:
            path: /var/log

Agent構成は最小限にすべきです:OTLP Receiver、Memory Limiter、Resource Detection Processor(k8sメタデータ追加用)、およびGatewayを指すロードバランシングExporterです。

Gatewayモード(Deployment)

GatewayはKubernetes DeploymentとしてHorizontal Pod Autoscalerとともに実行されます。重い処理を担当します:テールサンプリング、スパンからメトリクスへの変換、属性エンリッチメント、バックエンドへのエクスポートです。

# gateway-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: otel-gateway
  namespace: observability
spec:
  replicas: 3
  selector:
    matchLabels:
      app: otel-gateway
  template:
    metadata:
      labels:
        app: otel-gateway
    spec:
      containers:
        - name: otel-gateway
          image: custom-otelcol:1.0.0
          args: ['--config=/etc/otel/config.yaml']
          resources:
            requests:
              cpu: '1'
              memory: 4Gi
            limits:
              cpu: '2'
              memory: 6Gi
          ports:
            - containerPort: 4317
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: otel-gateway-hpa
  namespace: observability
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: otel-gateway
  minReplicas: 3
  maxReplicas: 12
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 60
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 70

スケーリング判断フレームワーク

  • 1,000スパン/秒未満:スタンドアロンモードの単一Collectorで十分です。
  • 1,000-10,000スパン/秒:各ノードのAgentモードと小規模Gatewayクラスタ(2-3レプリカ)。
  • 10,000-100,000スパン/秒:HPAを備えたAgent-Gatewayパターン、トレースIDアフィニティ用のロードバランシングExporter、Gatewayでのテールサンプリング。
  • 100,000スパン/秒超:シャーディングを伴うマルチティアGateway。シグナルタイプまたはテナントごとにパーティション分割された複数のGatewayクラスタを検討します。Memory Limiterによるバックプレッシャー伝搬を実装します。

障害ケースと復旧手順

ケース1:GatewayのOOMキル

症状:GatewayポッドがOOMKilledステータスで繰り返し再起動する。

根本原因:テールサンプリングのdecision_waitがトラフィック量に対して長すぎるか、num_tracesが不足している。

復旧

# 1. 現在のメモリ使用量を確認
kubectl top pods -n observability -l app=otel-gateway

# 2. OOMイベントを調査
kubectl describe pod otel-gateway-xxx -n observability | grep -A5 "Last State"

# 3. 即時緩和:decision_waitを短縮しレプリカを増加
kubectl scale deployment otel-gateway -n observability --replicas=6

# 4. 長期対策:テールサンプリング設定を調整
# decision_waitを10sから5sに短縮
# num_tracesを100000から200000に増加
# Podメモリリミットを8Giに、memory_limiterのlimit_mibを5600に増加

ケース2:エクスポート障害時のデータロス

症状:バックエンド(Tempo/Mimir)が一時的に利用不可。テレメトリデータが失われる。

復旧:ファイルストレージ拡張で永続キューイングを構成します。

extensions:
  file_storage:
    directory: /var/otel/queue
    timeout: 10s
    compaction:
      on_start: true
      directory: /var/otel/queue/compaction

exporters:
  otlp/tempo:
    endpoint: tempo-distributor:4317
    sending_queue:
      enabled: true
      num_consumers: 10
      queue_size: 10000
      storage: file_storage
    retry_on_failure:
      enabled: true
      initial_interval: 5s
      max_interval: 60s
      max_elapsed_time: 600s

service:
  extensions: [file_storage]

この構成はエクスポートキューをディスクに永続化します。Collectorが再起動するかバックエンドが回復すると、キューに入ったデータが送信されます。

ケース3:高カーディナリティの爆発

症状:メトリクスパイプラインのメモリが際限なく増大する。Prometheus Remote Writeがデータを拒否する。

復旧:フィルタProcessorを使用して、エクスポート前に高カーディナリティ属性を削除します。

processors:
  filter:
    metrics:
      exclude:
        match_type: regexp
        metric_names:
          - '.*_temp_.*'
        resource_attributes:
          - key: service.instance.id
            value: '.*debug.*'
  transform:
    metric_statements:
      - context: datapoint
        statements:
          - delete_key(attributes, "http.url")
          - delete_key(attributes, "user.id")

ケース4:AgentがGatewayに到達できない

症状:Agentが接続拒否エラーをログ出力する。テレメトリがサイレントにドロップされる。

復旧:Agentで境界付きリトライ付きの送信キューを有効にします。Readinessプローブ用のヘルスチェック拡張を追加します。

extensions:
  health_check:
    endpoint: 0.0.0.0:13133
    path: /health

exporters:
  otlp:
    endpoint: otel-gateway.observability:4317
    sending_queue:
      enabled: true
      queue_size: 2000
    retry_on_failure:
      enabled: true
      initial_interval: 1s
      max_interval: 30s
      max_elapsed_time: 120s

service:
  extensions: [health_check]

本番レディネスチェックリスト

  • Memory Limiterがすべてのパイプラインの最初のProcessorである
  • Batch ProcessorがExporterの直前の最後のProcessorである
  • 送信キューがすべてのExporterでリトライ構成とともに有効になっている
  • ヘルスチェック拡張がKubernetesのLiveness/Readinessプローブ用にデプロイされている
  • Collectorの自己オブザーバビリティメトリクス(ポート8888)がPrometheusによってスクレイプされている
  • Kubernetesマニフェストにリソースリミットが設定されている(requestsとlimits)
  • Gatewayデプロイメント用にHPAが構成されている
  • テールサンプリングがアクティブな場合、トレースIDルーティング付きのロードバランシングExporterが使用されている
  • 重要パイプラインでファイルストレージ拡張が永続キューイング用に構成されている
  • Collectorのキュー深度、ドロップされたスパン、エクスポートレイテンシを監視するダッシュボード
  • otelcol_exporter_send_failed_spans_totalotelcol_processor_refused_spans_totalのアラート設定

参考文献