Skip to content
Published on

OpenTelemetry分散トレーシング実践ガイド:計装・収集・分析パイプラインの構築と運用

Authors
  • Name
    Twitter
OpenTelemetry Distributed Tracing

はじめに

マイクロサービスアーキテクチャにおいて、一つのユーザーリクエストは数十のサービスを経由して処理される。どのサービスで遅延が発生したか、どの呼び出し経路でエラーが伝播したかを把握するには、分散トレーシングが不可欠である。OpenTelemetry(OTel)はCNCF卒業プロジェクトで、トレース(Traces)、メトリクス(Metrics)、ログ(Logs)を統合されたAPIとSDKで収集するオブザーバビリティ標準である。

本記事では、OpenTelemetryのアーキテクチャとコア概念から、Python/Node.js/Go言語別の計装方法、Collectorパイプライン設定、サンプリング戦略、バックエンド比較、そして本番環境で遭遇する障害事例とチェックリストまで、実践運用に必要なすべての内容を解説する。

OpenTelemetryアーキテクチャ概要

コアコンポーネント

OpenTelemetryは以下のコンポーネントで構成されている。

  • API: ベンダー非依存の計装インターフェース。ライブラリ開発者が使用する。
  • SDK: APIの具体的な実装体。サンプリング、バッチ処理、エクスポートなどを担当する。
  • Collector: テレメトリデータを受信(receive)、処理(process)、エクスポートする(export)スタンドアロンプロセスである。
  • Exporters: 収集されたデータをJaeger、Tempo、Datadogなどのバックエンドに送信するモジュールである。
  • Instrumentation Libraries: 自動計装をサポートするフレームワーク別ライブラリである。

トレースモデル

分散トレーシングのコア概念は以下の通りである。

概念説明
Trace一つのリクエストに対する全体経路。複数のSpanで構成される
Spanトレース内の個別の作業単位
SpanContextSpanID、TraceID、TraceFlags、TraceStateを含むコンテキスト
TraceIDトレースを一意に識別する128ビットID
SpanIDスパンを一意に識別する64ビットID
Parent Span現在のスパンを作成した上位スパン
Baggageトレース全体にわたって伝播されるキーバリューペア
Attributesスパンに付与されるメタデータ(key-value)
Eventsスパン内で発生した時点ベースのイベント(ログに類似)
Links他のトレース/スパンとの因果関係接続

手動計装(Manual Instrumentation)

Python計装

# pip install opentelemetry-api opentelemetry-sdk opentelemetry-exporter-otlp

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.semconv.resource import ResourceAttributes

# リソース定義
resource = Resource.create({
    ResourceAttributes.SERVICE_NAME: "order-service",
    ResourceAttributes.SERVICE_VERSION: "1.2.0",
    ResourceAttributes.DEPLOYMENT_ENVIRONMENT: "production",
})

# TracerProvider設定
provider = TracerProvider(resource=resource)
exporter = OTLPSpanExporter(endpoint="http://otel-collector:4317")
provider.add_span_processor(BatchSpanProcessor(exporter))
trace.set_tracer_provider(provider)

# Tracer作成
tracer = trace.get_tracer("order-service", "1.2.0")


# 使用例:注文処理
def create_order(customer_id: str, items: list) -> dict:
    with tracer.start_as_current_span(
        "create_order",
        attributes={
            "customer.id": customer_id,
            "order.item_count": len(items),
        },
    ) as span:
        try:
            # 在庫確認
            with tracer.start_as_current_span("check_inventory") as inventory_span:
                available = check_inventory(items)
                inventory_span.set_attribute("inventory.all_available", available)

            if not available:
                span.set_status(trace.StatusCode.ERROR, "Inventory not available")
                raise ValueError("Some items are out of stock")

            # 決済処理
            with tracer.start_as_current_span("process_payment") as payment_span:
                payment_result = process_payment(customer_id, items)
                payment_span.set_attribute("payment.transaction_id", payment_result["tx_id"])
                payment_span.add_event("payment_completed", {
                    "amount": payment_result["amount"],
                    "currency": "KRW",
                })

            # 注文保存
            with tracer.start_as_current_span("save_order"):
                order = save_to_database(customer_id, items, payment_result)

            span.set_attribute("order.id", order["id"])
            return order

        except Exception as e:
            span.set_status(trace.StatusCode.ERROR, str(e))
            span.record_exception(e)
            raise

Node.js計装

// npm install @opentelemetry/api @opentelemetry/sdk-node
// npm install @opentelemetry/exporter-trace-otlp-grpc
// npm install @opentelemetry/semantic-conventions

const { NodeSDK } = require('@opentelemetry/sdk-node')
const { OTLPTraceExporter } = require('@opentelemetry/exporter-trace-otlp-grpc')
const { Resource } = require('@opentelemetry/resources')
const { ATTR_SERVICE_NAME, ATTR_SERVICE_VERSION } = require('@opentelemetry/semantic-conventions')
const { trace, SpanStatusCode } = require('@opentelemetry/api')

// SDK初期化
const sdk = new NodeSDK({
  resource: new Resource({
    [ATTR_SERVICE_NAME]: 'user-service',
    [ATTR_SERVICE_VERSION]: '2.1.0',
  }),
  traceExporter: new OTLPTraceExporter({
    url: 'http://otel-collector:4317',
  }),
})

sdk.start()

const tracer = trace.getTracer('user-service', '2.1.0')

// 使用例:ユーザー検索
async function getUser(userId) {
  return tracer.startActiveSpan('getUser', async (span) => {
    try {
      span.setAttribute('user.id', userId)

      // DB検索
      const user = await tracer.startActiveSpan('db.query', async (dbSpan) => {
        dbSpan.setAttribute('db.system', 'postgresql')
        dbSpan.setAttribute('db.statement', 'SELECT * FROM users WHERE id = ?')
        const result = await db.query('SELECT * FROM users WHERE id = $1', [userId])
        dbSpan.setAttribute('db.row_count', result.rows.length)
        dbSpan.end()
        return result.rows[0]
      })

      if (!user) {
        span.setStatus({ code: SpanStatusCode.ERROR, message: 'User not found' })
        return null
      }

      // キャッシュ更新
      await tracer.startActiveSpan('cache.set', async (cacheSpan) => {
        cacheSpan.setAttribute('cache.system', 'redis')
        cacheSpan.setAttribute('cache.key', `user:${userId}`)
        await redis.set(`user:${userId}`, JSON.stringify(user), 'EX', 3600)
        cacheSpan.end()
      })

      span.setStatus({ code: SpanStatusCode.OK })
      return user
    } catch (error) {
      span.setStatus({ code: SpanStatusCode.ERROR, message: error.message })
      span.recordException(error)
      throw error
    } finally {
      span.end()
    }
  })
}

Go計装

package main

import (
    "context"
    "log"

    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/codes"
    "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
    "go.opentelemetry.io/otel/sdk/resource"
    sdktrace "go.opentelemetry.io/otel/sdk/trace"
    semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
    "go.opentelemetry.io/otel/trace"
)

func initTracer() (*sdktrace.TracerProvider, error) {
    exporter, err := otlptracegrpc.New(
        context.Background(),
        otlptracegrpc.WithEndpoint("otel-collector:4317"),
        otlptracegrpc.WithInsecure(),
    )
    if err != nil {
        return nil, err
    }

    res, err := resource.New(
        context.Background(),
        resource.WithAttributes(
            semconv.ServiceName("payment-service"),
            semconv.ServiceVersion("3.0.1"),
            semconv.DeploymentEnvironmentKey.String("production"),
        ),
    )
    if err != nil {
        return nil, err
    }

    tp := sdktrace.NewTracerProvider(
        sdktrace.WithBatcher(exporter),
        sdktrace.WithResource(res),
        sdktrace.WithSampler(sdktrace.ParentBased(sdktrace.TraceIDRatioBased(0.1))),
    )
    otel.SetTracerProvider(tp)
    return tp, nil
}

var tracer = otel.Tracer("payment-service")

func ProcessPayment(ctx context.Context, orderID string, amount float64) error {
    ctx, span := tracer.Start(ctx, "ProcessPayment",
        trace.WithAttributes(
            attribute.String("order.id", orderID),
            attribute.Float64("payment.amount", amount),
        ),
    )
    defer span.End()

    // 不正検知チェック
    ctx, fraudSpan := tracer.Start(ctx, "fraud_detection")
    isFraud, err := checkFraud(ctx, orderID, amount)
    if err != nil {
        fraudSpan.SetStatus(codes.Error, err.Error())
        fraudSpan.RecordError(err)
        fraudSpan.End()
        return err
    }
    fraudSpan.SetAttributes(attribute.Bool("fraud.detected", isFraud))
    fraudSpan.End()

    if isFraud {
        span.SetStatus(codes.Error, "Fraud detected")
        return fmt.Errorf("fraud detected for order %s", orderID)
    }

    // 決済ゲートウェイ呼び出し
    ctx, gwSpan := tracer.Start(ctx, "payment_gateway_call")
    txID, err := callPaymentGateway(ctx, amount)
    if err != nil {
        gwSpan.SetStatus(codes.Error, err.Error())
        gwSpan.RecordError(err)
        gwSpan.End()
        return err
    }
    gwSpan.SetAttributes(attribute.String("payment.transaction_id", txID))
    gwSpan.End()

    span.SetStatus(codes.Ok, "Payment processed successfully")
    return nil
}

自動計装(Auto-Instrumentation)

Python自動計装

# 自動計装パッケージのインストール
pip install opentelemetry-distro opentelemetry-exporter-otlp
opentelemetry-bootstrap -a install

# 環境変数で設定して実行
OTEL_SERVICE_NAME=order-service \
OTEL_TRACES_EXPORTER=otlp \
OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4317 \
OTEL_PYTHON_LOG_CORRELATION=true \
opentelemetry-instrument python app.py

Node.js自動計装

// tracing.js - アプリ起動前にロード
const { NodeSDK } = require('@opentelemetry/sdk-node')
const { getNodeAutoInstrumentations } = require('@opentelemetry/auto-instrumentations-node')
const { OTLPTraceExporter } = require('@opentelemetry/exporter-trace-otlp-grpc')

const sdk = new NodeSDK({
  traceExporter: new OTLPTraceExporter({
    url: 'http://otel-collector:4317',
  }),
  instrumentations: [
    getNodeAutoInstrumentations({
      '@opentelemetry/instrumentation-http': {
        ignoreIncomingPaths: ['/health', '/ready'],
      },
      '@opentelemetry/instrumentation-express': {
        enabled: true,
      },
      '@opentelemetry/instrumentation-pg': {
        enabled: true,
        enhancedDatabaseReporting: true,
      },
    }),
  ],
})

sdk.start()

process.on('SIGTERM', () => {
  sdk.shutdown().then(() => process.exit(0))
})
# 自動計装でアプリを実行
node --require ./tracing.js app.js

OpenTelemetry Collectorパイプライン

Collectorアーキテクチャ

Collectorは3つのコンポーネントで構成される。

  • Receivers: テレメトリデータを受信する入口。OTLP、Jaeger、Zipkinなどのプロトコルに対応。
  • Processors: データを変換、フィルタリング、バッチ処理。属性の追加/削除、サンプリングなど。
  • Exporters: 処理されたデータをバックエンドに送信。Jaeger、Tempo、Datadogなど。

Collector設定例

# otel-collector-config.yaml
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317
      http:
        endpoint: 0.0.0.0:4318

  # Jaegerフォーマットも受信可能
  jaeger:
    protocols:
      thrift_http:
        endpoint: 0.0.0.0:14268

processors:
  # バッチ処理でネットワーク効率を向上
  batch:
    send_batch_size: 1024
    send_batch_max_size: 2048
    timeout: 5s

  # メモリ使用量の制限
  memory_limiter:
    check_interval: 1s
    limit_mib: 512
    spike_limit_mib: 128

  # リソース属性の追加
  resource:
    attributes:
      - key: environment
        value: production
        action: upsert
      - key: cluster
        value: ap-northeast-2-prod
        action: upsert

  # 不要な属性の削除(コスト削減)
  attributes:
    actions:
      - key: http.request.header.authorization
        action: delete
      - key: db.statement
        action: hash # SQLクエリのハッシュ化(セキュリティ)

  # テールベースサンプリング
  tail_sampling:
    decision_wait: 10s
    num_traces: 100000
    policies:
      - name: errors-policy
        type: status_code
        status_code:
          status_codes:
            - ERROR
      - name: slow-traces-policy
        type: latency
        latency:
          threshold_ms: 1000
      - name: probabilistic-policy
        type: probabilistic
        probabilistic:
          sampling_percentage: 10

exporters:
  # Grafana Tempoへ送信
  otlp/tempo:
    endpoint: tempo:4317
    tls:
      insecure: true

  # Jaegerへ送信
  otlp/jaeger:
    endpoint: jaeger:4317
    tls:
      insecure: true

  # デバッグログ出力
  debug:
    verbosity: detailed

service:
  pipelines:
    traces:
      receivers: [otlp, jaeger]
      processors: [memory_limiter, resource, attributes, tail_sampling, batch]
      exporters: [otlp/tempo, debug]

  telemetry:
    logs:
      level: info
    metrics:
      address: 0.0.0.0:8888

Collectorデプロイモード

# Docker ComposeでCollectorをデプロイ
version: '3.8'
services:
  otel-collector:
    image: otel/opentelemetry-collector-contrib:0.96.0
    command: ['--config=/etc/otel/config.yaml']
    volumes:
      - ./otel-collector-config.yaml:/etc/otel/config.yaml
    ports:
      - '4317:4317' # OTLP gRPC
      - '4318:4318' # OTLP HTTP
      - '8888:8888' # Prometheus metrics
      - '8889:8889' # Prometheus exporter
      - '13133:13133' # Health check
    deploy:
      resources:
        limits:
          memory: 1G
          cpus: '1.0'

サンプリング戦略

Head-basedサンプリング vs Tail-basedサンプリング

特性Head-basedTail-based
決定タイミングトレース開始時トレース完了後
情報ベースTraceIDハッシュ完全なトレースデータ
利点オーバーヘッドが低い、実装が簡単エラー/遅延トレースを確実にキャプチャ
欠点重要なトレースを見逃す可能性メモリ使用量が多い、複雑
適切な環境大規模トラフィック、コスト重視デバッグ中心、品質優先
実装場所SDK(クライアント側)Collector(サーバー側)

サンプリング設定例

# Python SDKでhead-basedサンプリングの設定
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.sampling import (
    TraceIdRatioBased,
    ParentBased,
    ALWAYS_ON,
    ALWAYS_OFF,
)

# 10%確率サンプリング(親スパンの決定に従う)
sampler = ParentBased(root=TraceIdRatioBased(0.1))

provider = TracerProvider(
    resource=resource,
    sampler=sampler,
)
# Collectorでtail-basedサンプリングの設定
processors:
  tail_sampling:
    decision_wait: 30s
    num_traces: 50000
    expected_new_traces_per_sec: 1000
    policies:
      # エラーのあるトレースは100%収集
      - name: error-traces
        type: status_code
        status_code:
          status_codes: [ERROR]

      # 1秒以上かかったトレースは100%収集
      - name: slow-traces
        type: latency
        latency:
          threshold_ms: 1000

      # 重要サービスのトレースは50%収集
      - name: critical-service
        type: string_attribute
        string_attribute:
          key: service.name
          values: [payment-service, auth-service]
          enabled_regex_matching: false
        type: and
        and:
          and_sub_policy:
            - name: sample-critical
              type: probabilistic
              probabilistic:
                sampling_percentage: 50

      # 残りは5%収集
      - name: default
        type: probabilistic
        probabilistic:
          sampling_percentage: 5

バックエンド比較

項目JaegerGrafana TempoZipkinDatadogNew Relic
ライセンスApache 2.0AGPLv3Apache 2.0商用商用
ストレージCassandra, ES, MemoryObject Storage (S3等)Cassandra, ES, MySQL自社ストレージ自社ストレージ
クエリ言語自社UI/APITraceQL自社UI/API自社クエリNRQL
コスト無料(インフラ費用)無料(インフラ費用)無料(インフラ費用)トレース単位課金トレース単位課金
スケーリング水平スケーリング可能非常に優れる限定的自動自動
OTelサポートネイティブネイティブネイティブネイティブネイティブ
ログ/メトリクス統合限定的Grafanaスタックと統合限定的完全統合完全統合
運用複雑度普通低い(オブジェクトストレージ)低いなし(SaaS)なし(SaaS)

コンテキスト伝播(Context Propagation)

W3C TraceContext

W3C TraceContextは標準HTTPヘッダーを通じてトレース情報を伝播する標準である。

traceparent: 00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01
             ^^ ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^ ^^
          バージョン   trace-id (32 hex)         parent-id (16 hex) flags
# PythonでのW3C TraceContext伝播設定
from opentelemetry import propagate
from opentelemetry.propagators.composite import CompositePropagator
from opentelemetry.propagators.textmap import DefaultTextMapPropagator

# W3C TraceContext(デフォルト)
propagate.set_global_textmap(
    CompositePropagator([
        DefaultTextMapPropagator(),  # W3C TraceContext
    ])
)

# HTTPリクエストにコンテキストを注入
import requests
from opentelemetry.propagate import inject

headers = {}
inject(headers)  # traceparent、tracestateヘッダーを自動追加
response = requests.get("http://downstream-service/api/data", headers=headers)

B3伝播(Zipkin互換)

# B3伝播設定(Zipkin互換が必要な場合)
from opentelemetry.propagators.b3 import B3MultiFormat

propagate.set_global_textmap(
    CompositePropagator([
        DefaultTextMapPropagator(),  # W3C
        B3MultiFormat(),             # B3 (Zipkin互換)
    ])
)

ログとメトリクスの相関

# トレースIDをログに含めて相関関係を設定
import logging
from opentelemetry import trace

class TraceIdFilter(logging.Filter):
    def filter(self, record):
        span = trace.get_current_span()
        if span.is_recording():
            ctx = span.get_span_context()
            record.trace_id = format(ctx.trace_id, '032x')
            record.span_id = format(ctx.span_id, '016x')
        else:
            record.trace_id = '0' * 32
            record.span_id = '0' * 16
        return True

# ログ設定
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter(
    '%(asctime)s %(levelname)s [trace_id=%(trace_id)s span_id=%(span_id)s] %(message)s'
))
handler.addFilter(TraceIdFilter())
logger = logging.getLogger(__name__)
logger.addHandler(handler)

eBPFベースゼロコード計装

eBPF(extended Berkeley Packet Filter)を活用すれば、アプリケーションコードを修正せずにカーネルレベルでトレーシングデータを収集できる。Grafana Beylaが代表的なツールである。

# KubernetesでGrafana Beylaをデプロイ
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: beyla
spec:
  selector:
    matchLabels:
      app: beyla
  template:
    metadata:
      labels:
        app: beyla
    spec:
      hostPID: true
      containers:
        - name: beyla
          image: grafana/beyla:latest
          securityContext:
            privileged: true
          env:
            - name: BEYLA_OPEN_PORT
              value: '80,443,8080,3000'
            - name: OTEL_EXPORTER_OTLP_ENDPOINT
              value: 'http://otel-collector:4317'
            - name: BEYLA_SERVICE_NAMESPACE
              value: 'production'
          volumeMounts:
            - name: sys-kernel
              mountPath: /sys/kernel
      volumes:
        - name: sys-kernel
          hostPath:
            path: /sys/kernel

eBPFベース計装のメリットとデメリットは以下の通りである。

  • メリット: コード変更不要、言語非依存、低オーバーヘッド
  • デメリット: ビジネスコンテキスト(ユーザーID等)の追加不可、Linux カーネル4.18+が必要、一部のプロトコルのみ対応

障害事例とリカバリ手順

事例1: 非同期境界でのコンテキスト喪失

# 問題: 非同期タスクでトレースコンテキストが消失
import asyncio
from opentelemetry import trace, context

async def process_order(order_id: str):
    with tracer.start_as_current_span("process_order") as span:
        # 間違った方法: 新しいタスクにコンテキストが伝播されない
        asyncio.create_task(send_notification(order_id))  # コンテキスト喪失!

# 解決: コンテキストを明示的に渡す
async def process_order_fixed(order_id: str):
    with tracer.start_as_current_span("process_order") as span:
        ctx = context.get_current()
        asyncio.create_task(send_notification_with_context(order_id, ctx))

async def send_notification_with_context(order_id: str, ctx):
    token = context.attach(ctx)
    try:
        with tracer.start_as_current_span("send_notification"):
            # 通知送信ロジック
            pass
    finally:
        context.detach(token)

事例2: サンプリング設定ミスによるトレース欠落

# 問題: head-basedサンプリングを0.1%に設定し、エラートレースもほとんど欠落
# SDK設定
sampler: TraceIdRatioBased(0.001)  # 0.1% - 低すぎる

# 解決: ParentBased + tail_samplingの組み合わせを使用
# SDKでは全量収集
sampler: ParentBased(root=ALWAYS_ON)

# Collectorでtail-basedサンプリングによりエラー/遅延を保証
processors:
  tail_sampling:
    policies:
      - name: errors
        type: status_code
        status_code:
          status_codes: [ERROR]
      - name: latency
        type: latency
        latency:
          threshold_ms: 500
      - name: default
        type: probabilistic
        probabilistic:
          sampling_percentage: 1

事例3: Collectorのメモリ不足(OOM)

# 問題: トラフィック急増時にCollectorがOOMで終了

# 解決: memory_limiterプロセッサを必ず追加
processors:
  memory_limiter:
    check_interval: 1s
    limit_mib: 1024 # 最大メモリ使用量
    spike_limit_mib: 256 # スパイク許容範囲
    limit_percentage: 80 # 全体メモリの80%

  batch:
    send_batch_size: 512 # バッチサイズを縮小
    timeout: 2s

service:
  pipelines:
    traces:
      # memory_limiterをプロセッサチェーンの先頭に配置
      processors: [memory_limiter, batch, tail_sampling]

事例4: サービス間の伝播ヘッダー不一致

サービスAがW3C TraceContextを、サービスBがB3フォーマットを使用すると、コンテキストが途切れる。

解決方法は、すべてのサービスで同一の伝播フォーマットを使用するか、CompositePropagatorで複数のフォーマットを同時にサポートすることである。

本番環境チェックリスト

計装

  • すべてのサービスにOpenTelemetry SDKがインストールされているか確認
  • サービス名、バージョン、環境情報がリソース属性に含まれているか確認
  • 主要ビジネストランザクションにカスタムスパンが追加されているか確認
  • 機密情報(パスワード、トークン等)がスパン属性に含まれていないか確認
  • 非同期境界でコンテキストが正しく伝播されているか確認

Collector

  • memory_limiterプロセッサがパイプラインの先頭に配置されているか確認
  • バッチプロセッサのサイズとタイムアウトが適切か確認
  • ヘルスチェックエンドポイントが設定されているか確認
  • Collector自体のメトリクスがモニタリングされているか確認
  • セキュリティ上機密な属性がattributesプロセッサで削除/ハッシュ化されているか確認

サンプリング

  • エラートレースが100%収集されているか確認
  • 遅延トレース(SLO違反)が収集されているか確認
  • サンプリング率がコスト予算内であるか確認
  • head-basedとtail-basedサンプリングの組み合わせが適切か確認

運用

  • トレース-ログ-メトリクスの相関が設定されているか確認
  • ダッシュボードでサービスマップと依存関係グラフが表示されるか確認
  • アラートルールがトレースベースのSLOと連動しているか確認
  • トレースデータの保持期間が設定されているか確認
  • コンテキスト伝播フォーマットがすべてのサービスで一貫しているか確認

参考資料