By analyzing MySQL Exporter end to end, we implement a custom demo collector and integrate it into the scrape flow.
1. Entry point (the main function)
As the code below shows, MySQL Exporter exposes two URLs: /, which prints some basic information, and /metrics, which serves the collected metrics.
Let's look at the handler behind /metrics; it is created by newHandler.
If you want to fold node_exporter into the same binary, this handler is the place to do it; see my earlier article for the details of that integration.
// Copyright 2018 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"net/http"
"os"
"path"
"strconv"
"strings"
"time"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/go-sql-driver/mysql"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/prometheus/common/promlog"
"github.com/prometheus/common/promlog/flag"
"github.com/prometheus/common/version"
nodeCollector "github.com/prometheus/mysqld_exporter/cmd/collector"
"gopkg.in/alecthomas/kingpin.v2"
"gopkg.in/ini.v1"
"github.com/prometheus/mysqld_exporter/collector"
)
var (
listenAddress = kingpin.Flag(
"web.listen-address",
"Address to listen on for web interface and telemetry.",
).Default(":9100").String()
metricPath = kingpin.Flag(
"web.telemetry-path",
"Path under which to expose metrics.",
).Default("/metrics").String()
timeoutOffset = kingpin.Flag(
"timeout-offset",
"Offset to subtract from timeout in seconds.",
).Default("0.25").Float64()
configMycnf = kingpin.Flag(
"config.my-cnf",
"Path to .my.cnf file to read MySQL credentials from.",
).Default(path.Join(os.Getenv("HOME"), ".my.cnf")).String()
tlsInsecureSkipVerify = kingpin.Flag(
"tls.insecure-skip-verify",
"Ignore certificate and server verification when using a tls connection.",
).Bool()
dsn string
)
// scrapers lists all possible collection methods and if they should be enabled by default.
var scrapers = map[collector.Scraper]bool{
collector.ScrapeGlobalStatus{}: true,
collector.ScrapeGlobalVariables{}: true,
collector.ScrapeSlaveStatus{}: true,
collector.ScrapeProcesslist{}: false,
collector.ScrapeUser{}: false,
collector.ScrapeTableSchema{}: false,
collector.ScrapeInfoSchemaInnodbTablespaces{}: false,
collector.ScrapeInnodbMetrics{}: false,
collector.ScrapeAutoIncrementColumns{}: false,
collector.ScrapeBinlogSize{}: false,
collector.ScrapePerfTableIOWaits{}: false,
collector.ScrapePerfIndexIOWaits{}: false,
collector.ScrapePerfTableLockWaits{}: false,
collector.ScrapePerfEventsStatements{}: false,
collector.ScrapePerfEventsStatementsSum{}: false,
collector.ScrapePerfEventsWaits{}: false,
collector.ScrapePerfFileEvents{}: false,
collector.ScrapePerfFileInstances{}: false,
collector.ScrapePerfMemoryEvents{}: false,
collector.ScrapePerfReplicationGroupMembers{}: false,
collector.ScrapePerfReplicationGroupMemberStats{}: false,
collector.ScrapePerfReplicationApplierStatsByWorker{}: false,
collector.ScrapeUserStat{}: false,
collector.ScrapeClientStat{}: false,
collector.ScrapeTableStat{}: false,
collector.ScrapeSchemaStat{}: false,
collector.ScrapeInnodbCmp{}: true,
collector.ScrapeInnodbCmpMem{}: true,
collector.ScrapeQueryResponseTime{}: true,
collector.ScrapeEngineTokudbStatus{}: false,
collector.ScrapeEngineInnodbStatus{}: false,
collector.ScrapeHeartbeat{}: false,
collector.ScrapeSlaveHosts{}: false,
collector.ScrapeReplicaHost{}: false,
collector.ScrapeRdsMysqlDemo2{}: true,
}
func parseMycnf(config interface{}) (string, error) {
var dsn string
opts := ini.LoadOptions{
// MySQL ini file can have boolean keys.
AllowBooleanKeys: true,
}
cfg, err := ini.LoadSources(opts, config)
if err != nil {
return dsn, fmt.Errorf("failed reading ini file: %s", err)
}
user := cfg.Section("client").Key("user").String()
password := cfg.Section("client").Key("password").String()
if user == "" {
return dsn, fmt.Errorf("no user specified under [client] in %s", config)
}
host := cfg.Section("client").Key("host").MustString("localhost")
port := cfg.Section("client").Key("port").MustUint(3306)
socket := cfg.Section("client").Key("socket").String()
sslCA := cfg.Section("client").Key("ssl-ca").String()
sslCert := cfg.Section("client").Key("ssl-cert").String()
sslKey := cfg.Section("client").Key("ssl-key").String()
passwordPart := ""
if password != "" {
passwordPart = ":" + password
} else {
if sslKey == "" {
return dsn, fmt.Errorf("password or ssl-key should be specified under [client] in %s", config)
}
}
if socket != "" {
dsn = fmt.Sprintf("%s%s@unix(%s)/", user, passwordPart, socket)
} else {
dsn = fmt.Sprintf("%s%s@tcp(%s:%d)/", user, passwordPart, host, port)
}
if sslCA != "" {
if tlsErr := customizeTLS(sslCA, sslCert, sslKey); tlsErr != nil {
tlsErr = fmt.Errorf("failed to register a custom TLS configuration for mysql dsn: %s", tlsErr)
return dsn, tlsErr
}
dsn = fmt.Sprintf("%s?tls=custom", dsn)
}
return dsn, nil
}
func customizeTLS(sslCA string, sslCert string, sslKey string) error {
var tlsCfg tls.Config
caBundle := x509.NewCertPool()
pemCA, err := ioutil.ReadFile(sslCA)
if err != nil {
return err
}
if ok := caBundle.AppendCertsFromPEM(pemCA); ok {
tlsCfg.RootCAs = caBundle
} else {
return fmt.Errorf("failed parse pem-encoded CA certificates from %s", sslCA)
}
if sslCert != "" && sslKey != "" {
certPairs := make([]tls.Certificate, 0, 1)
keypair, err := tls.LoadX509KeyPair(sslCert, sslKey)
if err != nil {
return fmt.Errorf("failed to parse pem-encoded SSL cert %s or SSL key %s: %s",
sslCert, sslKey, err)
}
certPairs = append(certPairs, keypair)
tlsCfg.Certificates = certPairs
tlsCfg.InsecureSkipVerify = *tlsInsecureSkipVerify
}
mysql.RegisterTLSConfig("custom", &tlsCfg)
return nil
}
func init() {
prometheus.MustRegister(version.NewCollector("mysqld_exporter"))
}
func newHandler(metrics collector.Metrics, scrapers []collector.Scraper, logger log.Logger) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
filteredScrapers := scrapers
params := r.URL.Query()["collect[]"]
// Use request context for cancellation when connection gets closed.
ctx := r.Context()
// If a timeout is configured via the Prometheus header, add it to the context.
if v := r.Header.Get("X-Prometheus-Scrape-Timeout-Seconds"); v != "" {
timeoutSeconds, err := strconv.ParseFloat(v, 64)
if err != nil {
level.Error(logger).Log("msg", "Failed to parse timeout from Prometheus header", "err", err)
} else {
if *timeoutOffset >= timeoutSeconds {
// Ignore timeout offset if it doesn't leave time to scrape.
level.Error(logger).Log("msg", "Timeout offset should be lower than prometheus scrape timeout", "offset", *timeoutOffset, "prometheus_scrape_timeout", timeoutSeconds)
} else {
// Subtract timeout offset from timeout.
timeoutSeconds -= *timeoutOffset
}
// Create new timeout context with request context as parent.
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, time.Duration(timeoutSeconds*float64(time.Second)))
defer cancel()
// Overwrite request with timeout context.
r = r.WithContext(ctx)
}
}
level.Debug(logger).Log("msg", "collect[] params", "params", strings.Join(params, ","))
// Check if we have some "collect[]" query parameters.
if len(params) > 0 {
filters := make(map[string]bool)
for _, param := range params {
filters[param] = true
}
filteredScrapers = nil
for _, scraper := range scrapers {
if filters[scraper.Name()] {
filteredScrapers = append(filteredScrapers, scraper)
}
}
}
registry := prometheus.NewRegistry()
registry.MustRegister(collector.New(ctx, dsn, metrics, filteredScrapers, logger))
// add node_exporter start
nc, err := nodeCollector.NewNodeCollector(logger)
if err != nil {
level.Error(logger).Log("msg", "couldn't create node_exporter collector", "err", err)
}
registry.MustRegister(nc)
// add node_exporter end
gatherers := prometheus.Gatherers{
prometheus.DefaultGatherer,
registry,
}
// Delegate http serving to Prometheus client library, which will call collector.Collect.
h := promhttp.HandlerFor(gatherers, promhttp.HandlerOpts{})
h.ServeHTTP(w, r)
}
}
func main() {
// Generate ON/OFF flags for all scrapers.
scraperFlags := map[collector.Scraper]*bool{}
for scraper, enabledByDefault := range scrapers {
defaultOn := "false"
if enabledByDefault {
defaultOn = "true"
}
f := kingpin.Flag(
"collect."+scraper.Name(),
scraper.Help(),
).Default(defaultOn).Bool()
scraperFlags[scraper] = f
}
// Parse flags.
promlogConfig := &promlog.Config{}
flag.AddFlags(kingpin.CommandLine, promlogConfig)
kingpin.Version(version.Print("mysqld_exporter"))
kingpin.HelpFlag.Short('h')
kingpin.Parse()
logger := promlog.New(promlogConfig)
// landingPage contains the HTML served at '/'.
// TODO: Make this nicer and more informative.
var landingPage = []byte(`<html>
<head><title>MySQLd exporter</title></head>
<body>
<h1>MySQLd exporter</h1>
<p><a href='` + *metricPath + `'>Metrics</a></p>
</body>
</html>
`)
level.Info(logger).Log("msg", "Starting mysqld_exporter", "version", version.Info())
level.Info(logger).Log("msg", "Build context", "build_context", version.BuildContext())
dsn = os.Getenv("DATA_SOURCE_NAME")
if len(dsn) == 0 {
var err error
if dsn, err = parseMycnf(*configMycnf); err != nil {
level.Info(logger).Log("msg", "Error parsing my.cnf", "file", *configMycnf, "err", err)
os.Exit(1)
}
}
// Register only scrapers enabled by flag.
enabledScrapers := []collector.Scraper{}
for scraper, enabled := range scraperFlags {
if *enabled {
level.Info(logger).Log("msg", "Scraper enabled", "scraper", scraper.Name())
enabledScrapers = append(enabledScrapers, scraper)
}
}
handlerFunc := newHandler(collector.NewMetrics(), enabledScrapers, logger)
http.Handle(*metricPath, promhttp.InstrumentMetricHandler(prometheus.DefaultRegisterer, handlerFunc))
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
w.Write(landingPage)
})
level.Info(logger).Log("msg", "Listening on address", "address", *listenAddress)
if err := http.ListenAndServe(*listenAddress, nil); err != nil {
level.Error(logger).Log("msg", "Error starting HTTP server", "err", err)
os.Exit(1)
}
}
2. Interface implementation
The key point is that registry.MustRegister expects arguments that implement the Collector interface; in other words, every time metrics need to be collected, the Collect method of that interface is called:
type Collector interface {
Describe(chan<- *Desc)
Collect(chan<- Metric)
}
It is easy to see that the exporter collects all metric groups concurrently, and each concrete metric source implements the Scraper interface (a simplified sketch of that fan-out follows the interface definition below):
// Copyright 2018 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package collector
import (
"context"
"database/sql"
"github.com/go-kit/kit/log"
_ "github.com/go-sql-driver/mysql"
"github.com/prometheus/client_golang/prometheus"
)
// Scraper is the minimal interface that lets you add new prometheus metrics to mysqld_exporter.
type Scraper interface {
// Name of the Scraper. Should be unique.
Name() string
// Help describes the role of the Scraper.
// Example: "Collect from SHOW ENGINE INNODB STATUS"
Help() string
// Version of MySQL from which scraper is available.
Version() float64
// Scrape collects data from database connection and sends it over channel as prometheus metric.
Scrape(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric, logger log.Logger) error
}
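To make the concurrent collection concrete, here is a simplified, illustrative sketch of how an exporter can fan its scrapers out. The real logic lives in the Exporter returned by collector.New (collector/exporter.go); the demoExporter type below is invented purely for illustration.

package collector

import (
	"context"
	"database/sql"
	"sync"

	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/log/level"
	"github.com/prometheus/client_golang/prometheus"
)

// demoExporter is a made-up type that mirrors the shape of the real Exporter:
// it holds the DSN plus the list of enabled scrapers.
type demoExporter struct {
	ctx      context.Context
	dsn      string
	scrapers []Scraper
	logger   log.Logger
}

// Describe is left empty in this sketch; the real Exporter sends its own
// descriptors (scrape duration, errors, ...) here.
func (e *demoExporter) Describe(ch chan<- *prometheus.Desc) {}

// Collect is invoked by the Prometheus client library on every scrape.
// Each enabled scraper runs in its own goroutine and writes metrics to ch.
func (e *demoExporter) Collect(ch chan<- prometheus.Metric) {
	db, err := sql.Open("mysql", e.dsn)
	if err != nil {
		level.Error(e.logger).Log("msg", "error opening database connection", "err", err)
		return
	}
	defer db.Close()

	var wg sync.WaitGroup
	for _, scraper := range e.scrapers {
		wg.Add(1)
		go func(s Scraper) {
			defer wg.Done()
			if err := s.Scrape(e.ctx, db, ch, log.With(e.logger, "scraper", s.Name())); err != nil {
				level.Error(e.logger).Log("msg", "error scraping", "scraper", s.Name(), "err", err)
			}
		}(scraper)
	}
	wg.Wait()
}

On top of this pattern, the real Exporter also skips scrapers whose Version() is newer than the connected server and records per-scraper duration and error metrics.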
That makes things simple: to collect a new metric we only have to implement this interface. The actual work happens in Scrape, where we query the database and extract the values we need by whatever means fits, such as text parsing or regular expressions. Let's implement a simple collector:
// Copyright 2018 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Scrape a demo metric (constant gauge `rds_demo2_test`).
package collector
import (
"context"
"database/sql"
"github.com/go-kit/kit/log"
"github.com/prometheus/client_golang/prometheus"
)
// Metric descriptors.
var (
RdsDemo2Desc = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "demo", "rds_demo2_test"), // "demo" is an arbitrary subsystem name for this example
"this is rds_demo2 test",
nil, nil)
)
// ScrapeRdsMysqlDemo2 is a minimal demo scraper that emits a single constant gauge.
type ScrapeRdsMysqlDemo2 struct{}
// Name of the Scraper. Should be unique.
func (ScrapeRdsMysqlDemo2) Name() string {
return "ScrapeRdsMysqlDemo2"
}
// Help describes the role of the Scraper.
func (ScrapeRdsMysqlDemo2) Help() string {
return "Collect a constant demo metric (rds_demo2_test)."
}
// Version of MySQL from which scraper is available.
func (ScrapeRdsMysqlDemo2) Version() float64 {
return 8.0
}
// Scrape collects data from database connection and sends it over channel as prometheus metric.
func (ScrapeRdsMysqlDemo2) Scrape(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric, logger log.Logger) error {
ch <- prometheus.MustNewConstMetric(RdsDemo2Desc, prometheus.GaugeValue, float64(11))
return nil
}
// check interface
var _ Scraper = ScrapeRdsMysqlDemo2{}
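The demo above only pushes a constant. A scraper that actually talks to MySQL normally runs a query on the *sql.DB passed into Scrape and turns every row into a sample. The following sketch shows that pattern; ScrapeRdsMysqlDemo3, its query, and its metric are hypothetical and only illustrate the shape of such a scraper:

package collector

import (
	"context"
	"database/sql"

	"github.com/go-kit/kit/log"
	"github.com/prometheus/client_golang/prometheus"
)

// ScrapeRdsMysqlDemo3 is a hypothetical scraper used only to illustrate
// querying the database inside Scrape; it is not part of mysqld_exporter.
type ScrapeRdsMysqlDemo3 struct{}

func (ScrapeRdsMysqlDemo3) Name() string { return "rds_mysql_demo3" }

func (ScrapeRdsMysqlDemo3) Help() string {
	return "Collect estimated row counts from information_schema.tables (demo)."
}

func (ScrapeRdsMysqlDemo3) Version() float64 { return 5.6 }

// demoRowsDesc labels each sample with schema and table.
var demoRowsDesc = prometheus.NewDesc(
	prometheus.BuildFQName(namespace, "demo", "table_rows"),
	"Estimated number of rows per table (illustrative demo metric).",
	[]string{"schema", "table"}, nil,
)

// Scrape runs one query and turns every row into a gauge sample.
func (ScrapeRdsMysqlDemo3) Scrape(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric, logger log.Logger) error {
	rows, err := db.QueryContext(ctx,
		"SELECT table_schema, table_name, IFNULL(table_rows, 0) FROM information_schema.tables")
	if err != nil {
		return err
	}
	defer rows.Close()

	for rows.Next() {
		var schema, table string
		var tableRows float64
		if err := rows.Scan(&schema, &table, &tableRows); err != nil {
			return err
		}
		ch <- prometheus.MustNewConstMetric(demoRowsDesc, prometheus.GaugeValue, tableRows, schema, table)
	}
	return rows.Err()
}

// check interface
var _ Scraper = ScrapeRdsMysqlDemo3{}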
3. Scrape sets
From the code above we know that mysqld_exporter collects metrics by implementing this interface one scraper at a time, so it is easy to extend. We can describe the monitoring scope in terms of sets. First, the exporter records a default scrape set A in the scrapers variable.
// scrapers lists all possible collection methods and if they should be enabled by default.
var scrapers = map[collector.Scraper]bool{
collector.ScrapeGlobalStatus{}: true,
collector.ScrapeGlobalVariables{}: true,
collector.ScrapeSlaveStatus{}: true,
collector.ScrapeProcesslist{}: false,
collector.ScrapeUser{}: false,
collector.ScrapeTableSchema{}: false,
collector.ScrapeInfoSchemaInnodbTablespaces{}: false,
collector.ScrapeInnodbMetrics{}: false,
collector.ScrapeAutoIncrementColumns{}: false,
collector.ScrapeBinlogSize{}: false,
collector.ScrapePerfTableIOWaits{}: false,
collector.ScrapePerfIndexIOWaits{}: false,
collector.ScrapePerfTableLockWaits{}: false,
collector.ScrapePerfEventsStatements{}: false,
collector.ScrapePerfEventsStatementsSum{}: false,
collector.ScrapePerfEventsWaits{}: false,
collector.ScrapePerfFileEvents{}: false,
collector.ScrapePerfFileInstances{}: false,
collector.ScrapePerfMemoryEvents{}: false,
collector.ScrapePerfReplicationGroupMembers{}: false,
collector.ScrapePerfReplicationGroupMemberStats{}: false,
collector.ScrapePerfReplicationApplierStatsByWorker{}: false,
collector.ScrapeUserStat{}: false,
collector.ScrapeClientStat{}: false,
collector.ScrapeTableStat{}: false,
collector.ScrapeSchemaStat{}: false,
collector.ScrapeInnodbCmp{}: true,
collector.ScrapeInnodbCmpMem{}: true,
collector.ScrapeQueryResponseTime{}: true,
collector.ScrapeEngineTokudbStatus{}: false,
collector.ScrapeEngineInnodbStatus{}: false,
collector.ScrapeHeartbeat{}: false,
collector.ScrapeSlaveHosts{}: false,
collector.ScrapeReplicaHost{}: false,
collector.ScrapeRdsMysqlDemo2{}: true, // our own simple demo implementation
}
The exporter also lets you adjust the scrape scope at startup through command-line flags, giving a set B. When B is not supplied, set A takes effect; when B is supplied, B takes effect and A is ignored. When Prometheus scrapes the exporter, it can additionally pass collect[] parameters that define a set C. When C is absent, the final scrape scope is A or B (whichever is in effect); when C is present, the final scope is the intersection of C with A or B (whichever is in effect).
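For example, with the exporter listening on :9100 as configured above, scraping http://localhost:9100/metrics?collect[]=global_status&collect[]=ScrapeRdsMysqlDemo2 should return only the global status metrics plus our demo metric: the collect[] values are matched against each scraper's Name() and intersected with whatever set A or B enabled at startup.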