見出し画像

Rust on Lambda (Container image) を X-Ray でトレーシングする

Lambda (Container image) で動く Rust アプリケーションのボトルネック分析をした際に、X-Ray を使ってトレーシングしてみたのでその設定について紹介したいと思います。

トレーシング

アプリケーションの挙動を知りたい場合、テキストとして何らかの情報をログに出力することで分析ができますが、トレーシングを利用することでより便利に分析ができます。例えば非同期実行されるアプリケーションの場合、ログの出力順序は意味をなさなくなり、連続するログは同じコンテキスト(Web サーバーの場合は同じ HTTP リクエストの結果なのかどうか)を保有しているとは限りません。一方で、トレーシングを利用することでコンテキストを保持したままログを出力することができます (tokio のブログにも似たようなことが記載されているので参考にしてください)。
また、複数のアプリケーションが関係するシステムの場合は、それぞれのアプリケーションにて同じトレーシングの情報を共有することで、どのようにそれぞれのアプリケーションが連動したのかを追跡することができます。

Rust では tracing クレートを利用することでトレーシングを実行できます。また、tracing-opentelemetry クレートを利用することで OpenTelemetry に互換性のあるシステム(例えば jaeger など)に対してトレース情報を出力することができます。AWS では AWS Distro for OpenTelemetry Collector を提供しており、アプリケーションからトレーシングデータを受信して X-Ray に送信することができます。これらの関係を図で表すと下記のようになります。

トレーシングデータの流れ

Lambda コード

Lambda 関数のコードは以下のようになります。

use aws_config::meta::region::RegionProviderChain;
use aws_sdk_dynamodb::Client;
use lambda_runtime::{service_fn, Error, LambdaEvent};
use lambda_tracing_xray::xray::{XRayExtractor, XRayIdGenerator};
use serde_json::{json, Value};
use tracing_opentelemetry::OpenTelemetrySpanExt;
use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt;

#[tokio::main]
async fn main() -> Result<(), Error> {
    // enable xray tracing support
    opentelemetry::global::set_text_map_propagator(opentelemetry_aws::XrayPropagator::new());
    let tracer = opentelemetry_otlp::new_pipeline()
        .tracing()
        .with_exporter(opentelemetry_otlp::new_exporter().tonic())
        .with_trace_config(
            opentelemetry_sdk::trace::config()
                .with_sampler(opentelemetry_sdk::trace::Sampler::AlwaysOn)
                .with_id_generator(XRayIdGenerator::default()),
        )
        .install_batch(opentelemetry::runtime::Tokio)?;
    let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);

    // register Subscriber
    let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()?;
    let subscriber = tracing_subscriber::Registry::default()
        .with(env_filter)
        .with(telemetry);
    tracing::subscriber::set_global_default(subscriber)?;

    // invoke handler
    lambda_runtime::run(service_fn(handler)).await?;

    // emit remaining spans
    opentelemetry::global::shutdown_tracer_provider();
    Ok(())
}

async fn handler(event: LambdaEvent<Value>) -> Result<Value, Error> {
    let xray_trace_id = std::env::var("_X_AMZN_TRACE_ID")?;
    let parent_context = opentelemetry::global::get_text_map_propagator(|propagator| {
        propagator.extract(&XRayExtractor {
            xray_trace_id: xray_trace_id.to_owned(),
        })
    });

    let mut span = tracing::info_span!("handler");
    span.set_parent(parent_context);
    let _guard = span.entered();

    let (event, _ctx) = event.into_parts();
    let first_name = event["firstName"].as_str().unwrap_or("world");
    let message = hello(first_name).await?;

    Ok(json!({ "message": message }))
}

#[tracing::instrument()]
async fn hello(first_name: &str) -> Result<String, Error> {
    ddb_get(String::from("12345")).await?;
    Ok(format!("Hello, {}!", first_name))
}

#[tracing::instrument(err)]
async fn ddb_get(id: String) -> Result<String, Error> {
    let region_provider = RegionProviderChain::default_provider().or_else("ap-northeast-1");
    let config = aws_config::from_env().region(region_provider).load().await;
    let client = Client::new(&config);
    let req = client
        .get_item()
        .table_name("lambda-tracing-xray")
        .key("id", aws_sdk_dynamodb::model::AttributeValue::S(id));
    let resp = req.send().await.unwrap();
    Ok(resp
        .item()
        .unwrap()
        .get("id")
        .unwrap()
        .as_s()
        .unwrap()
        .to_string())
}

上記では lambda-runtime を利用して Lambda 上で Rust のコードを動かしています。Lambda のハンドラ関数は handler 関数で、main 関数ではトレーシング機能の設定・有効化をしています。

main 関数ではトレーシングデータを OpenTelemetry 互換のデータ・AWS X-Ray に送信するためのデータへと加工し、AWS X-Ray にデータを送信する Layer を作成しています。ここで、XRayIdGenerator という構造体を利用していますが、こちらは下記のようなものとなります。

use opentelemetry_api::trace::{SpanId, TraceId};
use opentelemetry_sdk::trace::{IdGenerator, RandomIdGenerator};

#[derive(Debug, Default)]
pub struct XRayIdGenerator {
    sdk_default_generator: RandomIdGenerator,
}

const HEADER_ROOT_KEY: &str = "Root";

impl IdGenerator for XRayIdGenerator {
    /// Generates `TraceId` from _X_AMZN_TRACE_ID
    fn new_trace_id(&self) -> TraceId {
        let xray_trace_id = std::env::var("_X_AMZN_TRACE_ID").unwrap();
        let parts: Vec<(&str, &str)> = xray_trace_id
            .split_terminator(';')
            .filter_map(from_key_value_pair)
            .collect();

        let mut trace_id: TraceId = TraceId::INVALID;

        for (key, value) in parts {
            match key {
                HEADER_ROOT_KEY => {
                    let converted_trace_id: Result<TraceId, ()> =
                        XrayTraceId(value.to_string()).try_into();
                    match converted_trace_id {
                        Err(_) => return trace_id,
                        Ok(parsed) => trace_id = parsed,
                    }
                }
                _ => (),
            }
        }
        trace_id
    }

    fn new_span_id(&self) -> SpanId {
        self.sdk_default_generator.new_span_id()
    }
}

fn from_key_value_pair(pair: &str) -> Option<(&str, &str)> {
    let mut key_value_pair: Option<(&str, &str)> = None;

    if let Some(index) = pair.find('=') {
        let (key, value) = pair.split_at(index);
        key_value_pair = Some((key, value.trim_start_matches('=')));
    }
    key_value_pair
}

#[derive(Clone, Debug, PartialEq)]
struct XrayTraceId(String);

impl TryFrom<XrayTraceId> for TraceId {
    type Error = ();

    fn try_from(id: XrayTraceId) -> Result<Self, Self::Error> {
        let parts: Vec<&str> = id.0.split_terminator('-').collect();

        if parts.len() != 3 {
            return Err(());
        }

        let trace_id: TraceId =
            TraceId::from_hex(format!("{}{}", parts[1], parts[2]).as_str()).map_err(|_| ())?;

        if trace_id == TraceId::INVALID {
            Err(())
        } else {
            Ok(trace_id)
        }
    }
}

やっていることとしては、Span の生成時に TraceId を Lambda の環境変数である _X_AMZN_TRACE_ID から生成しています。opentelemetry クレートにある XrayIdGenerator では TraceId がランダムなものとなり、Lambda が生成するトレーシング情報と統合されません。その後、複数の Layer をまとめて Subscriber を作成し、それをデフォルトのものとして登録します。これでトレーシングのための初期化は完了です。

handler 関数でも、Lambda が生成するトレーシング情報と統合するために、下記のようなXRayExtractor という構造体を独自に定義して利用しています。_X_AMZN_TRACE_ID から親となる Span の ID を抽出して、handler 関数内で Span を作成しています。

use opentelemetry::propagation::Extractor;


pub struct XRayExtractor {
    pub xray_trace_id: String,
}

impl Extractor for XRayExtractor {
    fn get(&self, _key: &str) -> Option<&str> {
        Some(&self.xray_trace_id)
    }

    fn keys(&self) -> Vec<&str> {
        todo!()
    }
}


hello 関数や ddb_get 関数のように tracing::instrument() を使って Span を作成してトレーシングデータを収集しています。tracing::instrument() 自体に様々な引数を渡すことで、トレーシング時の挙動を変更することもできます。例えば、ddb_get 関数には err という引数を渡しており、Error が発生した場合にトレーシングに Error の情報を挿入してくれます。こちらのドキュメントにその他の引数が記載されています。

AWS Distro for OpenTelemetry Collector の導入

前述したように、トレーシングデータを AWS X-Ray に送信するには、AWS Distro for OpenTelemetry Collector を Lambda 上にインストールする必要があり、AWS Distro for OpenTelemetry Collector 自体を Lambda レイヤーとして設定する必要があります。しかしながら、今回ボトルネック分析をしたようにコンテナイメージとして動く Lambda アプリケーションの場合 Lambda レイヤーを利用できません。なので、こちらの AWS ブログにあるようにコンテナイメージの中に AWS Distro for OpenTelemetry Collector をパッケージングしてあげる必要があり、Dockerfile は以下のようになります(cargo-chef を利用しています)。

FROM public.ecr.aws/lambda/provided:al2 AS chef 

# Set AWS credentials
ARG AWS_DEFAULT_REGION=${AWS_DEFAULT_REGION:-"ap-northeast-1"}
ARG AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID:-""}
ARG AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY:-""}
ARG AWS_SESSION_TOKEN=${AWS_SESSION_TOKEN:-""}
ENV AWS_DEFAULT_REGION=${AWS_DEFAULT_REGION}
ENV AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
ENV AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
ENV AWS_SESSION_TOKEN=${AWS_SESSION_TOKEN}

# Install dependencies
RUN yum update -y && yum install -y gcc unzip
RUN curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip" \
    && unzip awscliv2.zip \
    && ./aws/install
# protoc bin for opentelemetry-otlp
RUN curl -LO "https://github.com/protocolbuffers/protobuf/releases/download/v3.15.8/protoc-3.15.8-linux-x86_64.zip" \
    && unzip protoc-3.15.8-linux-x86_64.zip -d /tmp/.local \
    && cp /tmp/.local/bin/protoc /usr/local/bin/


# Install AWS Distro for OpenTelemetry Collector
RUN mkdir -p /opt
RUN curl $(aws lambda get-layer-version-by-arn --arn arn:aws:lambda:ap-northeast-1:901920570463:layer:aws-otel-collector-amd64-ver-0-58-0:1 --query 'Content.Location' --output text) --output layer.zip
RUN unzip layer.zip -d /opt
RUN rm layer.zip

# Install rust
ARG RUST_VERSION=1.62.0
RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain $RUST_VERSION
ENV PATH $PATH:/root/.cargo/bin
RUN cargo install cargo-chef
WORKDIR app

# plan cargo-chef
FROM chef AS planner
COPY . .
RUN cargo chef prepare --recipe-path recipe.json

# Build application
FROM chef AS builder
COPY --from=planner /var/task/app/recipe.json recipe.json
RUN cargo chef cook --release --recipe-path recipe.json
COPY . .
RUN cargo build --release

# Runtime
FROM public.ecr.aws/lambda/provided:al2 AS runtime
WORKDIR app
COPY --from=builder /var/task/app/target/release/lambda-tracing-xray /usr/local/bin
COPY --from=builder /opt /opt
ENTRYPOINT ["/usr/local/bin/lambda-tracing-xray"]

20 行目 ~ 24 行目で、AWS Distro for OpenTelemetry Collector のレイヤの中身をダウンロードして /opt/extensions ディレクトリへと格納しています。
Lambda の仕様として、/opt/extensions ディレクトリに格納されたものは拡張機能として Lambda の初期化時に読み込まれます。また、イメージのビルド時には AWS の認証情報が必要となるので、下記の docker build コマンドのように AWS 認証情報を設定します。

docker build \
    --build-arg AWS_ACCESS_KEY_ID="XXXXXXX" \
    --build-arg AWS_SECRET_ACCESS_KEY="YYYYYY" \
    --build-arg AWS_SESSION_TOKEN="ZZZZZZ" \
    -t lambda-tracing-xray \
.

また、Lambda 上で「AWS X-Ray 」>「アクティブトレース」を有効化する必要があります。この設定を有効化すると、AWS X-Ray へデータを送信するのに必要なパーミッションが Lambda の実行ロールに付与されます。

AWS コンソールより

AWS X-Ray でトレーシングデータを確認する

上記 Lambda をデプロイし、実行してみると CloudWatch のコンソールにて下記のように Lambda 実行時のデータを確認できます。

CloudWatch のコンソールより

関数全体の実行時間は 2.28 秒かかっており、Lambda の初期化処理で 1.5 秒、実際に Lambda 関数(handler 関数)の実行時間は 377 msec で、hello 関数の実行にも 376 msec かかっているが、実際には DynamoDB をコールしている ddb_get 関数によるものであることがわかります。
また、下記のように「メタデータ」の箇所を開くと ddb_get 関数に渡された引数の詳細も確認することができます。


まとめ

Rust on Lambda (Container image) にて AWS X-Ray を用いて、トレーシング情報を収集することができました。今度は複数のサービスを用いて分散トレーシングを試してみたいと思います。


この記事が気に入ったらサポートをしてみませんか?