Data Streams Monitoring (DSM) propagates context through message headers. Use manual instrumentation to set up DSM if you are using:
- a message queue technology that is not supported by DSM, or
- a message queue technology without headers, such as Kinesis
On services sending or consuming messages, declare the supported types. For example:
kinesis, kafka, rabbitmq, sqs, sns, servicebus
Call the Data Streams Monitoring checkpoints when messages are produced and when they are consumed, as shown in the example code below:
Java

    import datadog.trace.api.experimental.*;

    Carrier headersAdapter = new Carrier(headers);

    // before calling produce
    DataStreamsCheckpointer.get().setProduceCheckpoint("<datastream-type>", "<queue-or-topic-name>", headersAdapter);

    // after calling consume
    DataStreamsCheckpointer.get().setConsumeCheckpoint("<datastream-type>", "<queue-or-topic-name>", headersAdapter);

    // example: logging a kafka consume checkpoint on the 'customer-checkout' topic would look like
    DataStreamsCheckpointer.get().setConsumeCheckpoint("kafka", "customer-checkout", headersAdapter);

    // replace headers with whatever you're using to pass the context
    private class Carrier implements DataStreamsContextCarrier {
        private Headers headers;

        public Carrier(Headers headers) {
            this.headers = headers;
        }

        public Set<Entry<String, Object>> entries() {
            return this.headers.entrySet();
        }

        public void set(String key, String value) {
            this.headers.put(key, value);
        }
    }
Node.js

    const tracer = require('dd-trace').init({})

    // before calling produce
    const headers = {}
    tracer.dataStreamsCheckpointer.setProduceCheckpoint("<datastream-type>", "<queue-name>", headers)

    // after calling consume
    tracer.dataStreamsCheckpointer.setConsumeCheckpoint("<datastream-type>", "<queue-name>", headers)
Python

    from ddtrace.data_streams import set_consume_checkpoint
    from ddtrace.data_streams import set_produce_checkpoint

    # before calling produce
    headers = {}
    set_produce_checkpoint("<datastream-type>", "<datastream-name>", headers.setdefault)

    # after calling consume
    set_consume_checkpoint("<datastream-type>", "<datastream-name>", headers.get)
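For a queue without native headers, such as Kinesis, one possible approach is to carry the headers dictionary inside the record itself. The sketch below illustrates that idea only. It assumes a JSON envelope with hypothetical `headers` and `payload` fields, and it replaces the real `set_produce_checkpoint` and `set_consume_checkpoint` calls with stand-in functions so that it runs without the tracer installed. The stand-ins, the placeholder key, and the helper names `wrap_record` and `unwrap_record` are not part of ddtrace.

```python
import json

# Placeholder key and stand-in functions: these only mimic the call shape of
# ddtrace.data_streams.set_produce_checkpoint / set_consume_checkpoint.
# The key and value written here are NOT what the tracer actually writes.
PLACEHOLDER_KEY = "example-pathway-ctx"


def fake_set_produce_checkpoint(typ, target, carrier_set):
    # The setter callable receives a key and a value to store in the carrier.
    carrier_set(PLACEHOLDER_KEY, f"{typ}:{target}")


def fake_set_consume_checkpoint(typ, source, carrier_get):
    # The getter callable receives a key and returns the stored value (or None).
    return carrier_get(PLACEHOLDER_KEY)


def wrap_record(payload, typ, stream):
    """Producer side: checkpoint into a dict, then embed it in the record."""
    headers = {}
    fake_set_produce_checkpoint(typ, stream, headers.setdefault)
    return json.dumps({"headers": headers, "payload": payload})


def unwrap_record(record, typ, stream):
    """Consumer side: recover the dict from the record, then checkpoint."""
    envelope = json.loads(record)
    headers = envelope.get("headers", {})
    ctx = fake_set_consume_checkpoint(typ, stream, headers.get)
    return envelope["payload"], ctx


record = wrap_record({"order_id": 42}, "kinesis", "orders-stream")
payload, ctx = unwrap_record(record, "kinesis", "orders-stream")
print(payload, ctx)
```

In a real service, the stand-ins would be swapped for the ddtrace functions shown above, still passing `headers.setdefault` on the producer side and `headers.get` on the consumer side. Records that arrive without a `headers` field fall back to an empty dictionary, so the getter returns `None` for them.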
The following .NET example propagates the trace context. See Trace Context Propagation for more information.
Note: In async operations, this may not work as expected, because the context derived from the incoming message can be lost when a new message is produced on a different thread.
Producer configuration
    using Datadog.Trace;

    using (var scope = Tracer.Instance.StartActive("produce"))
    {
        var headers = new Headers();
        var msg = new Message { Value = "<ANY-VALUE>", Headers = headers };

        new SpanContextInjector().InjectIncludingDsm(
            msg.Headers,
            SetHeader,
            scope.Span.Context,
            messageType: "<DATASTREAM-TYPE>",
            target: "<QUEUE-OR-TOPIC-NAME>");

        // Produce the message
    }

    // Specific to how the header is modeled
    static void SetHeader(Headers headers, string key, string value)
    {
        headers.Add(new Header(key, value));
    }
Consumer configuration
    using Datadog.Trace;

    var startTime = DateTimeOffset.UtcNow;
    var msg = consumer.Consume();
    var parentContext = new SpanContextExtractor().ExtractIncludingDsm(
        msg.Headers,
        GetHeader,
        messageType: "<DATASTREAM-TYPE>",
        source: "<QUEUE-OR-TOPIC-NAME>");

    using (var scope = Tracer.Instance.StartActive(
        "consume",
        new SpanCreationSettings { Parent = parentContext, StartTime = startTime }))
    {
        // Do something with the message
    }

    // Specific to how the header is modeled
    static IEnumerable<string?> GetHeader(Headers headers, string key)
    {
        yield return headers.GetByKey(key);
    }