Data Streams Monitoring (DSM) propagates context through message headers. Use manual instrumentation to set up DSM if you are using:
Ensure you’re using the Datadog Agent v7.34.0 or later.
On services sending or consuming messages, declare the supported types. For example:
kinesis, kafka, rabbitmq, sqs, sns, servicebus
Call the Data Streams Monitoring checkpoints when messages are produced and when they are consumed, as shown in the example code below:
import datadog.trace.api.experimental.*;
import java.util.Map.Entry;
import java.util.Set;

// wrap the message headers in a carrier the checkpointer can read and write
Carrier headersAdapter = new Carrier(headers);

// before calling produce
DataStreamsCheckpointer.get().setProduceCheckpoint("<datastream-type>", "<queue-or-topic-name>", headersAdapter);

// after calling consume
DataStreamsCheckpointer.get().setConsumeCheckpoint("<datastream-type>", "<queue-or-topic-name>", headersAdapter);

// example: logging a kafka consume checkpoint on the 'customer-checkout' topic would look like
DataStreamsCheckpointer.get().setConsumeCheckpoint("kafka", "customer-checkout", headersAdapter);

// replace Headers with whatever you're using to pass the context
private class Carrier implements DataStreamsContextCarrier {
    private Headers headers;

    public Carrier(Headers headers) {
        this.headers = headers;
    }

    // expose the existing headers so the consume checkpoint can read the context
    public Set<Entry<String, Object>> entries() {
        return this.headers.entrySet();
    }

    // store a context header so the produce checkpoint can propagate it
    public void set(String key, String value) {
        this.headers.put(key, value);
    }
}
const tracer = require('dd-trace').init({})

// before calling produce
const headers = {}
tracer.dataStreamsCheckpointer.setProduceCheckpoint(
  "<datastream-type>", "<queue-name>", headers
)

// after calling consume
tracer.dataStreamsCheckpointer.setConsumeCheckpoint(
  "<datastream-type>", "<queue-name>", headers
)
from ddtrace.data_streams import set_consume_checkpoint
from ddtrace.data_streams import set_produce_checkpoint

# before calling produce
headers = {}
set_produce_checkpoint(
    "<datastream-type>", "<datastream-name>", headers.setdefault
)

# after calling consume
set_consume_checkpoint(
    "<datastream-type>", "<datastream-name>", headers.get
)
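The Python calls above pass `headers.setdefault` and `headers.get` rather than the dictionary itself, because the checkpoint functions take callables for writing and reading a header. The sketch below illustrates that callback pattern without importing `ddtrace`. The functions `fake_produce_checkpoint` and `fake_consume_checkpoint` and the `example-pathway-key` header name are hypothetical stand-ins, not the library's implementation, and the stored value is a readable placeholder rather than real encoded context.

```python
from typing import Callable, Dict, Optional

# Hypothetical header name, for illustration only; the real key is chosen by the tracer.
PATHWAY_KEY = "example-pathway-key"


def fake_produce_checkpoint(
    typ: str, target: str, carrier_set: Callable[[str, str], object]
) -> None:
    """Stand-in for a produce checkpoint: writes one context header via the setter."""
    # A real tracer would encode pathway state; here we store a readable placeholder.
    carrier_set(PATHWAY_KEY, f"{typ}:{target}")


def fake_consume_checkpoint(
    typ: str, source: str, carrier_get: Callable[[str], Optional[str]]
) -> Optional[str]:
    """Stand-in for a consume checkpoint: reads the context header via the getter."""
    return carrier_get(PATHWAY_KEY)


# Producer side: the dict's bound method acts as the (key, value) setter.
headers: Dict[str, str] = {}
fake_produce_checkpoint("kafka", "customer-checkout", headers.setdefault)

# ... the message and its headers travel through the queue ...

# Consumer side: the dict's bound method acts as the (key) getter.
received = fake_consume_checkpoint("kafka", "customer-checkout", headers.get)
print(received)
```

If your client library models headers as a list of tuples or as an object with its own accessors, small wrapper functions with the same `(key, value)` and `(key)` shapes can be passed instead. Note that `dict.setdefault` only writes when the key is absent, so a fresh `headers` dictionary per message avoids carrying over an earlier value.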
The following example propagates the trace context. See Trace Context Propagation for more information.
using Datadog.Trace;

using (var scope = Tracer.Instance.StartActive("produce"))
{
    var headers = new Headers();
    var msg = new Message { Value = "<ANY-VALUE>", Headers = headers };

    new SpanContextInjector().InjectIncludingDsm(
        msg.Headers,
        SetHeader,
        scope.Span.Context,
        messageType: "<DATASTREAM-TYPE>",
        target: "<QUEUE-OR-TOPIC-NAME>"
    );

    // Produce the message
}

// Specific to how the header is modeled
static void SetHeader(Headers headers, string key, string value)
{
    headers.Add(new Header(key, value));
}
using System;
using System.Collections.Generic;
using Datadog.Trace;

// capture the start time before consuming so the span covers the wait
var startTime = DateTimeOffset.UtcNow;
var msg = consumer.Consume();

var parentContext = new SpanContextExtractor().ExtractIncludingDsm(
    msg.Headers,
    GetHeader,
    messageType: "<DATASTREAM-TYPE>",
    source: "<QUEUE-OR-TOPIC-NAME>"
);

using (var scope = Tracer.Instance.StartActive("consume",
    new SpanCreationSettings
    {
        Parent = parentContext,
        StartTime = startTime
    })
)
{
    // Do something with the message
}

// Specific to how the header is modeled
static IEnumerable<string?> GetHeader(Headers headers, string key)
{
    yield return headers.GetByKey(key);
}