Set up Data Streams Monitoring through Manual Instrumentation

이 페이지는 아직 한국어로 제공되지 않으며 번역 작업 중입니다. 번역에 관한 질문이나 의견이 있으시면 언제든지 저희에게 연락해 주십시오.

Data Streams Monitoring (DSM) propagates context through message headers. Use manual instrumentation to set up DSM if you are using:

  • a message queue technology that is not supported by DSM
  • a message queue technology without headers, such as Kinesis, or
  • Lambdas

Manual instrumentation installation

  1. Ensure you’re using the Datadog Agent v7.34.0 or later.

  2. On services sending or consuming messages, declare the supported types. For example:

    kinesis, kafka, rabbitmq, sqs, sns, servicebus

  3. Call the Data Streams Monitoring checkpoints when messages are produced and when they are consumed, as shown in the example code below:

    import datadog.trace.api.experimental.*;
    
    Carrier headersAdapter = new Carrier(headers);
    
    // before calling produce
    DataStreamsCheckpointer.get().setProduceCheckpoint("<datastream-type>", "<queue-or-topic-name>", headersAdapter);
    
    // after calling consume
    DataStreamsCheckpointer.get().setConsumeCheckpoint("<datastream-type>", "<queue-or-topic-name>", headersAdapter);
    
    // example: logging a kafka consume checkpoint on the 'customer-checkout' topic would look like
    DataStreamsCheckpointer.get().setConsumeCheckpoint("kafka", "customer-checkout", headersAdapter);
    
    // replace headers with whatever you're using to pass the context
    private class Carrier implements DataStreamsContextCarrier {
    	private Headers headers;
    	
    	public Carrier(Headers headers) {
    		this.headers = headers;
    	}
    
    	public Set<Entry<String, Object>> entries() {
    		return this.headers.entrySet();
    	}
    
    	public void set(String key, String value){
    		this.headers.put(key, value);
    	}
    }
    const tracer = require('dd-trace').init({})
    
    // before calling produce
    const headers = {}
    tracer.dataStreamsCheckpointer.setProduceCheckpoint(
    "<datastream-type>", "<queue-name>", headers
    )
    
    // after calling consume
    tracer.dataStreamsCheckpointer.setConsumeCheckpoint(
    "<datastream-type>", "<queue-name>", headers
    )
    from ddtrace.data_streams import set_consume_checkpoint
    from ddtrace.data_streams import set_produce_checkpoint
    
    # before calling produce
    headers = {}
    set_produce_checkpoint(
    "<datastream-type>", "<datastream-name>", headers.setdefault
    )
    
    # after calling consume
    set_consume_checkpoint(
    "<datastream-type>", "<datastream-name>", headers.get
    )

    The following example propagates the trace context. See Trace Context Propagation for more information.

    Note: In async operations, this may not work as expected because the context derived from the incoming message can be lost when producing a new message in different threads.

    Producer configuration

    using Datadog.Trace;
    
    using (var scope = Tracer.Instance.StartActive("produce"))
    {
        var headers = new Headers();
        var msg = new Message { Value = "<ANY-VALUE>", Headers = headers};
    
        new SpanContextInjector().InjectIncludingDsm(
            msg.Headers,
            SetHeader,
            scope.Span.Context,
            messageType: "<DATASTREAM-TYPE>",
            target: "<QUEUE-OR-TOPIC-NAME>"
        );
    
        // Produce the message
    }
    
    // Specific to how the header is modeled
    static void SetHeader(Headers headers, string key, string value)
    {
        headers.Add(new Header(key, value));
    }

    Consumer configuration

    using Datadog.Trace;
    
    var startTime = DateTimeOffset.UtcNow;
    var msg = consumer.Consume();
    var parentContext = new SpanContextExtractor().ExtractIncludingDsm(
        msg.Headers,
        GetHeader,
        messageType: "<DATASTREAM-TYPE>",
        source: "<QUEUE-OR-TOPIC-NAME>"
    );
    
    using (var scope = Tracer.Instance.StartActive("consume",
           new SpanCreationSettings,
           {
               Parent = parentContext,
               StartTime = startTime
           })
    )
    {
        // Do something with the message
    }
    
    // Specific to how the header is modeled
    static IEnumerable<string?> GetHeader(Headers headers, string key)
    {
        yield return header.GetByKey(key);
    }

Further Reading

추가 유용한 문서, 링크 및 기사:

PREVIEWING: sadhbh-a/gcp_guide