普罗米修斯 -- 自定义 exporter

原创作者:山治

前言

普罗米修斯提供了多种语言的 client, 用户可以使用 client 很方便的构建自己的 exporter 服务, 后续只需要修改普罗米修斯的配置文件, 就可以把 exporter 加入到普罗米修斯中来。

python client 的使用

首先 需要用 pip install prometheus_client安装客户端

import time
from prometheus_client.core import GaugeMetricFamily, REGISTRY, CounterMetricFamily
from prometheus_client import start_http_server


class CustomCollector(object):
    def __init__(self):
        pass

    def collect(self):
        g = GaugeMetricFamily("MemoryUsage", 'Help text', labels=['instance'])
        g.add_metric(["instance01.us.west.local"], 20)
        yield g

        c = CounterMetricFamily("HttpRequests", 'Help text', labels=['app'])
        c.add_metric(["example"], 2000)
        yield c


if __name__ == '__main__':
    start_http_server(8000)
    REGISTRY.register(CustomCollector())

上面是在 python 中开发一个 exporter 最简单的方式。 我们可以使用prometheus_client 内置的GaugeMetricFamilyCounterMetricFamily 来构建自己的监控指标。

java client 的使用

首先引入依赖

<!-- The client -->
<dependency>
    <groupId>io.prometheus</groupId>
    <artifactId>simpleclient</artifactId>
    <version>0.6.0</version>
</dependency>

<dependency>
    <groupId>io.prometheus</groupId>
    <artifactId>simpleclient_httpserver</artifactId>
    <version>0.6.0</version>
</dependency>
package exporter;

import io.prometheus.client.Counter;
import io.prometheus.client.exporter.HTTPServer;

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class CustomExporter {
        # 注册一个counter类型的监控指标并带有一个名字叫method的label
    static final Counter requests = Counter.build()
            .name("my_library_requests_total").help("Total requests.")
            .labelNames("method").register();

    public static void processGetRequest() {
        requests.labels("get").inc();
        // 在这编写监控逻辑
    }

    public static void main(String[] args) throws IOException {
        // 启动一个线程池,每隔10s种触发一次调用监控逻辑
        ScheduledExecutorService service = Executors.newScheduledThreadPool(1);
        service.scheduleWithFixedDelay(() -> {
            // 模拟一个监控动作, 调用counter类型的自增方法。 当然也可以调用上面的processGetRequest方法
            double value = requests.labels("get").get();
            System.out.println(value);
            requests.labels("get").inc();
        }, 0, 10, TimeUnit.SECONDS);

        // 利用普罗米修斯提供的httpserver 启动服务
        HTTPServer server = new HTTPServer(1234);


    }
}

go client 的使用

我们实际用 go client 来开发一个监控在 k8s 集群中监控每一个容器的 socket 状态的 exporter。 首先我们需要通过 go mod 文件拉引入依赖。PS:代码的逻辑解释在注释中。

module prophet-container-exporter

go 1.13

require (
   github.com/pkg/errors v0.9.1
   github.com/prometheus/client_golang v1.5.1
   github.com/sirupsen/logrus v1.4.2
   k8s.io/api v0.0.0-20190620084959-7cf5895f2711
   k8s.io/apimachinery v0.0.0-20190612205821-1799e75a0719
   k8s.io/client-go v0.0.0-20190620085101-78d2af792bab
)

注意: 除了普罗米修斯的 client 之外, 还需要引入 k8s 的 client-go 用来实际的去监控容器的状态。

初始化监控指标

func init() {
   log.SetOutput(os.Stdout)
   log.Info("init the kubeconfig")
   // 初始化k8s的client有两种方式。 如果当前是在pod中运行的话, client自己会找到容器对应目录下的service account信息与k8s的apiserver通信鉴权。 如果程序在集群外部运行, 那么需要给client提供一个kubeconfig文件
   if isExist, err := PathExists("kubeconfig"); err != nil {
      panic(err)
   } else {
      if isExist {
         log.Info("now out of k8s cluster")
         kubeConfig, err = clientcmd.BuildConfigFromFlags("", "kubeconfig")
      } else {
         log.Info("now In k8s cluster")
         kubeConfig, err = rest.InClusterConfig()
      }
      if err != nil {
         log.Error("cannot init the kubeconfig")
         panic(err.Error())
      }
   }
   var err error
   k8s, err = kubernetes.NewForConfig(kubeConfig)
   if err != nil {
      log.Error("cannot init the k8s client")
      panic(err.Error())
   }
   log.Info("init the k8sclient done, now begin to monitor the k8s")


   // 在开始监控之前首先遍历当前k8s集群中所有的pod,为每个容器建立一个guage类型的监控指标并记录在一个map中,方便后面程序根据这些容器执行具体的监控逻辑。
   register := func() {
      namespaceList, err := k8s.CoreV1().Namespaces().List(metav1.ListOptions{})
      if err != nil {
         log.Error(err)
         os.Exit(1)
      }

      for _, n := range namespaceList.Items {
         namespace := n.Name
         podList, err := k8s.CoreV1().Pods(namespace).List(metav1.ListOptions{})
         if err != nil {
            panic(errors.Wrapf(err, "cannot list k8s with namespace %s", namespace))
         }
         // 遍历所有pod
         for _, pod := range podList.Items {
                        # 只监控Running状态的Pod
            if pod.Status.Phase != "Running" {
               continue
            }

            // 遍历pod下的容器, 每个容器的指标有3个label。1. 名称空间 2. pod名称  3. 容器名称
            for _, container := range pod.Status.ContainerStatuses {
               sdGauge := prometheus.NewGauge(prometheus.GaugeOpts{
                  Name: "namespace_container_Socket_Close_Wait",
                  Help: "num of socket with CLOSE-WAIT status in container",
                  ConstLabels: map[string]string{
                     "namespace": namespace,
                     "pod":       pod.Name,
                     "container": container.Name,
                  },
               })
               if _, ok := sdMetrics[pod.Name+","+namespace]; !ok {
                  prometheus.MustRegister(sdGauge)
                  sdMetrics[pod.Name+","+namespace] = sdGauge
               }
            }
         }
      }

   }

   // 上来就注册一次
   register()

   // 周期性注册namespace下所有pod中app容器的指标, 因为随时会有新的pod启动
   go func() {
      for {
         time.Sleep(time.Minute * 10)
         register()
      }
   }()
}

实际监控

func main() {

   // 周期性的获取最新的监控数据。 遍历所有容器, 向容器发送`cat /proc/net/tcp | awk '{print $4}'|grep 08|wc -l` 命令,该命令可以查询容器中socket的状态,计算一共有多少个处于CLOSE-WAIT的socket。而我们的监控目的就是通过这个指标判断是否存在socket泄露.
   go func() {
      for {
         for podInfo, guage := range sdMetrics {
            tmp := strings.Split(podInfo, ",")
            podName := tmp[0]
            namespace := tmp[1]

            log.WithFields(log.Fields{
               "namespace": namespace,
               "pod":       podName,
            })
            pod, _ := k8s.CoreV1().Pods(namespace).Get(podName, metav1.GetOptions{})

            for _, container := range pod.Status.ContainerStatuses {
               commands := []string{"sh", "-c", "cat /proc/net/tcp | awk '{print $4}'|grep 08|wc -l"}
               output, err := promethues.Exec(k8s, kubeConfig, commands, namespace, podName, container.Name)
               if err != nil {
                  log.Error(err.Error())
                  continue
               }
               closeWait, err := strconv.ParseFloat(strings.Replace(output, "\n", "", -1), 32)

               if err != nil {
                  fmt.Fprintf(os.Stdout, "err %s\n", errors.Wrap(err, "cannot trans string to float"))
                  continue
               }
               guage.Set(closeWait)
               log.Infof("successfully collect %s's socket status", podName)
            }
         }
         time.Sleep(time.Minute * 10)
      }
   }()

   http.Handle("/metrics", promhttp.Handler())
   log.Fatal(http.ListenAndServe("0.0.0.0:80", nil))

}