Data Streams Monitoring for Go のセットアップ
前提条件
Data Streams Monitoring を開始するには、Datadog Agent と Data Streams Monitoring ライブラリの最新バージョンが必要です。
インストール
2 種類のインスツルメンテーションが用意されています。
- Kafka ベースのワークロードのためのインスツルメンテーション
- その他のキューイング技術やプロトコルのためのカスタムインスツルメンテーション
Confluent Kafka クライアント
import (
ddkafka "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/kafka.v2"
)
...
// このラッパーでプロデューサーを作成します
producer, err := ddkafka.NewProducer(&kafka.ConfigMap{
"bootstrap.servers": bootStrapServers,
}, ddkafka.WithDataStreams())
サービスがある地点からデータを消費し、別の地点にデータを生成する場合、Go コンテキスト構造を使用して、2つの地点間でコンテキストを伝播します。
ヘッダーからコンテキストを抽出します。
ctx = datastreams.ExtractFromBase64Carrier(ctx, ddsarama.NewConsumerMessageCarrier(message))
下流にデータを生成する前に、そのコンテキストをヘッダーに注入します。
datastreams.InjectToBase64Carrier(ctx, ddsarama.NewProducerMessageCarrier(message))
Sarama Kafka クライアント
import (
ddsarama "gopkg.in/DataDog/dd-trace-go.v1/contrib/Shopify/sarama"
)
...
config := sarama.NewConfig()
producer, err := sarama.NewAsyncProducer([]string{bootStrapServers}, config)
// この行を追加します
producer = ddsarama.WrapAsyncProducer(config, producer, ddsarama.WithDataStreams())
手動インスツルメンテーション
また、手動インスツルメンテーションを使用することもできます。例えば、Kinesis を介してコンテキストを伝搬させることができます。
produce 呼び出しのインスツルメンテーション
- メッセージが TextMapWriter インターフェースをサポートしていることを確認します。
- コンテキストをメッセージに注入し、以下を呼び出して produce 呼び出しをインスツルメントします。
ctx, ok := tracer.SetDataStreamsCheckpointWithParams(ctx, options.CheckpointParams{PayloadSize: getProducerMsgSize(msg)}, "direction:out", "type:kinesis", "topic:kinesis_arn")
if ok {
datastreams.InjectToBase64Carrier(ctx, message)
}
consume 呼び出しのインスツルメンテーション
- メッセージが TextMapReader インターフェースをサポートしていることを確認します。
- メッセージからコンテキストを抽出し、以下を呼び出して consume 呼び出しをインスツルメントします。
ctx, ok := tracer.SetDataStreamsCheckpointWithParams(datastreams.ExtractFromBase64Carrier(context.Background(), message), options.CheckpointParams{PayloadSize: payloadSize}, "direction:in", "type:kinesis", "topic:kinesis_arn")