Setup Data Streams Monitoring for Go
Prerequisites
To start with Data Streams Monitoring, you need recent versions of the Datadog Agent and Data Streams Monitoring libraries:
Installation
Two types of instrumentation are available:
- Instrumentation for Kafka-based workloads
- Custom instrumentation for any other queuing technology or protocol
Confluent Kafka client
import (
ddkafka "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/kafka.v2"
)
...
// CREATE PRODUCER WITH THIS WRAPPER
producer, err := ddkafka.NewProducer(&kafka.ConfigMap{
"bootstrap.servers": bootStrapServers,
}, ddkafka.WithDataStreams())
If a service consumes data from one point and produces to another point, propagate context between the two places using the Go context structure:
Extract the context from headers:
ctx = datastreams.ExtractFromBase64Carrier(ctx, ddsarama.NewConsumerMessageCarrier(message))
Inject it into the header before producing downstream:
datastreams.InjectToBase64Carrier(ctx, ddsarama.NewProducerMessageCarrier(message))
Sarama Kafka client
import (
ddsarama "gopkg.in/DataDog/dd-trace-go.v1/contrib/IBM/sarama.v1"
)
...
config := sarama.NewConfig()
producer, err := sarama.NewAsyncProducer([]string{bootStrapServers}, config)
// ADD THIS LINE
producer = ddsarama.WrapAsyncProducer(config, producer, ddsarama.WithDataStreams())
Manual instrumentation
You can also use manual instrumentation. For example, you can propagate context through Kinesis.
Instrumenting the produce call
- Ensure your message supports the TextMapWriter interface.
- Inject the context into your message and instrument the produce call by calling:
ctx, ok := tracer.SetDataStreamsCheckpointWithParams(ctx, options.CheckpointParams{PayloadSize: getProducerMsgSize(msg)}, "direction:out", "type:kinesis", "topic:kinesis_arn")
if ok {
datastreams.InjectToBase64Carrier(ctx, message)
}
Instrumenting the consume call
- Ensure your message supports the TextMapReader interface.
- Extract the context from your message and instrument the consume call by calling:
ctx, ok := tracer.SetDataStreamsCheckpointWithParams(datastreams.ExtractFromBase64Carrier(context.Background(), message), options.CheckpointParams{PayloadSize: payloadSize}, "direction:in", "type:kinesis", "topic:kinesis_arn")