On services sending or consuming messages, declare the supported types. For example:
kinesis, kafka, rabbitmq, sqs, sns, servicebus
Call the Data Streams Monitoring checkpoints when messages are produced and when they are consumed, as shown in the example code below:
importdatadog.trace.api.experimental.*;CarrierheadersAdapter=newCarrier(headers);// before calling produceDataStreamsCheckpointer.get().setProduceCheckpoint("<datastream-type>","<queue-or-topic-name>",headersAdapter);// after calling consumeDataStreamsCheckpointer.get().setConsumeCheckpoint("<datastream-type>","<queue-or-topic-name>",headersAdapter);// example: logging a kafka consume checkpoint on the 'customer-checkout' topic would look likeDataStreamsCheckpointer.get().setConsumeCheckpoint("kafka","customer-checkout",headersAdapter);// replace headers with whatever you're using to pass the contextprivateclassCarrierimplementsDataStreamsContextCarrier{privateHeadersheaders;publicCarrier(Headersheaders){this.headers=headers;}publicSet<Entry<String,Object>>entries(){returnthis.headers.entrySet();}publicvoidset(Stringkey,Stringvalue){this.headers.put(key,value);}}
consttracer=require('dd-trace').init({})// before calling produce
constheaders={}tracer.dataStreamsCheckpointer.setProduceCheckpoint("<datastream-type>","<queue-name>",headers)// after calling consume
tracer.dataStreamsCheckpointer.setConsumeCheckpoint("<datastream-type>","<queue-name>",headers)
fromddtrace.data_streamsimportset_consume_checkpointfromddtrace.data_streamsimportset_produce_checkpoint# before calling produceheaders={}set_produce_checkpoint("<datastream-type>","<datastream-name>",headers.setdefault)# after calling consumeset_consume_checkpoint("<datastream-type>","<datastream-name>",headers.get)
The following example propagates the trace context. See Trace Context Propagation for more information.
Note: In async operations, this may not work as expected because the context derived from the incoming message can be lost when producing a new message in different threads.
Producer configuration
usingDatadog.Trace;using(varscope=Tracer.Instance.StartActive("produce")){varheaders=newHeaders();varmsg=newMessage{Value="<ANY-VALUE>",Headers=headers};newSpanContextInjector().InjectIncludingDsm(msg.Headers,SetHeader,scope.Span.Context,messageType:"<DATASTREAM-TYPE>",target:"<QUEUE-OR-TOPIC-NAME>");// Produce the message}// Specific to how the header is modeledstaticvoidSetHeader(Headersheaders,stringkey,stringvalue){headers.Add(newHeader(key,value));}
Consumer configuration
usingDatadog.Trace;varstartTime=DateTimeOffset.UtcNow;varmsg=consumer.Consume();varparentContext=newSpanContextExtractor().ExtractIncludingDsm(msg.Headers,GetHeader,messageType:"<DATASTREAM-TYPE>",source:"<QUEUE-OR-TOPIC-NAME>");using(varscope=Tracer.Instance.StartActive("consume",newSpanCreationSettings,{Parent=parentContext,StartTime=startTime})){// Do something with the message}// Specific to how the header is modeledstaticIEnumerable<string?>GetHeader(Headersheaders,stringkey){yieldreturnheader.GetByKey(key);}
Further Reading
Additional helpful documentation, links, and articles: