This check monitors Flink. Datadog collects Flink metrics through Flink’s Datadog HTTP Reporter, which uses Datadog’s HTTP API.
The Flink check is included in the Datadog Agent package. No additional installation is needed on your server.
Configure the Datadog HTTP Reporter in Flink.
In your <FLINK_HOME>/conf/flink-conf.yaml, add these lines, replacing <DATADOG_API_KEY> with your Datadog API key:
metrics.reporter.dghttp.factory.class: org.apache.flink.metrics.datadog.DatadogHttpReporterFactory
metrics.reporter.dghttp.apikey: <DATADOG_API_KEY>
metrics.reporter.dghttp.dataCenter: US #(optional) The data center (EU/US) to connect to, defaults to US.
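If you template flink-conf.yaml from a deployment script, a minimal sketch like the one below can render the reporter lines without hardcoding the API key. The helper name `reporter_config` and the `DD_API_KEY` environment variable are assumptions for illustration. The check on `dataCenter` only accepts the two values named in the comment above (US and EU); newer Flink releases may accept others.

```python
import os

# Values from the flink-conf.yaml snippet above.
FACTORY_CLASS = "org.apache.flink.metrics.datadog.DatadogHttpReporterFactory"
# Only the two data centers named in the dataCenter comment are accepted here.
SUPPORTED_DATA_CENTERS = ("US", "EU")


def reporter_config(api_key: str, data_center: str = "US") -> str:
    """Return the Datadog HTTP Reporter lines to append to flink-conf.yaml."""
    if not api_key:
        raise ValueError("a Datadog API key is required")
    data_center = data_center.upper()
    if data_center not in SUPPORTED_DATA_CENTERS:
        raise ValueError(f"unsupported data center: {data_center}")
    lines = [
        f"metrics.reporter.dghttp.factory.class: {FACTORY_CLASS}",
        f"metrics.reporter.dghttp.apikey: {api_key}",
        f"metrics.reporter.dghttp.dataCenter: {data_center}",
    ]
    return "\n".join(lines) + "\n"


# Example: read the key from the environment instead of hardcoding it.
# DD_API_KEY is an assumed variable name; use whatever your deployment provides.
if os.environ.get("DD_API_KEY"):
    print(reporter_config(os.environ["DD_API_KEY"]), end="")
```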
Remap the system scopes in your <FLINK_HOME>/conf/flink-conf.yaml:
metrics.scope.jm: flink.jobmanager
metrics.scope.jm.job: flink.jobmanager.job
metrics.scope.tm: flink.taskmanager
metrics.scope.tm.job: flink.taskmanager.job
metrics.scope.task: flink.task
metrics.scope.operator: flink.operator
Note: The system scopes must be remapped for your Flink metrics to be supported; otherwise, they are submitted as custom metrics.
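To see why the remapping matters, the sketch below mimics how a scope format is filled in to build a metric name. With Flink's default TaskManager scope, `<host>.taskmanager.<tm_id>`, every host and TaskManager ID yields a different metric name. With the remapped scope, the name is fixed and matches `flink.taskmanager.Status.JVM.CPU.Load` in the metrics list. `metric_identifier` is a hypothetical helper, not Flink's implementation, and it skips any character filtering Flink may apply to variable values.

```python
import re
from typing import Mapping

# Flink's default TaskManager scope format versus the remapped one above.
DEFAULT_TM_SCOPE = "<host>.taskmanager.<tm_id>"
REMAPPED_TM_SCOPE = "flink.taskmanager"


def metric_identifier(scope_format: str, variables: Mapping[str, str], metric: str) -> str:
    """Fill <variable> placeholders in a scope format and append the metric name."""
    scope = re.sub(r"<(\w+)>", lambda m: variables[m.group(1)], scope_format)
    return f"{scope}.{metric}"


variables = {"host": "worker-1", "tm_id": "tm-42"}
print(metric_identifier(DEFAULT_TM_SCOPE, variables, "Status.JVM.CPU.Load"))
# worker-1.taskmanager.tm-42.Status.JVM.CPU.Load
print(metric_identifier(REMAPPED_TM_SCOPE, variables, "Status.JVM.CPU.Load"))
# flink.taskmanager.Status.JVM.CPU.Load
```

Because the remapped scope contains no placeholders, the host and TaskManager ID no longer appear in the name. They can still reach Datadog as tags.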
Configure additional tags in <FLINK_HOME>/conf/flink-conf.yaml. Here is an example of custom tags:
metrics.reporter.dghttp.scope.variables.additional: <KEY1>:<VALUE1>, <KEY2>:<VALUE2>
Note: By default, any variables in metric names are sent as tags, so there is no need to add custom tags for job_id, task_id, etc.
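The sketch below illustrates the resulting tag set. It assumes that variable names arrive in Flink's `<name>` form and that custom tags use the comma-separated `key:value` format shown above. `parse_additional` and `build_tags` are hypothetical helpers, not Flink's parser. The real reporter may handle some variables, such as the host, differently.

```python
from typing import Dict, List, Mapping


def parse_additional(value: str) -> Dict[str, str]:
    """Parse a '<KEY1>:<VALUE1>, <KEY2>:<VALUE2>' style string into a dict."""
    tags: Dict[str, str] = {}
    for item in value.split(","):
        item = item.strip()
        if not item:
            continue  # tolerate trailing commas
        key, _, val = item.partition(":")
        tags[key.strip()] = val.strip()
    return tags


def build_tags(variables: Mapping[str, str], additional: str) -> List[str]:
    """Turn scope variables (for example <job_id>) into key:value tags, then append custom tags."""
    tags = [f"{name.strip('<>')}:{val}" for name, val in variables.items()]
    tags += [f"{key}:{val}" for key, val in parse_additional(additional).items()]
    return tags


print(build_tags({"<job_id>": "a1b2", "<task_id>": "c3d4"}, "env:prod, team:data"))
# ['job_id:a1b2', 'task_id:c3d4', 'env:prod', 'team:data']
```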
Restart Flink to start sending your Flink metrics to Datadog.
Log collection is available for Agent versions >6.0.
Flink uses the log4j logger by default. To enable logging to a file and customize the format, edit the log4j*.properties configuration files in the conf/ directory of the Flink distribution. See the Flink logging documentation to find which configuration file applies to your setup, and see Flink’s repository for the default configurations.
By default, the integration pipeline supports the following layout pattern:
%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
An example of a valid timestamp is 2020-02-03 18:43:12,251.
Clone and edit the integration pipeline if you have a different format.
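Before you decide whether to clone the pipeline, you can check whether your log lines follow the default layout. The sketch below is a regex written for this layout, not the integration pipeline's actual parsing rule. It assumes the nested diagnostic context (`%x`) is empty, which is typical. `LINE_RE` and `parse_line` are hypothetical names.

```python
import re
from datetime import datetime
from typing import Dict, Optional

# Matches lines written with the default layout
#   %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
# assuming the nested diagnostic context (%x) is empty.
LINE_RE = re.compile(
    r"^(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) "
    r"(?P<level>[A-Z]+)\s+"
    r"(?P<logger>\S+)\s+- "
    r"(?P<message>.*)$"
)


def parse_line(line: str) -> Optional[Dict[str, object]]:
    """Split one log line into its fields, or return None if it does not match."""
    match = LINE_RE.match(line)
    if match is None:
        return None  # e.g. a stack-trace continuation line
    fields: Dict[str, object] = dict(match.groupdict())
    # %f accepts the three-digit milliseconds and stores them as microseconds.
    fields["timestamp"] = datetime.strptime(match["timestamp"], "%Y-%m-%d %H:%M:%S,%f")
    return fields


# Build a line the way the layout would: padded level, padded logger, empty %x.
sample = (
    "2020-02-03 18:43:12,251 "
    + f"{'INFO':<5} "
    + f"{'org.apache.flink.runtime.jobmaster.JobMaster':<60} "
    + " - Job started."
)
record = parse_line(sample)
print(record["level"], record["message"])  # INFO Job started.
```

If many of your lines return None, your layout probably differs from the default, and the pipeline needs to be cloned and edited.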
Collecting logs is disabled by default in the Datadog Agent. Enable it in your datadog.yaml file:
logs_enabled: true
Uncomment and edit the logs configuration block in your flink.d/conf.yaml file. Change the path and service parameter values based on your environment. See the sample flink.d/conf.yaml for all available configuration options.
logs:
  - type: file
    path: /var/log/flink/server.log
    source: flink
    service: myapp
    # To handle multi-line logs that start with yyyy-mm-dd, use the following pattern:
    #log_processing_rules:
    #  - type: multi_line
    #    pattern: \d{4}\-(0?[1-9]|1[012])\-(0?[1-9]|[12][0-9]|3[01])
    #    name: new_log_start_with_date
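The multi_line rule works like this: a line that matches the pattern starts a new log, and any other line (for example, a Java stack trace) is appended to the previous log. The sketch below simulates this grouping so you can test the pattern against sample lines before enabling the rule. It assumes that the Agent matches the pattern at the start of each line, which `re.match` mirrors here. `aggregate` is a hypothetical helper, not Agent code.

```python
import re
from typing import List

# The multi_line pattern from the commented-out rule above; re.match anchors it
# at the start of each line.
NEW_LOG_START = re.compile(r"\d{4}\-(0?[1-9]|1[012])\-(0?[1-9]|[12][0-9]|3[01])")


def aggregate(lines: List[str]) -> List[str]:
    """Group raw lines into log events: a matching line starts a new event."""
    events: List[str] = []
    for line in lines:
        if NEW_LOG_START.match(line) or not events:
            events.append(line)
        else:
            events[-1] += "\n" + line  # continuation, e.g. a stack-trace frame
    return events


raw = [
    "2020-02-03 18:43:12,251 ERROR org.apache.flink.Task - Task failed",
    "java.lang.RuntimeException: boom",
    "\tat org.apache.flink.Task.run(Task.java:10)",
    "2020-02-03 18:43:13,001 INFO  org.apache.flink.Task - Restarting",
]
print(len(aggregate(raw)))  # 2
```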
Run the Agent’s status subcommand and look for flink under the Checks section.
The Flink integration provides the following metrics:
| Metric | Type | Description | Unit |
| --- | --- | --- | --- |
| flink.jobmanager.Status.JVM.CPU.Load | gauge | The recent CPU usage of the JVM in the jobmanager | percent |
| flink.jobmanager.Status.JVM.CPU.Time | gauge | The CPU time used by the JVM in the jobmanager | second |
| flink.jobmanager.Status.JVM.ClassLoader.ClassesLoaded | count | The total number of classes loaded since the start of the JVM in the jobmanager | |
| flink.jobmanager.Status.JVM.ClassLoader.ClassesUnloaded | count | The total number of classes unloaded since the start of the JVM in the jobmanager | |
| flink.jobmanager.Status.JVM.Memory.Direct.Count | count | The number of buffers in the direct buffer pool in the jobmanager | buffer |
| flink.jobmanager.Status.JVM.Memory.Direct.MemoryUsed | gauge | The amount of memory used by the JVM for the direct buffer pool in the jobmanager | byte |
| flink.jobmanager.Status.JVM.Memory.Direct.TotalCapacity | count | The total capacity of all buffers in the direct buffer pool in the jobmanager | byte |
| flink.jobmanager.Status.JVM.Memory.Heap.Committed | gauge | The amount of heap memory guaranteed to be available to the JVM in the jobmanager | byte |
| flink.jobmanager.Status.JVM.Memory.Heap.Max | gauge | The maximum amount of heap memory that can be used for memory management in the jobmanager | byte |
| flink.jobmanager.Status.JVM.Memory.Heap.Used | gauge | The amount of heap memory currently used in the jobmanager | byte |
| flink.jobmanager.Status.JVM.Memory.Mapped.Count | gauge | The number of buffers in the mapped buffer pool in the jobmanager | buffer |
| flink.jobmanager.Status.JVM.Memory.Mapped.MemoryUsed | gauge | The amount of memory used by the JVM for the mapped buffer pool in the jobmanager | byte |
| flink.jobmanager.Status.JVM.Memory.Mapped.TotalCapacity | count | The total capacity of all buffers in the mapped buffer pool in the jobmanager | byte |
| flink.jobmanager.Status.JVM.Memory.NonHeap.Committed | gauge | The amount of non-heap memory guaranteed to be available to the JVM in the jobmanager | byte |
| flink.jobmanager.Status.JVM.Memory.NonHeap.Max | gauge | The maximum amount of non-heap memory that can be used for memory management in the jobmanager | byte |
| flink.jobmanager.Status.JVM.Memory.NonHeap.Used | gauge | The amount of non-heap memory currently used in the jobmanager | byte |
| flink.jobmanager.Status.JVM.Threads.Count | count | The total number of live threads in the jobmanager | thread |
| flink.jobmanager.job.downtime | gauge | For jobs currently in a failing/recovering situation, the time elapsed during this outage. Returns 0 for running jobs and -1 for completed jobs | millisecond |
| flink.jobmanager.job.lastCheckpointAlignmentBuffered | gauge | The number of buffered bytes during alignment over all subtasks for the last checkpoint | byte |
| flink.jobmanager.job.lastCheckpointDuration | gauge | The time it took to complete the last checkpoint | millisecond |
| flink.jobmanager.job.lastCheckpointExternalPath | gauge | The path where the last external checkpoint was stored | |
| flink.jobmanager.job.lastCheckpointRestoreTimestamp | gauge | The timestamp when the last checkpoint was restored at the coordinator | millisecond |
| flink.jobmanager.job.lastCheckpointSize | gauge | The total size of the last checkpoint | byte |
| flink.jobmanager.job.numRestarts | gauge | The total number of restarts since this job was submitted, including full restarts and fine-grained restarts | |
| flink.jobmanager.job.numberOfCompletedCheckpoints | count | The number of successfully completed checkpoints | |
| flink.jobmanager.job.numberOfFailedCheckpoints | count | The number of failed checkpoints | |
| flink.jobmanager.job.numberOfInProgressCheckpoints | gauge | The number of in-progress checkpoints | |
| flink.jobmanager.job.restartingTime | gauge | The time it took to restart the job, or how long the current restart has been in progress | millisecond |
| flink.jobmanager.job.totalNumberOfCheckpoints | count | The total number of checkpoints (in progress, completed, and failed) | |
| flink.jobmanager.job.uptime | gauge | The time that the job has been running without interruption. Returns -1 for completed jobs | millisecond |
| flink.jobmanager.numRegisteredTaskManagers | gauge | The number of registered taskmanagers | |
| flink.jobmanager.numRunningJobs | gauge | The number of running jobs | job |
| flink.jobmanager.taskSlotsTotal | gauge | The total number of task slots | |
| flink.operator.commitsFailed | count | The total number of offset commit failures to Kafka, if offset committing is turned on and checkpointing is enabled. Committing offsets back to Kafka is only a means to expose consumer progress, so a commit failure does not affect the integrity of Flink's checkpointed partition offsets | commit |
| flink.operator.commitsSucceeded | count | The total number of successful offset commits to Kafka, if offset committing is turned on and checkpointing is enabled | commit |
| flink.operator.currentInput1Watermark | gauge | The last watermark this operator has received in its first input. Only for operators with two inputs | millisecond |
| flink.operator.currentInput2Watermark | gauge | The last watermark this operator has received in its second input. Only for operators with two inputs | millisecond |
| flink.operator.currentInputWatermark | gauge | The last watermark this operator has received. For operators with two inputs, this is the minimum of the last received watermarks | millisecond |
| flink.operator.currentOutputWatermark | gauge | The last watermark this operator has emitted | millisecond |
| flink.operator.numLateRecordsDropped | count | The number of records this operator has dropped due to arriving late | record |
| flink.operator.numRecordsIn | count | The total number of records this operator has received | record |
| flink.operator.numRecordsInPerSecond | gauge | The number of records this operator receives per second | record |
| flink.operator.numRecordsOut | count | The total number of records this operator has emitted | record |
| flink.operator.numRecordsOutPerSec | gauge | The number of records this operator emits per second | record |
| flink.operator.numSplitsProcessed | count | The total number of InputSplits this data source has processed (if the operator is a data source) | |
| flink.task.Shuffle.Netty.Input.Buffers.inPoolUsage | gauge | An estimate of the input buffer usage | |
| flink.task.Shuffle.Netty.Input.Buffers.inputQueueLength | gauge | The number of queued input buffers | buffer |
| flink.task.Shuffle.Netty.Input.numBuffersInLocal | count | The total number of network buffers this task has read from a local source | buffer |
| flink.task.Shuffle.Netty.Input.numBuffersInLocalPerSecond | gauge | The number of network buffers this task reads from a local source per second | |
| flink.task.Shuffle.Netty.Input.numBuffersInRemote | count | The total number of network buffers this task has read from a remote source | buffer |
| flink.task.Shuffle.Netty.Input.numBuffersInRemotePerSecond | gauge | The number of network buffers this task reads from a remote source per second | buffer |
| flink.task.Shuffle.Netty.Input.numBytesInLocal | count | The total number of bytes this task has read from a local source | byte |
| flink.task.Shuffle.Netty.Input.numBytesInLocalPerSecond | gauge | The number of bytes this task reads from a local source per second | byte |
| flink.task.Shuffle.Netty.Input.numBytesInRemote | count | The total number of bytes this task has read from a remote source | byte |
| flink.task.Shuffle.Netty.Input.numBytesInRemotePerSecond | gauge | The number of bytes this task reads from a remote source per second | byte |
| flink.task.Shuffle.Netty.Output.Buffers.outPoolUsage | gauge | An estimate of the output buffer usage | |
| flink.task.Shuffle.Netty.Output.Buffers.outputQueueLength | gauge | The number of queued output buffers | buffer |
| flink.task.checkpointAlignmentTime | gauge | The time in nanoseconds that the last barrier alignment took to complete, or how long the current alignment has taken so far | nanosecond |
| flink.task.currentInputWatermark | gauge | The last watermark this task has received. For tasks with two inputs, this is the minimum of the last received watermarks | millisecond |
| flink.task.numBuffersOut | count | The total number of network buffers this task has emitted | buffer |
| flink.task.numBuffersOutPerSecond | gauge | The number of network buffers this task emits per second | buffer |
| flink.task.numBytesOut | count | The total number of bytes this task has emitted | byte |
| flink.task.numBytesOutPerSecond | gauge | The number of bytes this task emits per second | byte |
| flink.task.numLateRecordsDropped | count | The number of records this task has dropped due to arriving late | record |
| flink.task.numRecordsIn | count | The total number of records this task has received | record |
| flink.task.numRecordsInPerSecond | gauge | The number of records this task receives per second | record |
| flink.task.numRecordsOut | count | The total number of records this task has emitted | record |
| flink.task.numRecordsOutPerSec | gauge | The number of records this task emits per second | record |
| flink.taskmanager.Status.JVM.CPU.Load | gauge | The recent CPU usage of the JVM in the taskmanager | percent |
| flink.taskmanager.Status.JVM.CPU.Time | gauge | The CPU time used by the JVM in the taskmanager | second |
| flink.taskmanager.Status.JVM.ClassLoader.ClassesLoaded | count | The total number of classes loaded since the start of the JVM in the taskmanager | |
| flink.taskmanager.Status.JVM.ClassLoader.ClassesUnloaded | count | The total number of classes unloaded since the start of the JVM in the taskmanager | |
| flink.taskmanager.Status.JVM.Memory.Direct.Count | gauge | The number of buffers in the direct buffer pool in the taskmanager | buffer |
| flink.taskmanager.Status.JVM.Memory.Direct.MemoryUsed | gauge | The amount of memory used by the JVM for the direct buffer pool in the taskmanager | byte |
| flink.taskmanager.Status.JVM.Memory.Direct.TotalCapacity | count | The total capacity of all buffers in the direct buffer pool in the taskmanager | byte |
| flink.taskmanager.Status.JVM.Memory.Heap.Committed | gauge | The amount of heap memory guaranteed to be available to the JVM in the taskmanager | byte |
| flink.taskmanager.Status.JVM.Memory.Heap.Max | gauge | The maximum amount of heap memory that can be used for memory management in the taskmanager | byte |
| flink.taskmanager.Status.JVM.Memory.Heap.Used | gauge | The amount of heap memory currently used in the taskmanager | byte |
| flink.taskmanager.Status.JVM.Memory.Mapped.Count | gauge | The number of buffers in the mapped buffer pool in the taskmanager | |
| flink.taskmanager.Status.JVM.Memory.Mapped.MemoryUsed | gauge | The amount of memory used by the JVM for the mapped buffer pool in the taskmanager | byte |
| flink.taskmanager.Status.JVM.Memory.Mapped.TotalCapacity | count | The total capacity of all buffers in the mapped buffer pool in the taskmanager | byte |
| flink.taskmanager.Status.JVM.Memory.NonHeap.Committed | gauge | The amount of non-heap memory guaranteed to be available to the JVM in the taskmanager | byte |
| flink.taskmanager.Status.JVM.Memory.NonHeap.Max | gauge | The maximum amount of non-heap memory that can be used for memory management in the taskmanager | byte |
| flink.taskmanager.Status.JVM.Memory.NonHeap.Used | gauge | The amount of non-heap memory currently used in the taskmanager | byte |
| flink.taskmanager.Status.JVM.Threads.Count | count | The total number of live threads in the taskmanager | thread |
| flink.taskmanager.Status.Shuffle.Netty.AvailableMemorySegments | gauge | The number of unused memory segments in the taskmanager | |
| flink.taskmanager.Status.Shuffle.Netty.TotalMemorySegments | gauge | The number of allocated memory segments in the taskmanager | |
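The sentinel values documented for flink.jobmanager.job.uptime and flink.jobmanager.job.downtime can be read together to classify a job, for example in a script that post-processes exported metric values. The sketch below relies only on the documented sentinels: -1 means completed, a downtime of 0 means running, and a positive downtime means the job is failing or recovering. `job_state` is a hypothetical helper.

```python
def job_state(uptime_ms: float, downtime_ms: float) -> str:
    """Classify a job from its uptime and downtime metric values (milliseconds)."""
    if uptime_ms == -1 or downtime_ms == -1:
        return "completed"  # both metrics report -1 for completed jobs
    if downtime_ms > 0:
        return "failing/recovering"  # downtime counts the current outage
    return "running"  # downtime reports 0 for running jobs


print(job_state(120000, 0))  # running
print(job_state(0, 5000))    # failing/recovering
print(job_state(-1, -1))     # completed
```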
Flink does not include any service checks.
Flink does not include any events.
Need help? Contact Datadog support.