• 指标
    • Registering metrics
      • Metric types
        • Counter
        • Gauge
        • Histogram
        • Meter
    • Scope
      • User Scope
      • System Scope
      • List of all Variables
      • User Variables
    • Reporter
      • JMX (org.apache.flink.metrics.jmx.JMXReporter)
      • Graphite (org.apache.flink.metrics.graphite.GraphiteReporter)
      • InfluxDB (org.apache.flink.metrics.influxdb.InfluxdbReporter)
      • Prometheus (org.apache.flink.metrics.prometheus.PrometheusReporter)
      • PrometheusPushGateway (org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter)
      • StatsD (org.apache.flink.metrics.statsd.StatsDReporter)
      • Datadog (org.apache.flink.metrics.datadog.DatadogHttpReporter)
      • Slf4j (org.apache.flink.metrics.slf4j.Slf4jReporter)
    • System metrics
      • CPU
      • Memory
      • Threads
      • GarbageCollection
      • ClassLoader
      • Network (Deprecated: use Default shuffle service metrics)
      • Default shuffle service
      • Cluster
      • Availability
      • Checkpointing
      • RocksDB
      • IO
      • Connectors
        • Kafka Connectors
        • Kinesis Connectors
      • System resources
        • System CPU
        • System memory
        • System network
    • Latency tracking
    • REST API integration
    • Dashboard integration

    指标

    Flink exposes a metric system that allows gathering and exposing metrics to external systems.

    • Registering metrics
      • Metric types
    • Scope
      • User Scope
      • System Scope
      • List of all Variables
      • User Variables
    • Reporter
      • JMX (org.apache.flink.metrics.jmx.JMXReporter)
      • Graphite (org.apache.flink.metrics.graphite.GraphiteReporter)
      • InfluxDB (org.apache.flink.metrics.influxdb.InfluxdbReporter)
      • Prometheus (org.apache.flink.metrics.prometheus.PrometheusReporter)
      • PrometheusPushGateway (org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter)
      • StatsD (org.apache.flink.metrics.statsd.StatsDReporter)
      • Datadog (org.apache.flink.metrics.datadog.DatadogHttpReporter)
      • Slf4j (org.apache.flink.metrics.slf4j.Slf4jReporter)
    • System metrics
      • CPU
      • Memory
      • Threads
      • GarbageCollection
      • ClassLoader
      • Network (Deprecated: use Default shuffle service metrics)
      • Default shuffle service
      • Cluster
      • Availability
      • Checkpointing
      • RocksDB
      • IO
      • Connectors
      • System resources
    • Latency tracking
    • REST API integration
    • Dashboard integration

    Registering metrics

    You can access the metric system from any user function that extends RichFunction by calling getRuntimeContext().getMetricGroup().This method returns a MetricGroup object on which you can create and register new metrics.

    Metric types

    Flink supports Counters, Gauges, Histograms and Meters.

    Counter

    A Counter is used to count something. The current value can be in- or decremented using inc()/inc(long n) or dec()/dec(long n).You can create and register a Counter by calling counter(String name) on a MetricGroup.

    1. public class MyMapper extends RichMapFunction<String, String> {
    2. private transient Counter counter;
    3. @Override
    4. public void open(Configuration config) {
    5. this.counter = getRuntimeContext()
    6. .getMetricGroup()
    7. .counter("myCounter");
    8. }
    9. @Override
    10. public String map(String value) throws Exception {
    11. this.counter.inc();
    12. return value;
    13. }
    14. }
    1. class MyMapper extends RichMapFunction[String,String] {
    2. @transient private var counter: Counter = _
    3. override def open(parameters: Configuration): Unit = {
    4. counter = getRuntimeContext()
    5. .getMetricGroup()
    6. .counter("myCounter")
    7. }
    8. override def map(value: String): String = {
    9. counter.inc()
    10. value
    11. }
    12. }

    Alternatively you can also use your own Counter implementation:

    1. public class MyMapper extends RichMapFunction<String, String> {
    2. private transient Counter counter;
    3. @Override
    4. public void open(Configuration config) {
    5. this.counter = getRuntimeContext()
    6. .getMetricGroup()
    7. .counter("myCustomCounter", new CustomCounter());
    8. }
    9. @Override
    10. public String map(String value) throws Exception {
    11. this.counter.inc();
    12. return value;
    13. }
    14. }
    1. class MyMapper extends RichMapFunction[String,String] {
    2. @transient private var counter: Counter = _
    3. override def open(parameters: Configuration): Unit = {
    4. counter = getRuntimeContext()
    5. .getMetricGroup()
    6. .counter("myCustomCounter", new CustomCounter())
    7. }
    8. override def map(value: String): String = {
    9. counter.inc()
    10. value
    11. }
    12. }

    Gauge

    A Gauge provides a value of any type on demand. In order to use a Gauge you must first create a class that implements the org.apache.flink.metrics.Gauge interface.There is no restriction for the type of the returned value.You can register a gauge by calling gauge(String name, Gauge gauge) on a MetricGroup.

    1. public class MyMapper extends RichMapFunction<String, String> {
    2. private transient int valueToExpose = 0;
    3. @Override
    4. public void open(Configuration config) {
    5. getRuntimeContext()
    6. .getMetricGroup()
    7. .gauge("MyGauge", new Gauge<Integer>() {
    8. @Override
    9. public Integer getValue() {
    10. return valueToExpose;
    11. }
    12. });
    13. }
    14. @Override
    15. public String map(String value) throws Exception {
    16. valueToExpose++;
    17. return value;
    18. }
    19. }
    1. new class MyMapper extends RichMapFunction[String,String] {
    2. @transient private var valueToExpose = 0
    3. override def open(parameters: Configuration): Unit = {
    4. getRuntimeContext()
    5. .getMetricGroup()
    6. .gauge[Int, ScalaGauge[Int]]("MyGauge", ScalaGauge[Int]( () => valueToExpose ) )
    7. }
    8. override def map(value: String): String = {
    9. valueToExpose += 1
    10. value
    11. }
    12. }

    Note that reporters will turn the exposed object into a String, which means that a meaningful toString() implementation is required.

    Histogram

    A Histogram measures the distribution of long values.You can register one by calling histogram(String name, Histogram histogram) on a MetricGroup.

    1. public class MyMapper extends RichMapFunction<Long, Long> {
    2. private transient Histogram histogram;
    3. @Override
    4. public void open(Configuration config) {
    5. this.histogram = getRuntimeContext()
    6. .getMetricGroup()
    7. .histogram("myHistogram", new MyHistogram());
    8. }
    9. @Override
    10. public Long map(Long value) throws Exception {
    11. this.histogram.update(value);
    12. return value;
    13. }
    14. }
    1. class MyMapper extends RichMapFunction[Long,Long] {
    2. @transient private var histogram: Histogram = _
    3. override def open(parameters: Configuration): Unit = {
    4. histogram = getRuntimeContext()
    5. .getMetricGroup()
    6. .histogram("myHistogram", new MyHistogram())
    7. }
    8. override def map(value: Long): Long = {
    9. histogram.update(value)
    10. value
    11. }
    12. }

    Flink does not provide a default implementation for Histogram, but offers a Wrapper that allows usage of Codahale/DropWizard histograms.To use this wrapper add the following dependency in your pom.xml:

    1. <dependency>
    2. <groupId>org.apache.flink</groupId>
    3. <artifactId>flink-metrics-dropwizard</artifactId>
    4. <version>1.9.0</version>
    5. </dependency>

    You can then register a Codahale/DropWizard histogram like this:

    1. public class MyMapper extends RichMapFunction<Long, Long> {
    2. private transient Histogram histogram;
    3. @Override
    4. public void open(Configuration config) {
    5. com.codahale.metrics.Histogram dropwizardHistogram =
    6. new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500));
    7. this.histogram = getRuntimeContext()
    8. .getMetricGroup()
    9. .histogram("myHistogram", new DropwizardHistogramWrapper(dropwizardHistogram));
    10. }
    11. @Override
    12. public Long map(Long value) throws Exception {
    13. this.histogram.update(value);
    14. return value;
    15. }
    16. }
    1. class MyMapper extends RichMapFunction[Long, Long] {
    2. @transient private var histogram: Histogram = _
    3. override def open(config: Configuration): Unit = {
    4. com.codahale.metrics.Histogram dropwizardHistogram =
    5. new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500))
    6. histogram = getRuntimeContext()
    7. .getMetricGroup()
    8. .histogram("myHistogram", new DropwizardHistogramWrapper(dropwizardHistogram))
    9. }
    10. override def map(value: Long): Long = {
    11. histogram.update(value)
    12. value
    13. }
    14. }

    Meter

    A Meter measures an average throughput. An occurrence of an event can be registered with the markEvent() method. Occurrence of multiple events at the same time can be registered with markEvent(long n) method.You can register a meter by calling meter(String name, Meter meter) on a MetricGroup.

    1. public class MyMapper extends RichMapFunction<Long, Long> {
    2. private transient Meter meter;
    3. @Override
    4. public void open(Configuration config) {
    5. this.meter = getRuntimeContext()
    6. .getMetricGroup()
    7. .meter("myMeter", new MyMeter());
    8. }
    9. @Override
    10. public Long map(Long value) throws Exception {
    11. this.meter.markEvent();
    12. return value;
    13. }
    14. }
    1. class MyMapper extends RichMapFunction[Long,Long] {
    2. @transient private var meter: Meter = _
    3. override def open(config: Configuration): Unit = {
    4. meter = getRuntimeContext()
    5. .getMetricGroup()
    6. .meter("myMeter", new MyMeter())
    7. }
    8. override def map(value: Long): Long = {
    9. meter.markEvent()
    10. value
    11. }
    12. }

    Flink offers a Wrapper that allows usage of Codahale/DropWizard meters.To use this wrapper add the following dependency in your pom.xml:

    1. <dependency>
    2. <groupId>org.apache.flink</groupId>
    3. <artifactId>flink-metrics-dropwizard</artifactId>
    4. <version>1.9.0</version>
    5. </dependency>

    You can then register a Codahale/DropWizard meter like this:

    1. public class MyMapper extends RichMapFunction<Long, Long> {
    2. private transient Meter meter;
    3. @Override
    4. public void open(Configuration config) {
    5. com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter();
    6. this.meter = getRuntimeContext()
    7. .getMetricGroup()
    8. .meter("myMeter", new DropwizardMeterWrapper(dropwizardMeter));
    9. }
    10. @Override
    11. public Long map(Long value) throws Exception {
    12. this.meter.markEvent();
    13. return value;
    14. }
    15. }
    1. class MyMapper extends RichMapFunction[Long,Long] {
    2. @transient private var meter: Meter = _
    3. override def open(config: Configuration): Unit = {
    4. com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter()
    5. meter = getRuntimeContext()
    6. .getMetricGroup()
    7. .meter("myMeter", new DropwizardMeterWrapper(dropwizardMeter))
    8. }
    9. override def map(value: Long): Long = {
    10. meter.markEvent()
    11. value
    12. }
    13. }

    Scope

    Every metric is assigned an identifier and a set of key-value pairs under which the metric will be reported.

    The identifier is based on 3 components: a user-defined name when registering the metric, an optional user-defined scope and a system-provided scope.For example, if A.B is the system scope, C.D the user scope and E the name, then the identifier for the metric will be A.B.C.D.E.

    You can configure which delimiter to use for the identifier (default: .) by setting the metrics.scope.delimiter key in conf/flink-conf.yaml.

    User Scope

    You can define a user scope by calling MetricGroup#addGroup(String name), MetricGroup#addGroup(int name) or MetricGroup#addGroup(String key, String value).These methods affect what MetricGroup#getMetricIdentifier and MetricGroup#getScopeComponents return.

    1. counter = getRuntimeContext()
    2. .getMetricGroup()
    3. .addGroup("MyMetrics")
    4. .counter("myCounter");
    5. counter = getRuntimeContext()
    6. .getMetricGroup()
    7. .addGroup("MyMetricsKey", "MyMetricsValue")
    8. .counter("myCounter");
    1. counter = getRuntimeContext()
    2. .getMetricGroup()
    3. .addGroup("MyMetrics")
    4. .counter("myCounter")
    5. counter = getRuntimeContext()
    6. .getMetricGroup()
    7. .addGroup("MyMetricsKey", "MyMetricsValue")
    8. .counter("myCounter")

    System Scope

    The system scope contains context information about the metric, for example in which task it was registered or what job that task belongs to.

    Which context information should be included can be configured by setting the following keys in conf/flink-conf.yaml.Each of these keys expect a format string that may contain constants (e.g. “taskmanager”) and variables (e.g. “<task_id>”) which will be replaced at runtime.

    • metrics.scope.jm
      • Default: .jobmanager
      • Applied to all metrics that were scoped to a job manager.
    • metrics.scope.jm.job
      • Default: .jobmanager.
      • Applied to all metrics that were scoped to a job manager and job.
    • metrics.scope.tm
      • Default: .taskmanager.
      • Applied to all metrics that were scoped to a task manager.
    • metrics.scope.tm.job
      • Default: .taskmanager..
      • Applied to all metrics that were scoped to a task manager and job.
    • metrics.scope.task
      • Default: .taskmanager....
      • Applied to all metrics that were scoped to a task.
    • metrics.scope.operator
      • Default: .taskmanager....
      • Applied to all metrics that were scoped to an operator.There are no restrictions on the number or order of variables. Variables are case sensitive.

    The default scope for operator metrics will result in an identifier akin to localhost.taskmanager.1234.MyJob.MyOperator.0.MyMetric

    If you also want to include the task name but omit the task manager information you can specify the following format:

    metrics.scope.operator: <host>.<job_name>.<task_name>.<operator_name>.<subtask_index>

    This could create the identifier localhost.MyJob.MySource_->_MyOperator.MyOperator.0.MyMetric.

    Note that for this format string an identifier clash can occur should the same job be run multiple times concurrently, which can lead to inconsistent metric data.As such it is advised to either use format strings that provide a certain degree of uniqueness by including IDs (e.g <job_id>)or by assigning unique names to jobs and operators.

    List of all Variables

    • JobManager:
    • TaskManager: ,
    • Job: ,
    • Task: , , , ,
    • Operator: ,, Important: For the Batch API, <operator_id> is always equal to <task_id>.

    User Variables

    You can define a user variable by calling MetricGroup#addGroup(String key, String value).This method affects what MetricGroup#getMetricIdentifier, MetricGroup#getScopeComponents and MetricGroup#getAllVariables() returns.

    Important: User variables cannot be used in scope formats.

    1. counter = getRuntimeContext()
    2. .getMetricGroup()
    3. .addGroup("MyMetricsKey", "MyMetricsValue")
    4. .counter("myCounter");
    1. counter = getRuntimeContext()
    2. .getMetricGroup()
    3. .addGroup("MyMetricsKey", "MyMetricsValue")
    4. .counter("myCounter")

    Reporter

    Metrics can be exposed to an external system by configuring one or several reporters in conf/flink-conf.yaml. Thesereporters will be instantiated on each job and task manager when they are started.

    • metrics.reporter.<name>.<config>: Generic setting <config> for the reporter named <name>.
    • metrics.reporter.<name>.class: The reporter class to use for the reporter named <name>.
    • metrics.reporter.<name>.factory.class: The reporter factory class to use for the reporter named <name>.
    • metrics.reporter.<name>.interval: The reporter interval to use for the reporter named <name>.
    • metrics.reporter.<name>.scope.delimiter: The delimiter to use for the identifier (default value use metrics.scope.delimiter) for the reporter named <name>.
    • metrics.reporters: (optional) A comma-separated include list of reporter names. By default all configured reporters will be used.

    All reporters must at least have either the class or factory.class property. Which property may/should be used depends on the reporter implementation. See the individual reporter configuration sections for more information.Some reporters (referred to as Scheduled) allow specifying a reporting interval.Below more settings specific to each reporter will be listed.

    Example reporter configuration that specifies multiple reporters:

    1. metrics.reporters: my_jmx_reporter,my_other_reporter
    2. metrics.reporter.my_jmx_reporter.factory.class: org.apache.flink.metrics.jmx.JMXReporterFactory
    3. metrics.reporter.my_jmx_reporter.port: 9020-9040
    4. metrics.reporter.my_other_reporter.class: org.apache.flink.metrics.graphite.GraphiteReporter
    5. metrics.reporter.my_other_reporter.host: 192.168.1.1
    6. metrics.reporter.my_other_reporter.port: 10000

    Important: The jar containing the reporter must be accessible when Flink is started by placing it in the /lib folder.

    You can write your own Reporter by implementing the org.apache.flink.metrics.reporter.MetricReporter interface.If the Reporter should send out reports regularly you have to implement the Scheduled interface as well.

    The following sections list the supported reporters.

    You don’t have to include an additional dependency since the JMX reporter is available by defaultbut not activated.

    Parameters:

    • port - (optional) the port on which JMX listens for connections.In order to be able to run several instances of the reporter on one host (e.g. when one TaskManager is colocated with the JobManager) it is advisable to use a port range like 9250-9260.When a range is specified the actual port is shown in the relevant job or task manager log.If this setting is set Flink will start an extra JMX connector for the given port/range.Metrics are always available on the default local JMX interface.

    Example configuration:

    1. metrics.reporter.jmx.factory.class: org.apache.flink.metrics.jmx.JMXReporterFactory
    2. metrics.reporter.jmx.port: 8789

    Metrics exposed through JMX are identified by a domain and a list of key-properties, which together form the object name.

    The domain always begins with org.apache.flink followed by a generalized metric identifier. In contrast to the usualidentifier it is not affected by scope-formats, does not contain any variables and is constant across jobs.An example for such a domain would be org.apache.flink.job.task.numBytesOut.

    The key-property list contains the values for all variables, regardless of configured scope formats, that are associatedwith a given metric.An example for such a list would be host=localhost,job_name=MyJob,task_name=MyTask.

    The domain thus identifies a metric class, while the key-property list identifies one (or multiple) instances of that metric.

    In order to use this reporter you must copy /opt/flink-metrics-graphite-1.9.0.jar into the /lib folderof your Flink distribution.

    Parameters:

    • host - the Graphite server host
    • port - the Graphite server port
    • protocol - protocol to use (TCP/UDP)

    Example configuration:

    1. metrics.reporter.grph.class: org.apache.flink.metrics.graphite.GraphiteReporter
    2. metrics.reporter.grph.host: localhost
    3. metrics.reporter.grph.port: 2003
    4. metrics.reporter.grph.protocol: TCP

    In order to use this reporter you must copy /opt/flink-metrics-influxdb-1.9.0.jar into the /lib folderof your Flink distribution.

    Parameters:

    • host - the InfluxDB server host
    • port - (optional) the InfluxDB server port, defaults to 8086
    • db - the InfluxDB database to store metrics
    • username - (optional) InfluxDB username used for authentication
    • password - (optional) InfluxDB username’s password used for authentication
    • retentionPolicy - (optional) InfluxDB retention policy, defaults to retention policy defined on the server for the db

    Example configuration:

    1. metrics.reporter.influxdb.class: org.apache.flink.metrics.influxdb.InfluxdbReporter
    2. metrics.reporter.influxdb.host: localhost
    3. metrics.reporter.influxdb.port: 8086
    4. metrics.reporter.influxdb.db: flink
    5. metrics.reporter.influxdb.username: flink-metrics
    6. metrics.reporter.influxdb.password: qwerty
    7. metrics.reporter.influxdb.retentionPolicy: one_hour

    The reporter would send metrics using http protocol to the InfluxDB server with the specified retention policy (or the default policy specified on the server).All Flink metrics variables (see List of all Variables) are exported as InfluxDB tags.

    In order to use this reporter you must copy /opt/flink-metrics-prometheus_2.11-1.9.0.jar into the /lib folderof your Flink distribution.

    Parameters:

    • port - (optional) the port the Prometheus exporter listens on, defaults to 9249. In order to be able to run several instances of the reporter on one host (e.g. when one TaskManager is colocated with the JobManager) it is advisable to use a port range like 9250-9260.
    • filterLabelValueCharacters - (optional) Specifies whether to filter label value characters. If enabled, all characters not matching [a-zA-Z0-9:_] will be removed, otherwise no characters will be removed. Before disabling this option please ensure that your label values meet the Prometheus requirements.

    Example configuration:

    1. metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter

    Flink metric types are mapped to Prometheus metric types as follows:

    FlinkPrometheusNote
    CounterGaugePrometheus counters cannot be decremented.
    GaugeGaugeOnly numbers and booleans are supported.
    HistogramSummaryQuantiles .5, .75, .95, .98, .99 and .999
    MeterGaugeThe gauge exports the meter’s rate.

    All Flink metrics variables (see List of all Variables) are exported to Prometheus as labels.

    In order to use this reporter you must copy /opt/flink-metrics-prometheus-1.9.0.jar into the /lib folderof your Flink distribution.

    Parameters:

    KeyDefaultDescription
    ##### deleteOnShutdowntrueSpecifies whether to delete metrics from the PushGateway on shutdown.
    ##### filterLabelValueCharacterstrueSpecifies whether to filter label value characters. If enabled, all characters not matching [a-zA-Z0-9:_] will be removed, otherwise no characters will be removed. Before disabling this option please ensure that your label values meet the Prometheus requirements.
    ##### host(none)The PushGateway server host.
    ##### jobName(none)The job name under which metrics will be pushed
    ##### port-1The PushGateway server port.
    ##### randomJobNameSuffixtrueSpecifies whether a random suffix should be appended to the job name.

    Example configuration:

    1. metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
    2. metrics.reporter.promgateway.host: localhost
    3. metrics.reporter.promgateway.port: 9091
    4. metrics.reporter.promgateway.jobName: myJob
    5. metrics.reporter.promgateway.randomJobNameSuffix: true
    6. metrics.reporter.promgateway.deleteOnShutdown: false

    The PrometheusPushGatewayReporter pushes metrics to a Pushgateway, which can be scraped by Prometheus.

    Please see the Prometheus documentation for use-cases.

    In order to use this reporter you must copy /opt/flink-metrics-statsd-1.9.0.jar into the /lib folderof your Flink distribution.

    Parameters:

    • host - the StatsD server host
    • port - the StatsD server port

    Example configuration:

    1. metrics.reporter.stsd.class: org.apache.flink.metrics.statsd.StatsDReporter
    2. metrics.reporter.stsd.host: localhost
    3. metrics.reporter.stsd.port: 8125

    In order to use this reporter you must copy /opt/flink-metrics-datadog-1.9.0.jar into the /lib folderof your Flink distribution.

    Note any variables in Flink metrics, such as <host>, <job_name>, <tm_id>, <subtask_index>, <task_name>, and <operator_name>,will be sent to Datadog as tags. Tags will look like host:localhost and job_name:myjobname.

    Parameters:

    • apikey - the Datadog API key
    • tags - (optional) the global tags that will be applied to metrics when sending to Datadog. Tags should be separated by comma only
    • proxyHost - (optional) The proxy host to use when sending to Datadog.
    • proxyPort - (optional) The proxy port to use when sending to Datadog, defaults to 8080.

    Example configuration:

    1. metrics.reporter.dghttp.class: org.apache.flink.metrics.datadog.DatadogHttpReporter
    2. metrics.reporter.dghttp.apikey: xxx
    3. metrics.reporter.dghttp.tags: myflinkapp,prod
    4. metrics.reporter.dghttp.proxyHost: my.web.proxy.com
    5. metrics.reporter.dghttp.proxyPort: 8080

    In order to use this reporter you must copy /opt/flink-metrics-slf4j-1.9.0.jar into the /lib folderof your Flink distribution.

    Example configuration:

    1. metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter
    2. metrics.reporter.slf4j.interval: 60 SECONDS

    System metrics

    By default Flink gathers several metrics that provide deep insights on the current state.This section is a reference of all these metrics.

    The tables below generally feature 5 columns:

    • The “Scope” column describes which scope format is used to generate the system scope.For example, if the cell contains “Operator” then the scope format for “metrics.scope.operator” is used.If the cell contains multiple values, separated by a slash, then the metrics are reported multipletimes for different entities, like for both job- and taskmanagers.

    • The (optional)”Infix” column describes which infix is appended to the system scope.

    • The “Metrics” column lists the names of all metrics that are registered for the given scope and infix.

    • The “Description” column provides information as to what a given metric is measuring.

    • The “Type” column describes which metric type is used for the measurement.

    Note that all dots in the infix/metric name columns are still subject to the “metrics.delimiter” setting.

    Thus, in order to infer the metric identifier:

    • Take the scope-format based on the “Scope” column
    • Append the value in the “Infix” column if present, and account for the “metrics.delimiter” setting
    • Append metric name.

    CPU

    ScopeInfixMetricsDescriptionType
    Job-/TaskManagerStatus.JVM.CPULoadThe recent CPU usage of the JVM.Gauge
    TimeThe CPU time used by the JVM.Gauge

    Memory

    ScopeInfixMetricsDescriptionType
    Job-/TaskManagerStatus.JVM.MemoryHeap.UsedThe amount of heap memory currently used (in bytes).Gauge
    Heap.CommittedThe amount of heap memory guaranteed to be available to the JVM (in bytes).Gauge
    Heap.MaxThe maximum amount of heap memory that can be used for memory management (in bytes).Gauge
    NonHeap.UsedThe amount of non-heap memory currently used (in bytes).Gauge
    NonHeap.CommittedThe amount of non-heap memory guaranteed to be available to the JVM (in bytes).Gauge
    NonHeap.MaxThe maximum amount of non-heap memory that can be used for memory management (in bytes).Gauge
    Direct.CountThe number of buffers in the direct buffer pool.Gauge
    Direct.MemoryUsedThe amount of memory used by the JVM for the direct buffer pool (in bytes).Gauge
    Direct.TotalCapacityThe total capacity of all buffers in the direct buffer pool (in bytes).Gauge
    Mapped.CountThe number of buffers in the mapped buffer pool.Gauge
    Mapped.MemoryUsedThe amount of memory used by the JVM for the mapped buffer pool (in bytes).Gauge
    Mapped.TotalCapacityThe number of buffers in the mapped buffer pool (in bytes).Gauge

    Threads

    ScopeInfixMetricsDescriptionType
    Job-/TaskManagerStatus.JVM.ThreadsCountThe total number of live threads.Gauge

    GarbageCollection

    ScopeInfixMetricsDescriptionType
    Job-/TaskManagerStatus.JVM.GarbageCollector<GarbageCollector>.CountThe total number of collections that have occurred.Gauge
    <GarbageCollector>.TimeThe total time spent performing garbage collection.Gauge

    ClassLoader

    ScopeInfixMetricsDescriptionType
    Job-/TaskManagerStatus.JVM.ClassLoaderClassesLoadedThe total number of classes loaded since the start of the JVM.Gauge
    ClassesUnloadedThe total number of classes unloaded since the start of the JVM.Gauge

    Network (Deprecated: use Default shuffle service metrics)

    ScopeInfixMetricsDescriptionType
    TaskManagerStatus.NetworkAvailableMemorySegmentsThe number of unused memory segments.Gauge
    TotalMemorySegmentsThe number of allocated memory segments.Gauge
    TaskbuffersinputQueueLengthThe number of queued input buffers. (ignores LocalInputChannels which are using blocking subpartitions)Gauge
    outputQueueLengthThe number of queued output buffers.Gauge
    inPoolUsageAn estimate of the input buffers usage. (ignores LocalInputChannels)Gauge
    inputFloatingBuffersUsageAn estimate of the floating input buffers usage, dedicated for credit-based mode. (ignores LocalInputChannels)Gauge
    inputExclusiveBuffersUsageAn estimate of the exclusive input buffers usage, dedicated for credit-based mode. (ignores LocalInputChannels)Gauge
    outPoolUsageAn estimate of the output buffers usage.Gauge
    Network.<Input|Output>.<gate|partition>(only available if taskmanager.net.detailed-metrics config option is set)totalQueueLenTotal number of queued buffers in all input/output channels.Gauge
    minQueueLenMinimum number of queued buffers in all input/output channels.Gauge
    maxQueueLenMaximum number of queued buffers in all input/output channels.Gauge
    avgQueueLenAverage number of queued buffers in all input/output channels.Gauge

    Default shuffle service

    Metrics related to data exchange between task executors using netty network communication.

    ScopeInfixMetricsDescriptionType
    TaskManagerStatus.Shuffle.NettyAvailableMemorySegmentsThe number of unused memory segments.Gauge
    TotalMemorySegmentsThe number of allocated memory segments.Gauge
    TaskShuffle.Netty.Input.BuffersinputQueueLengthThe number of queued input buffers.Gauge
    inPoolUsageAn estimate of the input buffers usage.Gauge
    Shuffle.Netty.Output.BuffersoutputQueueLengthThe number of queued output buffers.Gauge
    outPoolUsageAn estimate of the output buffers usage.Gauge
    Shuffle.Netty.<Input|Output>.<gate|partition>(only available if taskmanager.net.detailed-metrics config option is set)totalQueueLenTotal number of queued buffers in all input/output channels.Gauge
    minQueueLenMinimum number of queued buffers in all input/output channels.Gauge
    maxQueueLenMaximum number of queued buffers in all input/output channels.Gauge
    avgQueueLenAverage number of queued buffers in all input/output channels.Gauge
    TaskShuffle.Netty.InputnumBytesInLocalThe total number of bytes this task has read from a local source.Counter
    numBytesInLocalPerSecondThe number of bytes this task reads from a local source per second.Meter
    numBytesInRemoteThe total number of bytes this task has read from a remote source.Counter
    numBytesInRemotePerSecondThe number of bytes this task reads from a remote source per second.Meter
    numBuffersInLocalThe total number of network buffers this task has read from a local source.Counter
    numBuffersInLocalPerSecondThe number of network buffers this task reads from a local source per second.Meter
    numBuffersInRemoteThe total number of network buffers this task has read from a remote source.Counter
    numBuffersInRemotePerSecondThe number of network buffers this task reads from a remote source per second.Meter

    Cluster

    ScopeMetricsDescriptionType
    JobManagernumRegisteredTaskManagersThe number of registered taskmanagers.Gauge
    numRunningJobsThe number of running jobs.Gauge
    taskSlotsAvailableThe number of available task slots.Gauge
    taskSlotsTotalThe total number of task slots.Gauge

    Availability

    ScopeMetricsDescriptionType
    Job (only available on JobManager)restartingTimeThe time it took to restart the job, or how long the current restart has been in progress (in milliseconds).Gauge
    uptime The time that the job has been running without interruption.Returns -1 for completed jobs (in milliseconds).Gauge
    downtime For jobs currently in a failing/recovering situation, the time elapsed during this outage.Returns 0 for running jobs and -1 for completed jobs (in milliseconds).Gauge
    fullRestarts The total number of full restarts since this job was submitted. Attention: Since 1.9.2, this metric also includes fine-grained restarts. Gauge

    Checkpointing

    ScopeMetricsDescriptionType
    Job (only available on JobManager)lastCheckpointDurationThe time it took to complete the last checkpoint (in milliseconds).Gauge
    lastCheckpointSizeThe total size of the last checkpoint (in bytes).Gauge
    lastCheckpointExternalPathThe path where the last external checkpoint was stored.Gauge
    lastCheckpointRestoreTimestampTimestamp when the last checkpoint was restored at the coordinator (in milliseconds).Gauge
    lastCheckpointAlignmentBufferedThe number of buffered bytes during alignment over all subtasks for the last checkpoint (in bytes).Gauge
    numberOfInProgressCheckpointsThe number of in progress checkpoints.Gauge
    numberOfCompletedCheckpointsThe number of successfully completed checkpoints.Gauge
    numberOfFailedCheckpointsThe number of failed checkpoints.Gauge
    totalNumberOfCheckpointsThe number of total checkpoints (in progress, completed, failed).Gauge
    TaskcheckpointAlignmentTimeThe time in nanoseconds that the last barrier alignment took to complete, or how long the current alignment has taken so far (in nanoseconds).Gauge

    RocksDB

    Certain RocksDB native metrics are available but disabled by default, you can find full documentation here

    IO

    ScopeMetricsDescriptionType
    Job (only available on TaskManager)<source_id>.<source_subtask_index>.<operator_id>.<operator_subtask_index>.latencyThe latency distributions from a given source subtask to an operator subtask (in milliseconds).Histogram
    TasknumBytesInLocalAttention: deprecated, use Default shuffle service metrics.Counter
    numBytesInLocalPerSecondAttention: deprecated, use Default shuffle service metrics.Meter
    numBytesInRemoteAttention: deprecated, use Default shuffle service metrics.Counter
    numBytesInRemotePerSecondAttention: deprecated, use Default shuffle service metrics.Meter
    numBuffersInLocalAttention: deprecated, use Default shuffle service metrics.Counter
    numBuffersInLocalPerSecondAttention: deprecated, use Default shuffle service metrics.Meter
    numBuffersInRemoteAttention: deprecated, use Default shuffle service metrics.Counter
    numBuffersInRemotePerSecondAttention: deprecated, use Default shuffle service metrics.Meter
    numBytesOutThe total number of bytes this task has emitted.Counter
    numBytesOutPerSecondThe number of bytes this task emits per second.Meter
    numBuffersOutThe total number of network buffers this task has emitted.Counter
    numBuffersOutPerSecondThe number of network buffers this task emits per second.Meter
    Task/OperatornumRecordsInThe total number of records this operator/task has received.Counter
    numRecordsInPerSecondThe number of records this operator/task receives per second.Meter
    numRecordsOutThe total number of records this operator/task has emitted.Counter
    numRecordsOutPerSecondThe number of records this operator/task sends per second.Meter
    numLateRecordsDroppedThe number of records this operator/task has dropped due to arriving late.Counter
    currentInputWatermark The last watermark this operator/tasks has received (in milliseconds).Note: For operators/tasks with 2 inputs this is the minimum of the last received watermarks.Gauge
    OperatorcurrentInput1Watermark The last watermark this operator has received in its first input (in milliseconds).Note: Only for operators with 2 inputs.Gauge
    currentInput2Watermark The last watermark this operator has received in its second input (in milliseconds).Note: Only for operators with 2 inputs.Gauge
    currentOutputWatermark The last watermark this operator has emitted (in milliseconds). Gauge
    numSplitsProcessedThe total number of InputSplits this data source has processed (if the operator is a data source).Gauge

    Connectors

    Kafka Connectors

    ScopeMetricsUser VariablesDescriptionType
    OperatorcommitsSucceededn/aThe total number of successful offset commits to Kafka, if offset committing is turned on and checkpointing is enabled.Counter
    OperatorcommitsFailedn/aThe total number of offset commit failures to Kafka, if offset committing is turned on and checkpointing is enabled. Note that committing offsets back to Kafka is only a means to expose consumer progress, so a commit failure does not affect the integrity of Flink's checkpointed partition offsets.Counter
    OperatorcommittedOffsetstopic, partitionThe last successfully committed offsets to Kafka, for each partition. A particular partition's metric can be specified by topic name and partition id.Gauge
    OperatorcurrentOffsetstopic, partitionThe consumer's current read offset, for each partition. A particular partition's metric can be specified by topic name and partition id.Gauge

    Kinesis Connectors

    ScopeMetricsUser VariablesDescriptionType
    OperatormillisBehindLateststream, shardIdThe number of milliseconds the consumer is behind the head of the stream, indicating how far behind current time the consumer is, for each Kinesis shard. A particular shard's metric can be specified by stream name and shard id. A value of 0 indicates record processing is caught up, and there are no new records to process at this moment. A value of -1 indicates that there is no reported value for the metric, yet. Gauge
    OperatorsleepTimeMillisstream, shardIdThe number of milliseconds the consumer spends sleeping before fetching records from Kinesis. A particular shard's metric can be specified by stream name and shard id. Gauge
    OperatormaxNumberOfRecordsPerFetchstream, shardIdThe maximum number of records requested by the consumer in a single getRecords call to Kinesis. If ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS is set to true, this value is adaptively calculated to maximize the 2 Mbps read limits from Kinesis. Gauge
    OperatornumberOfAggregatedRecordsPerFetchstream, shardIdThe number of aggregated Kinesis records fetched by the consumer in a single getRecords call to Kinesis. Gauge
    OperatornumberOfDeggregatedRecordsPerFetchstream, shardIdThe number of deaggregated Kinesis records fetched by the consumer in a single getRecords call to Kinesis. Gauge
    OperatoraverageRecordSizeBytesstream, shardIdThe average size of a Kinesis record in bytes, fetched by the consumer in a single getRecords call. Gauge
    OperatorrunLoopTimeNanosstream, shardIdThe actual time taken, in nanoseconds, by the consumer in the run loop. Gauge
    OperatorloopFrequencyHzstream, shardIdThe number of calls to getRecords in one second. Gauge
    OperatorbytesRequestedPerFetchstream, shardIdThe bytes requested (2 Mbps / loopFrequencyHz) in a single call to getRecords. Gauge

    System resources

    System resources reporting is disabled by default. When metrics.system-resourceis enabled additional metrics listed below will be available on Job- and TaskManager.System resources metrics are updated periodically and they present average values for aconfigured interval (metrics.system-resource-probing-interval).

    System resources reporting requires an optional dependency to be present on theclasspath (for example placed in Flink’s lib directory):

    • com.github.oshi:oshi-core:3.4.0 (licensed under EPL 1.0 license)

    Including it’s transitive dependencies:

    • net.java.dev.jna:jna-platform:jar:4.2.2
    • net.java.dev.jna:jna:jar:4.2.2

    Failures in this regard will be reported as warning messages like NoClassDefFoundErrorlogged by SystemResourcesMetricsInitializer during the startup.

    System CPU

    ScopeInfixMetricsDescription
    Job-/TaskManagerSystem.CPUUsageOverall % of CPU usage on the machine.
    Idle% of CPU Idle usage on the machine.
    Sys% of System CPU usage on the machine.
    User% of User CPU usage on the machine.
    IOWait% of IOWait CPU usage on the machine.
    Irq% of Irq CPU usage on the machine.
    SoftIrq% of SoftIrq CPU usage on the machine.
    Nice% of Nice Idle usage on the machine.
    Load1minAverage CPU load over 1 minute
    Load5minAverage CPU load over 5 minute
    Load15minAverage CPU load over 15 minute
    UsageCPU*% of CPU usage per each processor

    System memory

    ScopeInfixMetricsDescription
    Job-/TaskManagerSystem.MemoryAvailableAvailable memory in bytes
    TotalTotal memory in bytes
    System.SwapUsedUsed swap bytes
    TotalTotal swap in bytes

    System network

    ScopeInfixMetricsDescription
    Job-/TaskManagerSystem.Network.INTERFACE_NAMEReceiveRateAverage receive rate in bytes per second
    SendRateAverage send rate in bytes per second

    Latency tracking

    Flink allows to track the latency of records traveling through the system. This feature is disabled by default.To enable the latency tracking you must set the latencyTrackingInterval to a positive number in either theFlink configuration or ExecutionConfig.

    At the latencyTrackingInterval, the sources will periodically emit a special record, called a LatencyMarker.The marker contains a timestamp from the time when the record has been emitted at the sources.Latency markers can not overtake regular user records, thus if records are queuing up in front of an operator,it will add to the latency tracked by the marker.

    Note that the latency markers are not accounting for the time user records spend in operators as they arebypassing them. In particular the markers are not accounting for the time records spend for example in window buffers.Only if operators are not able to accept new records, thus they are queuing up, the latency measured usingthe markers will reflect that.

    The LatencyMarkers are used to derive a distribution of the latency between the sources of the topology and eachdownstream operator. These distributions are reported as histogram metrics. The granularity of these distributions canbe controlled in the Flink configuration. For the highestgranularity subtask Flink will derive the latency distribution between every source subtask and every downstream subtask, which results in quadratic (in the terms of the parallelism) number of histograms.

    Currently, Flink assumes that the clocks of all machines in the cluster are in sync. We recommend settingup an automated clock synchronisation service (like NTP) to avoid false latency results.

    Warning Enabling latency metrics can significantly impact the performanceof the cluster (in particular for subtask granularity). It is highly recommended to only use them for debugging purposes.

    REST API integration

    Metrics can be queried through the Monitoring REST API.

    Below is a list of available endpoints, with a sample JSON response. All endpoints are of the sample form http://hostname:8081/jobmanager/metrics, below we list only the path part of the URLs.

    Values in angle brackets are variables, for example http://hostname:8081/jobs/<jobid>/metrics will have to be requested for example as http://hostname:8081/jobs/7684be6004e4e955c2a558a9bc463f65/metrics.

    Request metrics for a specific entity:

    • /jobmanager/metrics
    • /taskmanagers/<taskmanagerid>/metrics
    • /jobs/<jobid>/metrics
    • /jobs/<jobid>/vertices/<vertexid>/subtasks/<subtaskindex>

    Request metrics aggregated across all entities of the respective type:

    • /taskmanagers/metrics
    • /jobs/metrics
    • /jobs/<jobid>/vertices/<vertexid>/subtasks/metrics

    Request metrics aggregated over a subset of all entities of the respective type:

    • /taskmanagers/metrics?taskmanagers=A,B,C
    • /jobs/metrics?jobs=D,E,F
    • /jobs/<jobid>/vertices/<vertexid>/subtasks/metrics?subtask=1,2,3

    Request a list of available metrics:

    GET /jobmanager/metrics

    1. [
    2. {
    3. "id": "metric1"
    4. },
    5. {
    6. "id": "metric2"
    7. }
    8. ]

    Request the values for specific (unaggregated) metrics:

    GET taskmanagers/ABCDE/metrics?get=metric1,metric2

    1. [
    2. {
    3. "id": "metric1",
    4. "value": "34"
    5. },
    6. {
    7. "id": "metric2",
    8. "value": "2"
    9. }
    10. ]

    Request aggregated values for specific metrics:

    GET /taskmanagers/metrics?get=metric1,metric2

    1. [
    2. {
    3. "id": "metric1",
    4. "min": 1,
    5. "max": 34,
    6. "avg": 15,
    7. "sum": 45
    8. },
    9. {
    10. "id": "metric2",
    11. "min": 2,
    12. "max": 14,
    13. "avg": 7,
    14. "sum": 16
    15. }
    16. ]

    Request specific aggregated values for specific metrics:

    GET /taskmanagers/metrics?get=metric1,metric2&agg=min,max

    1. [
    2. {
    3. "id": "metric1",
    4. "min": 1,
    5. "max": 34,
    6. },
    7. {
    8. "id": "metric2",
    9. "min": 2,
    10. "max": 14,
    11. }
    12. ]

    Dashboard integration

    Metrics that were gathered for each task or operator can also be visualized in the Dashboard. On the main page for ajob, select the Metrics tab. After selecting one of the tasks in the top graph you can select metrics to display usingthe Add Metric drop-down menu.

    • Task metrics are listed as <subtask_index>.<metric_name>.
    • Operator metrics are listed as <subtask_index>.<operator_name>.<metric_name>.

    Each metric will be visualized as a separate graph, with the x-axis representing time and the y-axis the measured value.All graphs are automatically updated every 10 seconds, and continue to do so when navigating to another page.

    There is no limit as to the number of visualized metrics; however only numeric metrics can be visualized.