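Prometheus has many exporters, and every exporter contains a collector. Here I start by analyzing Prometheus's own monitoring, that is, how it collects metrics about itself.
Start with the interface: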
type Collector interface {
	Describe(chan<- *Desc)
	Collect(chan<- Metric)
}
Several metric types embed this interface:
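To make the two-method contract concrete, here is a minimal sketch of a custom collector. It does not use the client library: `Desc` and `Metric` are simplified stand-in structs (in the real library `Metric` is an interface), and `queueCollector` and `collectOnce` are hypothetical names invented for this illustration.

```go
package main

import "fmt"

// Desc and Metric are simplified stand-ins, NOT the client library types.
type Desc struct{ Name, Help string }

type Metric struct {
	Desc  *Desc
	Value float64
}

// Collector mirrors the two-method shape of the interface above.
type Collector interface {
	Describe(chan<- *Desc)
	Collect(chan<- Metric)
}

// queueCollector is a hypothetical collector that reports a queue length.
type queueCollector struct {
	desc   *Desc
	length func() float64
}

// Describe sends the static description of every metric this collector emits.
func (q *queueCollector) Describe(ch chan<- *Desc) { ch <- q.desc }

// Collect reads the current value and sends it as a metric.
func (q *queueCollector) Collect(ch chan<- Metric) {
	ch <- Metric{Desc: q.desc, Value: q.length()}
}

// collectOnce drains a single Collect call into a slice.
func collectOnce(c Collector) []Metric {
	ch := make(chan Metric)
	go func() {
		c.Collect(ch)
		close(ch)
	}()
	var out []Metric
	for m := range ch {
		out = append(out, m)
	}
	return out
}

func main() {
	c := &queueCollector{
		desc:   &Desc{Name: "queue_length", Help: "Items waiting in the queue."},
		length: func() float64 { return 3 },
	}
	for _, m := range collectOnce(c) {
		fmt.Println(m.Desc.Name, m.Value)
	}
}
```

Describe reports what a collector can emit, while Collect is called on every scrape and reports the current values.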
Gauge
type Gauge interface {
	Metric
	Collector

	// Set sets the Gauge to an arbitrary value.
	Set(float64)
	// Inc increments the Gauge by 1.
	Inc()
	// Dec decrements the Gauge by 1.
	Dec()
	// Add adds the given value to the Gauge. (The value can be
	// negative, resulting in a decrease of the Gauge.)
	Add(float64)
	// Sub subtracts the given value from the Gauge. (The value can be
	// negative, resulting in an increase of the Gauge.)
	Sub(float64)
}
Histogram
type Histogram interface {
	Metric
	Collector

	// Observe adds a single observation to the histogram.
	Observe(float64)
}
Counter
type Counter interface {
	Metric
	Collector

	// Set is used to set the Counter to an arbitrary value. It is only used
	// if you have to transfer a value from an external counter into this
	// Prometheus metric. Do not use it for regular handling of a
	// Prometheus counter (as it can be used to break the contract of
	// monotonically increasing values).
	//
	// Deprecated: Use NewConstMetric to create a counter for an external
	// value. A Counter should never be set.
	Set(float64)
	// Inc increments the counter by 1.
	Inc()
	// Add adds the given value to the counter. It panics if the value is <
	// 0.
	Add(float64)
}
Summary
type Summary interface {
	Metric
	Collector

	// Observe adds a single observation to the summary.
	Observe(float64)
}
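The practical difference between Gauge and Counter in the interfaces above is the monotonicity contract. The following is a rough sketch of that contract only. The `gauge` and `counter` types and the `addPanics` helper are hypothetical, and unlike the real implementations they are not safe for concurrent use.

```go
package main

import (
	"errors"
	"fmt"
)

// gauge is a hypothetical, non-thread-safe model: it may move in either direction.
type gauge struct{ v float64 }

func (g *gauge) Set(v float64) { g.v = v }
func (g *gauge) Add(d float64) { g.v += d }
func (g *gauge) Sub(d float64) { g.Add(-d) }

// counter is a hypothetical, non-thread-safe model: it may only grow.
type counter struct{ v float64 }

func (c *counter) Inc() { c.Add(1) }

// Add panics on a negative delta, matching the comment on Counter.Add.
func (c *counter) Add(d float64) {
	if d < 0 {
		panic(errors.New("counter cannot decrease in value"))
	}
	c.v += d
}

// addPanics reports whether c.Add(d) panicked.
func addPanics(c *counter, d float64) (panicked bool) {
	defer func() { panicked = recover() != nil }()
	c.Add(d)
	return false
}

func main() {
	g := &gauge{}
	g.Set(10)
	g.Sub(4)

	c := &counter{}
	c.Inc()
	c.Add(2.5)

	fmt.Println(g.v, c.v, addPanics(c, -1))
}
```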
Besides these, the library ships its own implementation of the Collector interface, selfCollector:
type selfCollector struct {
	self Metric
}

// init provides the selfCollector with a reference to the metric it is supposed
// to collect. It is usually called within the factory function to create a
// metric. See example.
func (c *selfCollector) init(self Metric) {
	c.self = self
}

// Describe implements Collector.
func (c *selfCollector) Describe(ch chan<- *Desc) {
	ch <- c.self.Desc()
}

// Collect implements Collector.
func (c *selfCollector) Collect(ch chan<- Metric) {
	ch <- c.self
}
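A short sketch may help show why this small struct is useful: a metric type that embeds selfCollector and calls init on itself becomes its own Collector without writing Describe or Collect. Everything here is a stripped-down stand-in rather than the library code. In particular `simpleGauge` and `newSimpleGauge` are hypothetical, and `Metric` is reduced to a single method.

```go
package main

import "fmt"

// Desc and Metric are simplified stand-ins for the library types.
type Desc struct{ Name string }

type Metric interface{ Desc() *Desc }

// selfCollector follows the shape shown above.
type selfCollector struct{ self Metric }

func (c *selfCollector) init(self Metric)         { c.self = self }
func (c *selfCollector) Describe(ch chan<- *Desc) { ch <- c.self.Desc() }
func (c *selfCollector) Collect(ch chan<- Metric) { ch <- c.self }

// simpleGauge is a hypothetical metric that gets Describe and Collect
// for free by embedding selfCollector.
type simpleGauge struct {
	selfCollector
	desc *Desc
	val  float64
}

func (g *simpleGauge) Desc() *Desc { return g.desc }

// newSimpleGauge plays the role of the factory function: it wires the
// metric to itself so that Collect can hand it out.
func newSimpleGauge(name string) *simpleGauge {
	g := &simpleGauge{desc: &Desc{Name: name}}
	g.init(g)
	return g
}

func main() {
	g := newSimpleGauge("config_last_reload_successful")
	g.val = 1

	ch := make(chan Metric, 1)
	g.Collect(ch)
	m := <-ch
	fmt.Println(m.Desc().Name, m.(*simpleGauge).val)
}
```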
Calling Collect on a selfCollector simply sends its own Metric. Remember the registration from the first post? prometheus.MustRegister(configSuccess) registers this configSuccess:
configSuccess = prometheus.NewGauge(prometheus.GaugeOpts{
	Namespace: "prometheus",
	Name:      "config_last_reload_successful",
	Help:      "Whether the last configuration reload attempt was successful.",
})
Inside NewGauge, what gets created is essentially a value. This value embeds a selfCollector, the one shown above:
type value struct {
	valBits uint64

	selfCollector

	desc       *Desc
	valType    ValueType
	labelPairs []*dto.LabelPair
}
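The valBits field stores a float64 as raw bits so that it can be read and updated with the integer atomics in sync/atomic. Below is a minimal sketch of that technique, assuming a compare-and-swap loop for Add. The `atomicFloat` type and its method names are hypothetical and are not the library's code.

```go
package main

import (
	"fmt"
	"math"
	"sync"
	"sync/atomic"
)

// atomicFloat is a hypothetical type that keeps a float64 as its bit pattern.
// The uint64 is the first field so that it is 64-bit aligned on all platforms.
type atomicFloat struct{ bits uint64 }

// Set overwrites the value atomically.
func (f *atomicFloat) Set(v float64) {
	atomic.StoreUint64(&f.bits, math.Float64bits(v))
}

// Add retries a compare-and-swap until no other goroutine interfered.
func (f *atomicFloat) Add(delta float64) {
	for {
		old := atomic.LoadUint64(&f.bits)
		next := math.Float64bits(math.Float64frombits(old) + delta)
		if atomic.CompareAndSwapUint64(&f.bits, old, next) {
			return
		}
	}
}

// Get converts the stored bits back to a float64.
func (f *atomicFloat) Get() float64 {
	return math.Float64frombits(atomic.LoadUint64(&f.bits))
}

func main() {
	var f atomicFloat
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			f.Add(1)
		}()
	}
	wg.Wait()
	fmt.Println(f.Get())
}
```

The same load-and-convert step appears in the Write method of value, which reads valBits with atomic.LoadUint64 and math.Float64frombits.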
Once the Gauge is created it can be registered with MustRegister(...Collector):
func (r *Registry) MustRegister(cs ...Collector) {
	for _, c := range cs {
		if err := r.Register(c); err != nil {
			panic(err)
		}
	}
}
Now look more closely at the Register method:
if len(newDescIDs) == 0 {
	return errors.New("collector has no descriptors")
}
if existing, exists := r.collectorsByID[collectorID]; exists {
	return AlreadyRegisteredError{
		ExistingCollector: existing,
		NewCollector:      c,
	}
}
// If the collectorID is new, but at least one of the descs existed
// before, we are in trouble.
if duplicateDescErr != nil {
	return duplicateDescErr
}

// Only after all tests have passed, actually register.
r.collectorsByID[collectorID] = c
for hash := range newDescIDs {
	r.descIDs[hash] = struct{}{}
}
for name, dimHash := range newDimHashesByName {
	r.dimHashesByName[name] = dimHash
}
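Registration stores the collector in the collectorsByID map, declared as collectorsByID map[uint64]Collector. The key is the collector ID, which is derived from the IDs of the collector's descriptors (for a collector with a single descriptor, such as our Gauge, it coincides with that descID), and the value is the collector we registered.
The registry maintains its collectors through this map. Unregistering deletes the entry: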
r.mtx.RLock()
if _, exists := r.collectorsByID[collectorID]; !exists {
	r.mtx.RUnlock()
	return false
}
r.mtx.RUnlock()

r.mtx.Lock()
defer r.mtx.Unlock()

delete(r.collectorsByID, collectorID)
for id := range descIDs {
	delete(r.descIDs, id)
}
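The register and unregister excerpts above can be modelled in a few lines. This is only a sketch under stated assumptions: descriptors are reduced to names, the descriptor ID is an FNV hash of the name, and the collector ID is the XOR of its descriptor IDs. These choices, and the names `registry`, `collector`, `descID` and `ids`, are made up for the illustration. It also takes a single write lock instead of the read-then-write locking in the excerpt.

```go
package main

import (
	"errors"
	"fmt"
	"hash/fnv"
	"sync"
)

// collector is a hypothetical stand-in that only lists its descriptor names.
type collector struct{ descNames []string }

type registry struct {
	mtx            sync.Mutex
	collectorsByID map[uint64]*collector
	descIDs        map[uint64]struct{}
}

func newRegistry() *registry {
	return &registry{
		collectorsByID: map[uint64]*collector{},
		descIDs:        map[uint64]struct{}{},
	}
}

// descID hashes a descriptor name (an assumption of this sketch).
func descID(name string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(name))
	return h.Sum64()
}

// ids returns the distinct descriptor IDs and a collector ID combining them.
func ids(c *collector) (map[uint64]struct{}, uint64) {
	descs := map[uint64]struct{}{}
	var collectorID uint64
	for _, n := range c.descNames {
		id := descID(n)
		if _, seen := descs[id]; !seen {
			descs[id] = struct{}{}
			collectorID ^= id
		}
	}
	return descs, collectorID
}

func (r *registry) register(c *collector) error {
	newDescIDs, collectorID := ids(c)
	r.mtx.Lock()
	defer r.mtx.Unlock()
	if len(newDescIDs) == 0 {
		return errors.New("collector has no descriptors")
	}
	if _, exists := r.collectorsByID[collectorID]; exists {
		return errors.New("collector already registered")
	}
	for id := range newDescIDs {
		if _, exists := r.descIDs[id]; exists {
			return errors.New("descriptor already registered by another collector")
		}
	}
	// Only after all checks have passed, actually register.
	r.collectorsByID[collectorID] = c
	for id := range newDescIDs {
		r.descIDs[id] = struct{}{}
	}
	return nil
}

func (r *registry) unregister(c *collector) bool {
	descs, collectorID := ids(c)
	r.mtx.Lock()
	defer r.mtx.Unlock()
	if _, exists := r.collectorsByID[collectorID]; !exists {
		return false
	}
	delete(r.collectorsByID, collectorID)
	for id := range descs {
		delete(r.descIDs, id)
	}
	return true
}

func main() {
	r := newRegistry()
	c := &collector{descNames: []string{"config_last_reload_successful"}}
	fmt.Println(r.register(c), r.register(c) != nil, r.unregister(c))
}
```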
With the structure and registration of collectors covered, collection follows naturally. The Gather() method collects the data:
wg.Add(len(r.collectorsByID))
go func() {
	wg.Wait()
	close(metricChan)
}()
for _, collector := range r.collectorsByID {
	go func(collector Collector) {
		defer wg.Done()
		collector.Collect(metricChan)
	}(collector)
}
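The loop starts one goroutine per collector, each calling Collect and sending its results to metricChan. The results are then parsed and wrapped. This step depends on the metric type, which corresponds to the interfaces listed above: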
dtoMetric := &dto.Metric{}
if err := metric.Write(dtoMetric); err != nil {
	errs = append(errs, fmt.Errorf(
		"error collecting metric %v: %s", desc, err,
	))
	continue
}
...
metricFamily.Metric = append(metricFamily.Metric, dtoMetric)
The Write method above deserves some explanation. For the value type:
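The concurrency pattern used by Gather (fan out to one goroutine per collector, close the channel once all are done, drain it in the caller) can be sketched on its own. The `sample` and `collectFunc` types and the `gather` function are hypothetical simplifications, and the final sort is added here only to make the result order deterministic.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// sample is a hypothetical stand-in for a collected metric.
type sample struct {
	name  string
	value float64
}

// collectFunc plays the role of Collector.Collect.
type collectFunc func(chan<- sample)

// gather runs every collector in its own goroutine and drains the shared channel.
func gather(collectors []collectFunc) []sample {
	ch := make(chan sample)
	var wg sync.WaitGroup

	wg.Add(len(collectors))
	// Close the channel only after every collector has finished sending.
	go func() {
		wg.Wait()
		close(ch)
	}()
	for _, c := range collectors {
		go func(c collectFunc) {
			defer wg.Done()
			c(ch)
		}(c)
	}

	var out []sample
	for s := range ch {
		out = append(out, s)
	}
	sort.Slice(out, func(i, j int) bool { return out[i].name < out[j].name })
	return out
}

func main() {
	collectors := []collectFunc{
		func(ch chan<- sample) { ch <- sample{"b_total", 2} },
		func(ch chan<- sample) { ch <- sample{"a_total", 1} },
	}
	for _, s := range gather(collectors) {
		fmt.Println(s.name, s.value)
	}
}
```

Closing the channel from a separate goroutine is what lets the range loop in the caller terminate without knowing in advance how many metrics each collector will send.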
func (v *value) Write(out *dto.Metric) error {
	val := math.Float64frombits(atomic.LoadUint64(&v.valBits))
	return populateMetric(v.valType, val, v.labelPairs, out)
}

func populateMetric(
	t ValueType,
	v float64,
	labelPairs []*dto.LabelPair,
	m *dto.Metric,
) error {
	m.Label = labelPairs
	switch t {
	case CounterValue:
		m.Counter = &dto.Counter{Value: proto.Float64(v)}
	case GaugeValue:
		m.Gauge = &dto.Gauge{Value: proto.Float64(v)}
	case UntypedValue:
		m.Untyped = &dto.Untyped{Value: proto.Float64(v)}
	default:
		return fmt.Errorf("encountered unknown type %v", t)
	}
	return nil
}
For other types, Write is implemented in the type's own code.
Finally, a supplementary note on how a metric is defined:
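The idea that each metric type fills in its own part of the output message can be illustrated with a small sketch. The `out` struct below is a heavily reduced stand-in for dto.Metric, and `writer`, `gaugeValue` and `sumCount` are hypothetical types. None of this is the library's code.

```go
package main

import "fmt"

// out is a reduced stand-in for dto.Metric: exactly one group of
// fields is expected to be set, depending on the metric type.
type out struct {
	Gauge        *float64
	SummaryCount *uint64
	SummarySum   *float64
}

// writer is the part of the Metric interface this sketch cares about.
type writer interface{ Write(*out) error }

// gaugeValue is a hypothetical single-value metric.
type gaugeValue struct{ val float64 }

func (g gaugeValue) Write(o *out) error {
	v := g.val
	o.Gauge = &v
	return nil
}

// sumCount is a hypothetical summary-like metric with its own Write.
type sumCount struct {
	count uint64
	sum   float64
}

func (s sumCount) Write(o *out) error {
	c, t := s.count, s.sum
	o.SummaryCount = &c
	o.SummarySum = &t
	return nil
}

func main() {
	for _, m := range []writer{gaugeValue{1}, sumCount{count: 3, sum: 4.5}} {
		o := &out{}
		if err := m.Write(o); err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(o.Gauge != nil, o.SummaryCount != nil)
	}
}
```

The caller only ever sees the Write method, so the code that builds metric families does not need to know which concrete type it is handling.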
type Metric struct {
	Label            []*LabelPair `protobuf:"bytes,1,rep,name=label" json:"label,omitempty"`
	Gauge            *Gauge       `protobuf:"bytes,2,opt,name=gauge" json:"gauge,omitempty"`
	Counter          *Counter     `protobuf:"bytes,3,opt,name=counter" json:"counter,omitempty"`
	Summary          *Summary     `protobuf:"bytes,4,opt,name=summary" json:"summary,omitempty"`
	Untyped          *Untyped     `protobuf:"bytes,5,opt,name=untyped" json:"untyped,omitempty"`
	Histogram        *Histogram   `protobuf:"bytes,7,opt,name=histogram" json:"histogram,omitempty"`
	TimestampMs      *int64       `protobuf:"varint,6,opt,name=timestamp_ms" json:"timestamp_ms,omitempty"`
	XXX_unrecognized []byte       `json:"-"`
}
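Last published: 2017-04-28 18:06:00
Copyright notice: this is an original article by the author, released under the CC 4.0 BY-SA license. When reposting, please include a link to the original source and this notice.
Reposted from: csdn
Author: u010278923
Original link: https://blog.csdn.net/u010278923/article/details/70927773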