Skip to content
Published on

Serverlessアーキテクチャパターン完全ガイド2025:Lambda、Step Functions、イベントソーシング、コスト最適化

Authors

目次(もくじ)

1. Serverlessとは何(なに)か

Serverlessは、サーバーを直接(ちょくせつ)管理(かんり)せずクラウドプロバイダーがインフラを完全(かんぜん)に抽象化(ちゅうしょうか)するコンピューティングモデルである。開発者(かいはつしゃ)はビジネスロジックだけに集中(しゅうちゅう)し、プロビジョニング・スケーリング・パッチングはクラウドが担当(たんとう)する。

1.1 Serverlessの4大(だい)原則(げんそく)

原則説明(せつめい)例(れい)
サーバー管理不要OS、パッチ、スケーリングを意識(いしき)しないLambda、Cloud Functions
自動(じどう)スケーリングトラフィックに応(おう)じて0から数千(すうせん)インスタンスまで秒間(びょうかん)0件から10万件まで
従量課金(じゅうりょうかきん)アイドル時(じ)のコストなし100ms単位(たんい)の課金
イベント駆動(くどう)リクエスト/イベントが関数(かんすう)をトリガーHTTP、S3、SQS、スケジュール

1.2 Serverlessコンピューティングの歴史(れきし)

2014: AWS Lambda リリース(最初のFaaS)
2016: Azure Functions、Google Cloud Functions
2017: AWS Step Functions、SAM リリース
2018: Lambda Layers、ALBサポート
2019: Provisioned Concurrency、RDS Proxy
2020: Lambda Container Image、EventBridge
2021: Lambda URL、Graviton2サポート
2022: Lambda SnapStart(Java)、ストリーミングレスポンス
2023: Lambda Advanced Logging、Step Functions改善
2024: Lambda パフォーマンス最適化、ARM64全面サポート
2025: Lambda最大メモリ10GB、Step Functions Distributed Map強化

1.3 主要(しゅよう)クラウドのServerlessサービス

カテゴリAWSAzureGCP
FaaSLambdaFunctionsCloud Functions
ワークフローStep FunctionsDurable FunctionsWorkflows
APIAPI GatewayAPI ManagementAPI Gateway
メッセージングSQS/SNSService BusPub/Sub
ストリーミングKinesisEvent HubsDataflow
DBDynamoDBCosmos DBFirestore
ストレージS3Blob StorageCloud Storage
イベントバスEventBridgeEvent GridEventarc

2. Lambda設計(せっけい)パターン

Lambda関数をどのように構造化(こうぞうか)するかによって、保守性(ほしゅせい)、パフォーマンス、コストが大(おお)きく変(か)わる。

2.1 単一目的関数(Single Purpose Function)

一(ひと)つのLambdaが一つの作業(さぎょう)だけを実行(じっこう)する。最(もっと)も推奨(すいしょう)されるパターンである。

# order_create.py - 注文作成のみ担当
import json
import boto3
import os
from datetime import datetime

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(os.environ['ORDERS_TABLE'])
sns = boto3.client('sns')

def handler(event, context):
    body = json.loads(event['body'])

    order = {
        'orderId': context.aws_request_id,
        'userId': body['userId'],
        'items': body['items'],
        'total': calculate_total(body['items']),
        'status': 'CREATED',
        'createdAt': datetime.utcnow().isoformat()
    }

    table.put_item(Item=order)

    # イベント発行
    sns.publish(
        TopicArn=os.environ['ORDER_TOPIC'],
        Message=json.dumps(order),
        MessageAttributes={
            'eventType': {
                'DataType': 'String',
                'StringValue': 'OrderCreated'
            }
        }
    )

    return {
        'statusCode': 201,
        'body': json.dumps(order)
    }

def calculate_total(items):
    return sum(item['price'] * item['quantity'] for item in items)

メリット:

  • 関数サイズが小(ちい)さくコールドスタートが速(はや)い
  • 独立(どくりつ)デプロイ可能(かのう)
  • IAM権限(けんげん)を最小限(さいしょうげん)にできる
  • デバッグが容易(ようい)

デメリット:

  • 関数数(かんすうすう)が多(おお)くなりがち
  • 共通(きょうつう)コード管理にLayerが必要(ひつよう)

2.2 モノリシックLambda(Lambda-lith)

一つのLambdaが複数(ふくすう)のルートを処理(しょり)する。ExpressやFastAPIなどのフレームワークを使用(しよう)する。

// app.ts - モノリシックLambda
import express from 'express';
import serverless from 'serverless-http';

const app = express();
app.use(express.json());

// 複数ルートを一つのLambdaで処理
app.get('/orders', async (req, res) => {
  const orders = await getOrders(req.query);
  res.json(orders);
});

app.post('/orders', async (req, res) => {
  const order = await createOrder(req.body);
  res.status(201).json(order);
});

app.get('/orders/:id', async (req, res) => {
  const order = await getOrder(req.params.id);
  if (!order) return res.status(404).json({ error: 'Not found' });
  res.json(order);
});

app.put('/orders/:id/status', async (req, res) => {
  const order = await updateOrderStatus(req.params.id, req.body.status);
  res.json(order);
});

app.delete('/orders/:id', async (req, res) => {
  await cancelOrder(req.params.id);
  res.status(204).send();
});

export const handler = serverless(app);

メリット:

  • 既存(きぞん)のWebフレームワークコードをそのまま移行(いこう)可能
  • 関数数の管理が単純(たんじゅん)
  • ローカル開発が便利(べんり)

デメリット:

  • パッケージサイズが大きくコールドスタートが遅(おそ)い
  • IAM権限が過度(かど)に広(ひろ)くなる
  • 一つのルートの問題(もんだい)が全体(ぜんたい)に影響(えいきょう)

2.3 Fan-out / Fan-inパターン

一つのイベントが複数のLambdaを同時(どうじ)にトリガーし、結果(けっか)を集約(しゅうやく)するパターンである。

# serverless.yml - Fan-outアーキテクチャ
service: order-processing

provider:
  name: aws
  runtime: nodejs20.x

functions:
  orderReceiver:
    handler: src/receiver.handler
    events:
      - http:
          path: /orders
          method: post
    environment:
      FAN_OUT_TOPIC: !Ref OrderFanOutTopic

  inventoryCheck:
    handler: src/inventory.handler
    events:
      - sns:
          arn: !Ref OrderFanOutTopic
          filterPolicy:
            eventType:
              - OrderCreated

  paymentProcess:
    handler: src/payment.handler
    events:
      - sns:
          arn: !Ref OrderFanOutTopic
          filterPolicy:
            eventType:
              - OrderCreated

  notificationSend:
    handler: src/notification.handler
    events:
      - sns:
          arn: !Ref OrderFanOutTopic
          filterPolicy:
            eventType:
              - OrderCreated

resources:
  Resources:
    OrderFanOutTopic:
      Type: AWS::SNS::Topic
      Properties:
        TopicName: order-fan-out

2.4 Lambda設計パターン比較(ひかく)

パターン関数数コールドスタートデプロイ単位推奨場面(すいしょうばめん)
単一目的多い速い個別(こべつ)マイクロサービス
Lambda-lith少(すく)ない遅い全体マイグレーション
Fan-out中間(ちゅうかん)速い個別並列処理
Lambda Layer中間普通(ふつう)レイヤー+関数共通コード共有

3. Step Functions:ワークフローオーケストレーション

Step FunctionsはAWSのServerlessワークフローサービスで、複雑(ふくざつ)なビジネスロジックをステートマシンとして視覚的(しかくてき)に定義(ていぎ)する。

3.1 Standard vs Express Workflow

特性(とくせい)StandardExpress
最大実行時間1年5分
実行保証Exactly-onceAt-least-once
価格(かかく)状態遷移(じょうたいせんい)ごとの課金実行時間/メモリ課金
実行履歴(りれき)90日保管(ほかん)CloudWatch Logs
最大スループット秒間2,000遷移秒間100,000以上
用途(ようと)長期実行ワークフロー大量・高速処理

3.2 ステートタイプ

{
  "Comment": "注文処理ワークフロー",
  "StartAt": "ValidateOrder",
  "States": {
    "ValidateOrder": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:123456789:function:validate-order",
      "Next": "CheckInventory",
      "Retry": [
        {
          "ErrorEquals": ["ServiceException"],
          "IntervalSeconds": 2,
          "MaxAttempts": 3,
          "BackoffRate": 2.0
        }
      ],
      "Catch": [
        {
          "ErrorEquals": ["ValidationError"],
          "Next": "OrderFailed"
        }
      ]
    },
    "CheckInventory": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:123456789:function:check-inventory",
      "Next": "ProcessPaymentOrWait"
    },
    "ProcessPaymentOrWait": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.inventoryAvailable",
          "BooleanEquals": true,
          "Next": "ProcessPayment"
        }
      ],
      "Default": "WaitForInventory"
    },
    "WaitForInventory": {
      "Type": "Wait",
      "Seconds": 300,
      "Next": "CheckInventory"
    },
    "ProcessPayment": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:123456789:function:process-payment",
      "Next": "ParallelFulfillment"
    },
    "ParallelFulfillment": {
      "Type": "Parallel",
      "Branches": [
        {
          "StartAt": "UpdateDatabase",
          "States": {
            "UpdateDatabase": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:ap-northeast-1:123456789:function:update-db",
              "End": true
            }
          }
        },
        {
          "StartAt": "SendNotification",
          "States": {
            "SendNotification": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:ap-northeast-1:123456789:function:send-notification",
              "End": true
            }
          }
        },
        {
          "StartAt": "InitiateShipping",
          "States": {
            "InitiateShipping": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:ap-northeast-1:123456789:function:initiate-shipping",
              "End": true
            }
          }
        }
      ],
      "Next": "OrderCompleted"
    },
    "OrderCompleted": {
      "Type": "Succeed"
    },
    "OrderFailed": {
      "Type": "Fail",
      "Error": "OrderProcessingFailed",
      "Cause": "Order validation or processing failed"
    }
  }
}

3.3 ステートタイプまとめ

ステートタイプ用途説明
Task作業実行Lambda、DynamoDB、SQSなどを呼(よ)び出(だ)す
Choice条件分岐(じょうけんぶんき)if/elseロジック
Parallel並列実行複数ブランチを同時に実行
Map反復処理(はんぷくしょり)配列(はいれつ)の各要素を処理
Wait待機(たいき)指定時間またはタイムスタンプまで待機
Passデータ変換(へんかん)入力を変換して次(つぎ)に渡(わた)す
Succeed成功終了(せいこうしゅうりょう)ワークフロー成功完了
Fail失敗終了(しっぱいしゅうりょう)ワークフロー失敗

3.4 Callbackパターン(人間承認ワークフロー)

Step Functionsは外部(がいぶ)システムの応答を待つコールバックパターンをサポートする。

# callback_handler.py - 人間の承認を待つLambda
import json
import boto3

sfn = boto3.client('stepfunctions')
ses = boto3.client('ses')

def request_approval(event, context):
    """Step Functionsがタスクトークンと共に呼び出す"""
    task_token = event['taskToken']
    order = event['order']

    # 承認リンク付きメール送信
    approval_url = f"https://api.example.com/approve?token={task_token}"
    reject_url = f"https://api.example.com/reject?token={task_token}"

    ses.send_email(
        Source='noreply@example.com',
        Destination={'ToAddresses': ['manager@example.com']},
        Message={
            'Subject': {'Data': f"注文承認リクエスト: {order['orderId']}"},
            'Body': {
                'Html': {
                    'Data': f"""
                    <h2>注文承認リクエスト</h2>
                    <p>注文ID: {order['orderId']}</p>
                    <p>金額: {order['total']}円</p>
                    <a href="{approval_url}">承認</a> |
                    <a href="{reject_url}">拒否</a>
                    """
                }
            }
        }
    )

def handle_approval(event, context):
    """承認/拒否コールバック処理"""
    params = event['queryStringParameters']
    task_token = params['token']

    if 'approve' in event['path']:
        sfn.send_task_success(
            taskToken=task_token,
            output=json.dumps({'approved': True})
        )
    else:
        sfn.send_task_failure(
            taskToken=task_token,
            error='Rejected',
            cause='Manager rejected the order'
        )

    return {
        'statusCode': 200,
        'body': json.dumps({'message': '処理完了'})
    }

4. イベント駆動アーキテクチャパターン

4.1 イベントソーシング with Lambda

# event_store.py
import json
import boto3
from datetime import datetime

dynamodb = boto3.resource('dynamodb')
event_store = dynamodb.Table('EventStore')
sns = boto3.client('sns')

def append_event(aggregate_id, event_type, data, version):
    """イベントを保存・発行"""
    event = {
        'aggregateId': aggregate_id,
        'version': version,
        'eventType': event_type,
        'data': data,
        'timestamp': datetime.utcnow().isoformat(),
        'metadata': {
            'correlationId': data.get('correlationId', ''),
            'causationId': data.get('causationId', '')
        }
    }

    # 楽観的ロック:versionが既に存在すれば失敗
    event_store.put_item(
        Item=event,
        ConditionExpression='attribute_not_exists(version)'
    )

    # イベント発行
    sns.publish(
        TopicArn='arn:aws:sns:ap-northeast-1:123456789:domain-events',
        Message=json.dumps(event),
        MessageAttributes={
            'eventType': {
                'DataType': 'String',
                'StringValue': event_type
            }
        }
    )

    return event

def replay_events(aggregate_id):
    """特定Aggregateの全イベントを再生"""
    response = event_store.query(
        KeyConditionExpression='aggregateId = :aid',
        ExpressionAttributeValues={':aid': aggregate_id},
        ScanIndexForward=True
    )
    return response['Items']

4.2 SagaパターンwithStep Functions

分散(ぶんさん)トランザクションを管理するSagaパターンをStep Functionsで実装(じっそう)する。

{
  "Comment": "注文Saga - 補償トランザクション付き",
  "StartAt": "ReserveInventory",
  "States": {
    "ReserveInventory": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:123456789:function:reserve-inventory",
      "Next": "ProcessPayment",
      "Catch": [{
        "ErrorEquals": ["States.ALL"],
        "Next": "InventoryReservationFailed"
      }]
    },
    "ProcessPayment": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:123456789:function:process-payment",
      "Next": "ConfirmOrder",
      "Catch": [{
        "ErrorEquals": ["States.ALL"],
        "Next": "RollbackInventory"
      }]
    },
    "ConfirmOrder": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:123456789:function:confirm-order",
      "Next": "SagaCompleted",
      "Catch": [{
        "ErrorEquals": ["States.ALL"],
        "Next": "RollbackPayment"
      }]
    },
    "RollbackPayment": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:123456789:function:rollback-payment",
      "Next": "RollbackInventory"
    },
    "RollbackInventory": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:123456789:function:rollback-inventory",
      "Next": "SagaFailed"
    },
    "InventoryReservationFailed": {
      "Type": "Fail",
      "Error": "InventoryReservationFailed",
      "Cause": "Could not reserve inventory"
    },
    "SagaCompleted": {
      "Type": "Succeed"
    },
    "SagaFailed": {
      "Type": "Fail",
      "Error": "SagaFailed",
      "Cause": "Order saga failed, all compensations executed"
    }
  }
}

4.3 Choreography vs Orchestration

特性Choreography(イベント)Orchestration(Step Functions)
結合度(けつごうど)疎結合(そけつごう)中央集権(ちゅうおうしゅうけん)
可視性(かしせい)分散トレーシングが必要ステートマシンで確認可能
複雑度イベントフロー把握(はあく)が困難(こんなん)ワークフロー定義が明確(めいかく)
エラー処理各サービスが独立して処理中央でリトライ/補償(ほしょう)
適(てき)した場面単純なイベントフロー複雑なビジネスロジック

5. コールドスタート深掘(ふかぼ)り

コールドスタートはServerless最大(さいだい)の技術的課題(ぎじゅつてきかだい)の一つである。Lambda関数が新(あたら)しい実行環境(かんきょう)で起動(きどう)する際(さい)に発生する遅延(ちえん)である。

5.1 コールドスタートの発生原因(げんいん)

リクエスト到着
  |
  v
[実行環境あり?] --No--> [コールドスタート経路]
  |                          |
  Yes                   1. 実行環境プロビジョニング
  |                   2. コードダウンロード(S3  v                   3. ランタイム初期化
[ウォームスタート]     4. ハンドラー外コード実行
  |                   5. ハンドラー実行
  v                        |
[ハンドラー実行]           v
  |                   [レスポンス返却]
  v
[レスポンス返却]

5.2 ランタイム別コールドスタート時間比較

ランタイム平均(へいきん)コールドスタートP99コールドスタートパッケージサイズ影響
Python 3.12150-300ms500-800ms低(ひく)い
Node.js 20150-350ms500-900ms
Go(provided.al2023)50-100ms150-300ms非常(ひじょう)に低い
Rust(provided.al2023)30-80ms100-250ms非常に低い
Java 21800-3000ms3000-8000ms高(たか)い
Java 21(SnapStart)100-200ms300-500ms
.NET 8(AOT)200-400ms600-1000ms

5.3 コールドスタート最適化戦略(せんりゃく)

# 最適化されたLambda関数構造
import json
import os

# ハンドラー外で初期化(再利用される)
# 1. 接続はグローバルで初期化
import boto3
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(os.environ['TABLE_NAME'])

# 2. 不要なimportを削除
# BAD: import pandas  (パッケージサイズ増加)
# GOOD: 必要なものだけimport

# 3. SDK設定最適化
from botocore.config import Config
config = Config(
    connect_timeout=5,
    read_timeout=5,
    retries={'max_attempts': 2}
)
s3 = boto3.client('s3', config=config)

def handler(event, context):
    """ハンドラーは可能な限り軽量に"""
    order_id = event['pathParameters']['orderId']

    response = table.get_item(Key={'orderId': order_id})
    item = response.get('Item')

    if not item:
        return {'statusCode': 404, 'body': json.dumps({'error': 'Not found'})}

    return {'statusCode': 200, 'body': json.dumps(item)}

5.4 Provisioned Concurrency

# SAMテンプレート - Provisioned Concurrency設定
Resources:
  MyFunction:
    Type: AWS::Serverless::Function
    Properties:
      Handler: app.handler
      Runtime: python3.12
      MemorySize: 512
      AutoPublishAlias: live
      ProvisionedConcurrencyConfig:
        ProvisionedConcurrentExecutions: 10

  # 時間帯別自動スケーリング
  ScalingTarget:
    Type: AWS::ApplicationAutoScaling::ScalableTarget
    Properties:
      MaxCapacity: 100
      MinCapacity: 5
      ResourceId: !Sub function:${MyFunction}:live
      ScalableDimension: lambda:function:ProvisionedConcurrency
      ServiceNamespace: lambda

  ScalingPolicy:
    Type: AWS::ApplicationAutoScaling::ScalingPolicy
    Properties:
      PolicyName: UtilizationScaling
      PolicyType: TargetTrackingScaling
      ScalableTargetId: !Ref ScalingTarget
      TargetTrackingScalingPolicyConfiguration:
        TargetValue: 0.7
        PredefinedMetricSpecification:
          PredefinedMetricType: LambdaProvisionedConcurrencyUtilization

5.5 Java SnapStart

// SnapStart最適化されたJava Lambda
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import org.crac.Core;
import org.crac.Resource;

public class OrderHandler implements RequestHandler<APIGatewayProxyRequestEvent, APIGatewayProxyResponseEvent>,
                                     Resource {

    private final DynamoDbClient dynamoDb;
    private final ObjectMapper objectMapper;

    public OrderHandler() {
        // SnapStart: この初期化コードはスナップショットに含まれる
        this.dynamoDb = DynamoDbClient.create();
        this.objectMapper = new ObjectMapper();
        Core.getGlobalContext().register(this);
    }

    @Override
    public void beforeCheckpoint(org.crac.Context<? extends Resource> context) {
        // スナップショット前:接続クリーンアップ
    }

    @Override
    public void afterRestore(org.crac.Context<? extends Resource> context) {
        // 復元後:接続再確立
        // 一意性保証(乱数シード再設定など)
    }

    @Override
    public APIGatewayProxyResponseEvent handleRequest(
            APIGatewayProxyRequestEvent event, Context context) {
        // ビジネスロジック
        return new APIGatewayProxyResponseEvent()
            .withStatusCode(200)
            .withBody("{\"message\": \"OK\"}");
    }
}

6. APIパターン

6.1 REST API with API Gateway

# SAMテンプレート - REST API
Resources:
  OrdersApi:
    Type: AWS::Serverless::Api
    Properties:
      StageName: prod
      Auth:
        DefaultAuthorizer: CognitoAuthorizer
        Authorizers:
          CognitoAuthorizer:
            UserPoolArn: !GetAtt UserPool.Arn
      MethodSettings:
        - HttpMethod: '*'
          ResourcePath: '/*'
          ThrottlingBurstLimit: 100
          ThrottlingRateLimit: 50
      Cors:
        AllowMethods: "'GET,POST,PUT,DELETE,OPTIONS'"
        AllowHeaders: "'Content-Type,Authorization'"
        AllowOrigin: "'https://example.com'"

  GetOrderFunction:
    Type: AWS::Serverless::Function
    Properties:
      Handler: src/orders/get.handler
      Runtime: nodejs20.x
      Events:
        GetOrder:
          Type: Api
          Properties:
            RestApiId: !Ref OrdersApi
            Path: /orders/{orderId}
            Method: get

6.2 GraphQL with AppSync

# AppSyncスキーマ
type Order {
  orderId: ID!
  userId: String!
  items: [OrderItem!]!
  total: Float!
  status: OrderStatus!
  createdAt: AWSDateTime!
}

type OrderItem {
  productId: String!
  name: String!
  quantity: Int!
  price: Float!
}

enum OrderStatus {
  CREATED
  PAID
  SHIPPED
  DELIVERED
  CANCELLED
}

type Query {
  getOrder(orderId: ID!): Order
  listOrders(userId: String!, limit: Int, nextToken: String): OrderConnection!
}

type Mutation {
  createOrder(input: CreateOrderInput!): Order!
  updateOrderStatus(orderId: ID!, status: OrderStatus!): Order!
}

type Subscription {
  onOrderStatusChanged(orderId: ID!): Order
    @aws_subscribe(mutations: ["updateOrderStatus"])
}

6.3 WebSocket with API Gateway

# websocket_handler.py
import json
import boto3
import os

dynamodb = boto3.resource('dynamodb')
connections_table = dynamodb.Table(os.environ['CONNECTIONS_TABLE'])

def connect(event, context):
    """WebSocket接続"""
    connection_id = event['requestContext']['connectionId']
    user_id = event['requestContext']['authorizer']['userId']

    connections_table.put_item(Item={
        'connectionId': connection_id,
        'userId': user_id
    })

    return {'statusCode': 200}

def disconnect(event, context):
    """WebSocket切断"""
    connection_id = event['requestContext']['connectionId']
    connections_table.delete_item(Key={'connectionId': connection_id})
    return {'statusCode': 200}

def send_message(event, context):
    """メッセージ送信"""
    domain = event['requestContext']['domainName']
    stage = event['requestContext']['stage']
    body = json.loads(event['body'])

    apigw = boto3.client(
        'apigatewaymanagementapi',
        endpoint_url=f'https://{domain}/{stage}'
    )

    # 全接続にブロードキャスト
    connections = connections_table.scan()['Items']
    for conn in connections:
        try:
            apigw.post_to_connection(
                ConnectionId=conn['connectionId'],
                Data=json.dumps(body['message']).encode()
            )
        except apigw.exceptions.GoneException:
            connections_table.delete_item(
                Key={'connectionId': conn['connectionId']}
            )

    return {'statusCode': 200}

7. データパターン

7.1 DynamoDB シングルテーブル設計

# DynamoDB Single Table Design
# PK/SKパターンで複数エンティティを一つのテーブルに格納

ENTITY_PATTERNS = {
    'User': {
        'PK': 'USER#user_id',
        'SK': 'PROFILE'
    },
    'Order': {
        'PK': 'USER#user_id',
        'SK': 'ORDER#order_id'
    },
    'OrderItem': {
        'PK': 'ORDER#order_id',
        'SK': 'ITEM#item_id'
    },
    'Product': {
        'PK': 'PRODUCT#product_id',
        'SK': 'DETAIL'
    }
}

# アクセスパターン別クエリ
def get_user_with_orders(user_id):
    """ユーザーと注文リストを一度に取得"""
    response = table.query(
        KeyConditionExpression='PK = :pk',
        ExpressionAttributeValues={':pk': f'USER#{user_id}'}
    )
    user = None
    orders = []
    for item in response['Items']:
        if item['SK'] == 'PROFILE':
            user = item
        elif item['SK'].startswith('ORDER#'):
            orders.append(item)
    return {'user': user, 'orders': orders}

7.2 S3イベント処理パイプライン

# S3イベント -> Lambda -> DynamoDB パイプライン
import json
import boto3
import csv
import io

s3 = boto3.client('s3')
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('ProcessedData')

def process_csv_upload(event, context):
    """S3にアップロードされたCSVを処理"""
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = event['Records'][0]['s3']['object']['key']

    # S3からファイル読み込み
    response = s3.get_object(Bucket=bucket, Key=key)
    content = response['Body'].read().decode('utf-8')

    # CSVパース・バッチ書き込み
    reader = csv.DictReader(io.StringIO(content))

    with table.batch_writer() as batch:
        for row in reader:
            batch.put_item(Item={
                'id': row['id'],
                'data': row,
                'sourceFile': key
            })

    return {
        'statusCode': 200,
        'processedFile': key
    }

8. メッセージングサービス選択(せんたく)ガイド

8.1 SQS vs SNS vs EventBridge vs Kinesis

特性SQSSNSEventBridgeKinesis
パターンキュー(1:1)Pub/Sub(1:N)イベントバス(N:N)ストリーミング
順序保証FIFOのみFIFOのみなしパーティション内
最大メッセージ256KB256KB256KB1MB
再処理DLQDLQアーカイブ/リプレイ保持期間
フィルタリングなしメッセージ属性イベントパターンなし
スループット無制限無制限秒間数千シャードあたり1MB/s
課金リクエストごとパブリッシュごとイベントごとシャード時間ごと

8.2 意思決定(いしけってい)ツリー

メッセージング選択フロー:
1. リアルタイムストリーミングが必要?
   -> Yes: Kinesis Data Streams
   -> No: 次へ

2. 複数コンシューマーに同時配信?
   -> Yes: 次へ
   -> No: SQS(シンプルキュー)

3. 複雑なイベントルーティング/フィルタリング?
   -> Yes: EventBridge
   -> No: SNS

4. イベントリプレイが必要?
   -> Yes: EventBridge(アーカイブ)またはKinesis(保持)
   -> No: SNS/SQS

9. Serverlessコンテナ

9.1 Lambda vs Fargate vs Cloud Run

特性LambdaFargateCloud Run
最大実行時間15分無制限60分
最大メモリ10GB120GB32GB
vCPU最大6最大16最大8
ゼロスケール不可(最低1タスク)
コールドスタートありなし(常時実行)あり
価格モデル実行時間+メモリvCPU+メモリ時間実行時間+メモリ

9.2 Lambdaコンテナイメージ

# Dockerfile - Lambdaコンテナイメージ
FROM public.ecr.aws/lambda/python:3.12

# 依存関係インストール
COPY requirements.txt .
RUN pip install -r requirements.txt

# アプリケーションコード
COPY app/ ./app/

# Lambdaハンドラー指定
CMD ["app.main.handler"]

10. コスト最適化

10.1 Lambdaコスト構造(こうぞう)

Lambdaコスト = リクエスト数コスト + 実行時間コスト

リクエスト数コスト:
  -100万件無料
  - 以降100万件あたり約0.20 USD

実行時間コスト(x86):
  - 128MB: 0.0000000021 USD / ms
  - 512MB: 0.0000000083 USD / ms
  - 1024MB: 0.0000000167 USD / ms
  - 1769MB(1 vCPU): 0.0000000289 USD / ms
  - 10240MB: 0.0000001667 USD / ms

ARM64(Graviton2)価格:
  - x86比約20%安い
  - パフォーマンスは同等以上

10.2 メモリ最適化(Power Tuning)

メモリ(MB)平均実行時間コスト/呼び出し最適?
1282500ms0.0053 USD
2561200ms0.0051 USD
512600ms0.0050 USDコスト最適
1024350ms0.0058 USD
1769200ms0.0058 USDパフォーマンス最適

10.3 コスト削減(さくげん)チェックリスト

  1. ARM64(Graviton2)移行 - 20%削減、同等パフォーマンス
  2. メモリPower Tuning - 過小/過大プロビジョニング防止
  3. タイムアウト適切設定 - 無限実行防止
  4. DLQ設定 - 失敗リトライ防止
  5. Reserved Concurrency - 過度なスケーリング制限
  6. Lambda Layer活用 - コードサイズ削減でコールドスタート短縮
  7. EventBridgeスケジュール - CloudWatch Events代替
  8. S3 Intelligent-Tiering - アクセスパターンに応じた自動最適化
  9. DynamoDB On-Demand - 予測不可トラフィック向け
  10. API Gatewayキャッシング - Lambda呼び出し削減

11. Serverless vs Container 意思決定フレームワーク

基準(きじゅん)Serverless(Lambda)Container(ECS/K8s)
実行時間最大15分無制限
スケーリング速度秒単位分単位
最小コスト0(未使用時)常にベースコスト
最大スループット同時実行数制限ありPod数に応じて無制限
状態管理StatelessStateful可能
ベンダーロックイン高い中程度(ちゅうていど)
運用負荷(うんようふか)非常に低い高い
デバッグ難(むずか)しい容易

12. モニタリングと可観測性(かかんそくせい)

12.1 Lambda Powertools

# Lambda Powertools - 構造化ロギング、トレーシング、メトリクス
from aws_lambda_powertools import Logger, Tracer, Metrics
from aws_lambda_powertools.metrics import MetricUnit
from aws_lambda_powertools.event_handler import APIGatewayRestResolver

logger = Logger()
tracer = Tracer()
metrics = Metrics()
app = APIGatewayRestResolver()

@app.get("/orders/<order_id>")
@tracer.capture_method
def get_order(order_id: str):
    logger.info("Fetching order", extra={"order_id": order_id})

    order = fetch_order(order_id)

    metrics.add_metric(name="OrderFetched", unit=MetricUnit.Count, value=1)
    metrics.add_dimension(name="Environment", value="production")

    return {"order": order}

@logger.inject_lambda_context
@tracer.capture_lambda_handler
@metrics.log_metrics(capture_cold_start_metric=True)
def handler(event, context):
    return app.resolve(event, context)

12.2 X-Ray分散トレーシング

# X-Ray SDKで外部呼び出しをトレース
from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all

# 全AWS SDK呼び出しを自動トレース
patch_all()

@xray_recorder.capture('process_order')
def process_order(order):
    subsegment = xray_recorder.begin_subsegment('validate')
    try:
        validate_order(order)
        subsegment.put_annotation('valid', True)
    except Exception as e:
        subsegment.put_annotation('valid', False)
        subsegment.add_exception(e)
        raise
    finally:
        xray_recorder.end_subsegment()

    save_order(order)
    publish_event(order)

12.3 CloudWatchアラーム設定

Resources:
  # Lambdaエラー率アラーム
  LambdaErrorAlarm:
    Type: AWS::CloudWatch::Alarm
    Properties:
      AlarmName: lambda-high-error-rate
      MetricName: Errors
      Namespace: AWS/Lambda
      Dimensions:
        - Name: FunctionName
          Value: !Ref MyFunction
      Statistic: Sum
      Period: 300
      EvaluationPeriods: 2
      Threshold: 5
      ComparisonOperator: GreaterThanThreshold
      AlarmActions:
        - !Ref AlertTopic

  # Lambdaスロットルアラーム
  LambdaThrottleAlarm:
    Type: AWS::CloudWatch::Alarm
    Properties:
      AlarmName: lambda-throttled
      MetricName: Throttles
      Namespace: AWS/Lambda
      Dimensions:
        - Name: FunctionName
          Value: !Ref MyFunction
      Statistic: Sum
      Period: 60
      EvaluationPeriods: 1
      Threshold: 0
      ComparisonOperator: GreaterThanThreshold
      AlarmActions:
        - !Ref AlertTopic

13. テスト戦略

13.1 ローカルテスト with SAM CLI

# SAM CLIでローカルLambda実行
sam local invoke MyFunction \
  --event events/api-gateway.json \
  --env-vars env.json

# ローカルAPIサーバー起動
sam local start-api --port 3000

# DynamoDB Localと併用
docker run -p 8000:8000 amazon/dynamodb-local
sam local invoke --docker-network host

13.2 統合(とうごう)テスト

# test_integration.py
import boto3
import pytest
import json

STACK_NAME = 'my-serverless-app'
API_URL = None

@pytest.fixture(scope='session', autouse=True)
def setup():
    """CloudFormationスタックからAPI URLを取得"""
    global API_URL
    cfn = boto3.client('cloudformation')
    response = cfn.describe_stacks(StackName=STACK_NAME)
    outputs = response['Stacks'][0]['Outputs']
    API_URL = next(o['OutputValue'] for o in outputs if o['OutputKey'] == 'ApiUrl')

def test_create_order():
    """注文作成統合テスト"""
    import requests

    response = requests.post(
        f'{API_URL}/orders',
        json={
            'userId': 'test-user',
            'items': [
                {'productId': 'p1', 'name': 'Widget', 'quantity': 2, 'price': 1000}
            ]
        },
        headers={'Authorization': f'Bearer {get_test_token()}'}
    )

    assert response.status_code == 201
    data = response.json()
    assert 'orderId' in data
    assert data['status'] == 'CREATED'
    assert data['total'] == 2000

13.3 ユニットテスト(モッキング)

# test_unit.py
import json
import pytest
from unittest.mock import MagicMock
from moto import mock_dynamodb, mock_sns

@mock_dynamodb
@mock_sns
def test_create_order_handler():
    """Lambdaハンドラーユニットテスト"""
    import boto3

    dynamodb = boto3.resource('dynamodb', region_name='ap-northeast-1')
    table = dynamodb.create_table(
        TableName='orders',
        KeySchema=[{'AttributeName': 'orderId', 'KeyType': 'HASH'}],
        AttributeDefinitions=[{'AttributeName': 'orderId', 'AttributeType': 'S'}],
        BillingMode='PAY_PER_REQUEST'
    )

    sns = boto3.client('sns', region_name='ap-northeast-1')
    topic = sns.create_topic(Name='order-events')

    import os
    os.environ['ORDERS_TABLE'] = 'orders'
    os.environ['ORDER_TOPIC'] = topic['TopicArn']

    from src.orders.create import handler

    event = {
        'body': json.dumps({
            'userId': 'user123',
            'items': [{'productId': 'p1', 'name': 'Test', 'quantity': 1, 'price': 1000}]
        })
    }

    context = MagicMock()
    context.aws_request_id = 'test-request-id'

    response = handler(event, context)

    assert response['statusCode'] == 201
    body = json.loads(response['body'])
    assert body['userId'] == 'user123'
    assert body['total'] == 1000

14. 実践(じっせん)アーキテクチャ例

14.1 Eコマース注文システム

クライアント
  |
  v
[API Gateway] --> [Lambda: 注文作成]
                     |
                     v
                  [DynamoDB: 注文保存]
                     |
                     v
                  [EventBridge: OrderCreated発行]
                     |
          +----------+----------+
          |          |          |
          v          v          v
     [Lambda:    [Lambda:    [Lambda:
      在庫確認]   決済処理]   通知送信]
          |          |
          v          v
     [DynamoDB]  [Stripe API]
          |
          v
     [Step Functions: 配送ワークフロー]

14.2 メディア処理パイプライン

[S3: 元ファイルアップロード]
  |
  v
[EventBridge: S3イベント]
  |
  v
[Step Functions: メディアパイプライン]
  |
  +-> [Lambda: メタデータ抽出]
  |
  +-> [Lambda: サムネイル生成]
  |
  +-> [Lambda: ビデオトランスコード開始]
  |      |
  |      v
  |   [MediaConvert]
  |      |
  |      v
  |   [Lambda: トランスコード完了処理]
  |
  +-> [Lambda: AIタグ付け(Rekognition)]
  |
  v
[DynamoDB: メタデータ保存]
  |
  v
[CloudFront: CDN配信]

15. クイズ

Q1. Lambdaのコールドスタートが最も長いランタイムは?

正解:Java(SnapStart未適用時)

JavaはJVM初期化、クラスローディング、JITコンパイルなどにより、コールドスタートが800msから8秒まで発生する。SnapStartを使用すると100-200msに大幅に短縮できる。RustとGoはネイティブバイナリにコンパイルされ、30-100ms水準である。

Q2. Step Functions StandardとExpressの主な違いは?

正解:

  • Standard:最大1年実行、Exactly-once、状態遷移ごとの課金、実行履歴90日保管
  • Express:最大5分実行、At-least-once、実行時間/メモリ課金、秒間100,000件以上処理可能

Standardは長期実行ビジネスワークフロー、Expressは大量・高速データ処理に適している。

Q3. Provisioned ConcurrencyとReserved Concurrencyの違いは?

正解:

  • Provisioned Concurrency:Lambdaインスタンスを事前に初期化しコールドスタートを除去。追加コスト発生
  • Reserved Concurrency:特定関数の最大同時実行数を制限。追加コストなし。他関数の同時実行数確保が目的

Provisionedはパフォーマンス保証、Reservedはリソース分離が目的。

Q4. DynamoDBシングルテーブル設計の長所と短所は?

正解:

長所:

  • 一つのクエリで複数エンティティを取得可能(低レイテンシー)
  • テーブル管理が単純
  • トランザクションコスト削減

短所:

  • アクセスパターンを事前に把握する必要がある
  • スキーマ変更が困難
  • 学習曲線が高い
  • データマイグレーションが複雑
Q5. Serverlessを選択すべきでない状況は?

正解:

  • 実行時間が15分を超える長期実行タスク
  • GPUが必要なML学習
  • 常時高トラフィックでコンテナの方がコスト効率的な場合
  • WebSocket等の長期接続が必要な場合
  • 非常に低いレイテンシーが必須の場合(コールドスタート許容不可)
  • 複雑なネットワーク構成が必要な場合

16. 参考資料(さんこうしりょう)

  1. AWS Lambda公式ドキュメント - https://docs.aws.amazon.com/lambda/
  2. AWS Step Functions開発者ガイド - https://docs.aws.amazon.com/step-functions/
  3. Serverless Application Model(SAM) - https://docs.aws.amazon.com/serverless-application-model/
  4. Lambda Powertools for Python - https://docs.powertools.aws.dev/lambda/python/
  5. DynamoDBシングルテーブル設計 - https://www.alexdebrie.com/posts/dynamodb-single-table/
  6. AWS Well-Architected Serverless Lens - https://docs.aws.amazon.com/wellarchitected/latest/serverless-applications-lens/
  7. Lambda Power Tuning - https://github.com/alexcasalboni/aws-lambda-power-tuning
  8. Serverless Land - https://serverlessland.com/
  9. EventBridgeパターン - https://docs.aws.amazon.com/eventbridge/latest/userguide/
  10. Aurora Serverless v2 - https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/aurora-serverless-v2.html
  11. API Gateway REST API - https://docs.aws.amazon.com/apigateway/latest/developerguide/
  12. X-Ray分散トレーシング - https://docs.aws.amazon.com/xray/latest/devguide/
  13. Serverless Framework - https://www.serverless.com/framework/docs/