Skip to content

Instantly share code, notes, and snippets.

@austince
Last active December 15, 2022 15:37
Show Gist options
  • Save austince/bd3a27417ad68d713276b8c818ddbf7f to your computer and use it in GitHub Desktop.
Save austince/bd3a27417ad68d713276b8c818ddbf7f to your computer and use it in GitHub Desktop.
An example of the current way to add additional context to k8s workqueue metrics using OpenTelemetry.
package main
import (
"context"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/instrument"
"go.opentelemetry.io/otel/metric/instrument/syncint64"
"k8s.io/client-go/util/workqueue"
"strings"
"time"
)
func main() {
provider := &customMetricsProvider{
meter: metric.NewNoopMeterProvider().Meter("workqueue"),
}
workqueue.SetProvider(provider)
// Queue name must encode all context for the queue,
// as that is the only state passed to the global metrics provider.
// Let's say name is in the format "%s-{key:val};*"
queue := workqueue.NewNamed("test-org:austince;project:test2")
go func() {
for i := 1; ; i++ {
queue.Add(i)
time.Sleep(time.Second)
}
}()
<-context.Background().Done()
}
// customMetricsProvider adds additional identifying context to workqueue metrics
// by parsing the queue name and delegating to an OpenTelemetry meter.
type customMetricsProvider struct {
meter metric.Meter
}
var _ workqueue.MetricsProvider = &customMetricsProvider{}
func (d *customMetricsProvider) NewDepthMetric(name string) workqueue.GaugeMetric {
// In the format "%s-{key:val};*"
// Would also need to handle if this is called from a library and doesn't have the same structure.
parts := strings.SplitN(name, "-", 2)
name = parts[0]
var attrs []attribute.KeyValue
keyVals := strings.Split(parts[1], ";")
for _, p := range keyVals {
keyParts := strings.SplitN(p, ":", 2)
attrs = append(attrs, attribute.KeyValue{
Key: attribute.Key(keyParts[0]),
// Can't easily support other value types besides strings
Value: attribute.StringValue(keyParts[1]),
})
}
counter, err := d.meter.SyncInt64().
UpDownCounter(name, instrument.WithDescription("Current depth of workqueue"))
if err != nil {
panic(err)
}
return &otelGauge{
counter: counter,
extraAttributes: attrs,
}
}
func (d *customMetricsProvider) NewAddsMetric(name string) workqueue.CounterMetric {
return &noOpMetrics{}
}
func (d *customMetricsProvider) NewLatencyMetric(name string) workqueue.HistogramMetric {
return &noOpMetrics{}
}
func (d *customMetricsProvider) NewWorkDurationMetric(name string) workqueue.HistogramMetric {
return &noOpMetrics{}
}
func (d *customMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) workqueue.SettableGaugeMetric {
return &noOpMetrics{}
}
func (d *customMetricsProvider) NewLongestRunningProcessorSecondsMetric(name string) workqueue.SettableGaugeMetric {
return &noOpMetrics{}
}
func (d *customMetricsProvider) NewRetriesMetric(name string) workqueue.CounterMetric {
return &noOpMetrics{}
}
// noOpMetrics is a no-op implementation of all workqueue metrics.
type noOpMetrics struct{}
var _ workqueue.GaugeMetric = &noOpMetrics{}
func (n *noOpMetrics) Dec() {
}
var _ workqueue.CounterMetric = &noOpMetrics{}
func (n *noOpMetrics) Inc() {}
var _ workqueue.SettableGaugeMetric = &noOpMetrics{}
func (n *noOpMetrics) Set(f float64) {}
var _ workqueue.HistogramMetric = &noOpMetrics{}
func (n *noOpMetrics) Observe(f float64) {
}
// otelGauge implements workqueue.GaugeMetric using OpenTelemetry.
type otelGauge struct {
counter syncint64.UpDownCounter
extraAttributes []attribute.KeyValue
}
func (o *otelGauge) Inc() {
o.counter.Add(context.Background(), 1, o.extraAttributes...)
}
func (o *otelGauge) Dec() {
o.counter.Add(context.Background(), -1, o.extraAttributes...)
}
var _ workqueue.GaugeMetric = &otelGauge{}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment