Skip to content

Instantly share code, notes, and snippets.

@austince
Created December 12, 2022 20:27
Show Gist options
  • Save austince/59cf732f52fb60a7b067ec9640f23e24 to your computer and use it in GitHub Desktop.
Save austince/59cf732f52fb60a7b067ec9640f23e24 to your computer and use it in GitHub Desktop.
An example of adding per-queue additional context to k8s workqueue metrics using OpenTelemetry.
package main
import (
"context"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/instrument"
"go.opentelemetry.io/otel/metric/instrument/syncint64"
"k8s.io/client-go/util/workqueue"
"time"
)
func main() {
// Shared meter provider
meterProvider := metric.NewNoopMeterProvider().Meter("workqueue")
// Context that has been passed down
ctx := context.Background()
ctx = context.WithValue(ctx, "org", "austince")
ctx = context.WithValue(ctx, "project", "test2")
// Structured keys
orgAttrKey := attribute.Key("org")
projectAttrKey := attribute.Key("project")
// per-queue metrics provider that is enhanced with contextual information to identify the queue
provider := &customMetricsProvider{
meter: meterProvider,
ctx: ctx,
extraAttributes: []attribute.KeyValue{
orgAttrKey.String(ctx.Value("org").(string)),
projectAttrKey.String(ctx.Value("project").(string)),
},
}
queue := workqueue.NewNamed("test", workqueue.WithMetricsProvider(provider))
go func() {
for i := 1; ; i++ {
queue.Add(i)
time.Sleep(time.Second)
}
}()
<-context.Background().Done()
}
// customMetricsProvider adds additional identifying context to workqueue metrics
// by parsing the queue name and delegating to an OpenTelemetry meter.
type customMetricsProvider struct {
meter metric.Meter
ctx context.Context
extraAttributes []attribute.KeyValue
}
var _ workqueue.MetricsProvider = &customMetricsProvider{}
func (d *customMetricsProvider) NewDepthMetric(name string) workqueue.GaugeMetric {
counter, err := d.meter.SyncInt64().
UpDownCounter(name, instrument.WithDescription("Current depth of workqueue"))
if err != nil {
panic(err)
}
return &otelGauge{
counter: counter,
ctx: d.ctx,
extraAttributes: d.extraAttributes,
}
}
func (d *customMetricsProvider) NewAddsMetric(name string) workqueue.CounterMetric {
return &noOpMetrics{}
}
func (d *customMetricsProvider) NewLatencyMetric(name string) workqueue.HistogramMetric {
return &noOpMetrics{}
}
func (d *customMetricsProvider) NewWorkDurationMetric(name string) workqueue.HistogramMetric {
return &noOpMetrics{}
}
func (d *customMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) workqueue.SettableGaugeMetric {
return &noOpMetrics{}
}
func (d *customMetricsProvider) NewLongestRunningProcessorSecondsMetric(name string) workqueue.SettableGaugeMetric {
return &noOpMetrics{}
}
func (d *customMetricsProvider) NewRetriesMetric(name string) workqueue.CounterMetric {
return &noOpMetrics{}
}
// noOpMetrics is a no-op implementation of all workqueue metrics.
type noOpMetrics struct{}
var _ workqueue.GaugeMetric = &noOpMetrics{}
func (n *noOpMetrics) Dec() {
}
var _ workqueue.CounterMetric = &noOpMetrics{}
func (n *noOpMetrics) Inc() {}
var _ workqueue.SettableGaugeMetric = &noOpMetrics{}
func (n *noOpMetrics) Set(f float64) {}
var _ workqueue.HistogramMetric = &noOpMetrics{}
func (n *noOpMetrics) Observe(f float64) {
}
// otelGauge implements workqueue.GaugeMetric using OpenTelemetry.
type otelGauge struct {
counter syncint64.UpDownCounter
ctx context.Context
extraAttributes []attribute.KeyValue
}
func (o *otelGauge) Inc() {
o.counter.Add(o.ctx, 1, o.extraAttributes...)
}
func (o *otelGauge) Dec() {
o.counter.Add(o.ctx, -1, o.extraAttributes...)
}
var _ workqueue.GaugeMetric = &otelGauge{}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment