通用包
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
utils/tracer/provider/sls.go

408 lines
13 KiB

package provider
import (
"context"
"encoding/json"
"errors"
"fmt"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/stdout/stdoutmetric"
semconv "go.opentelemetry.io/otel/semconv/v1.7.0"
"os"
"strings"
"time"
"github.com/sethvargo/go-envconfig"
"go.opentelemetry.io/contrib/instrumentation/host"
"go.opentelemetry.io/contrib/instrumentation/runtime"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
otlpTraceGrpc "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/sdk/trace"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/encoding/gzip"
)
const (
slsProjectHeader = "x-sls-otel-project"
slsInstanceIDHeader = "x-sls-otel-instance-id"
slsAccessKeyIDHeader = "x-sls-otel-ak-id"
slsAccessKeySecretHeader = "x-sls-otel-ak-secret"
slsSecurityTokenHeader = "x-sls-otel-token"
)
// Option configures the sls otel provider
type Option func(*Config)
// WithMetricExporterEndpoint configures the endpoint for sending metrics via OTLP
// 配置Metric的输出地址,如果配置为空则禁用Metric功能,配置为stdout则打印到标准输出用于测试
func WithMetricExporterEndpoint(url string) Option {
return func(c *Config) {
c.MetricExporterEndpoint = url
}
}
// WithTraceExporterEndpoint configures the endpoint for sending traces via OTLP
// 配置Trace的输出地址,如果配置为空则禁用Trace功能,配置为stdout则打印到标准输出用于测试
func WithTraceExporterEndpoint(url string) Option {
return func(c *Config) {
c.TraceExporterEndpoint = url
}
}
// WithServiceName configures a "service.name" resource label
// 配置服务名称
func WithServiceName(name string) Option {
return func(c *Config) {
c.ServiceName = name
}
}
func WithServiceNamespace(namespace string) Option {
return func(c *Config) {
c.ServiceNamespace = namespace
}
}
// WithServiceVersion configures a "service.version" resource label
// 配置版本号
func WithServiceVersion(version string) Option {
return func(c *Config) {
c.ServiceVersion = version
}
}
// WithTraceExporterInsecure permits connecting to the trace endpoint without a certificate
// 配置是否禁用SSL,如果输出到SLS,则必须打开SLS
func WithTraceExporterInsecure(insecure bool) Option {
return func(c *Config) {
c.TraceExporterEndpointInsecure = insecure
}
}
// WithMetricExporterInsecure permits connecting to the metric endpoint without a certificate
// 配置是否禁用SSL,如果输出到SLS,则必须打开SLS
func WithMetricExporterInsecure(insecure bool) Option {
return func(c *Config) {
c.MetricExporterEndpointInsecure = insecure
}
}
// WithResourceAttributes configures attributes on the resource
// 配置上传附加的一些tag信息,例如环境、可用区等
func WithResourceAttributes(attributes map[string]string) Option {
return func(c *Config) {
c.resourceAttributes = attributes
}
}
// WithResource configures attributes on the resource
// 配置上传附加的一些tag信息,例如环境、可用区等
func WithResource(resource *resource.Resource) Option {
return func(c *Config) {
c.Resource = resource
}
}
// WithErrorHandler Configures a global error handler to be used throughout an OpenTelemetry instrumented project.
// See "go.opentelemetry.io/otel"
// 配置OpenTelemetry错误处理函数
func WithErrorHandler(handler otel.ErrorHandler) Option {
return func(c *Config) {
c.errorHandler = handler
}
}
// WithMetricReportingPeriod configures the metric reporting period,
// how often the controller collects and exports metric data.
// 配置Metric导出间隔,默认为30s
func WithMetricReportingPeriod(p time.Duration) Option {
return func(c *Config) {
c.MetricReportingPeriod = fmt.Sprint(p)
}
}
// WithSLSConfig configures sls project, instanceID, accessKeyID, accessKeySecret to send data to sls directly
// 配置输出到SLS的信息,包括 project, instanceID, accessKeyID, accessKeySecret
func WithSLSConfig(project, instanceID, accessKeyID, accessKeySecret string) Option {
return func(c *Config) {
c.Project, c.InstanceID, c.AccessKeyID, c.AccessKeySecret = project, instanceID, accessKeyID, accessKeySecret
}
}
func WithIDGenerator(generator sdktrace.IDGenerator) Option {
return func(config *Config) {
if generator != nil {
config.IDGenerator = generator
}
}
}
// Config configure for sls otel
type Config struct {
TraceExporterEndpoint string `env:"SLS_OTEL_TRACE_ENDPOINT,default=stdout"`
TraceExporterEndpointInsecure bool `env:"SLS_OTEL_TRACE_INSECURE,default=false"`
MetricExporterEndpoint string `env:"SLS_OTEL_METRIC_ENDPOINT,default=stdout"`
MetricExporterEndpointInsecure bool `env:"SLS_OTEL_METRIC_INSECURE,default=false"`
MetricReportingPeriod string `env:"SLS_OTEL_METRIC_EXPORT_PERIOD,default=30s"`
ServiceName string `env:"SLS_OTEL_SERVICE_NAME"`
ServiceNamespace string `env:"SLS_OTEL_SERVICE_NAMESPACE"`
ServiceVersion string `env:"SLS_OTEL_SERVICE_VERSION,default=v0.1.0"`
Project string `env:"SLS_OTEL_PROJECT"`
InstanceID string `env:"SLS_OTEL_INSTANCE_ID"`
AccessKeyID string `env:"SLS_OTEL_ACCESS_KEY_ID"`
AccessKeySecret string `env:"SLS_OTEL_ACCESS_KEY_SECRET"`
AttributesEnvKeys string `env:"SLS_OTEL_ATTRIBUTES_ENV_KEYS"`
IDGenerator sdktrace.IDGenerator
Resource *resource.Resource
resourceAttributes map[string]string
errorHandler otel.ErrorHandler
stop []func()
}
func parseEnvKeys(c *Config) {
if c.AttributesEnvKeys == "" {
return
}
envKeys := strings.Split(c.AttributesEnvKeys, "|")
for _, key := range envKeys {
key = strings.TrimSpace(key)
value := os.Getenv(key)
if value != "" {
c.resourceAttributes[key] = value
}
}
}
// 默认使用本机hostname作为hostname
func getDefaultResource(c *Config) *resource.Resource {
hostname, _ := os.Hostname()
return resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceNameKey.String(c.ServiceName),
semconv.HostNameKey.String(hostname),
semconv.ServiceNamespaceKey.String(c.ServiceNamespace),
semconv.ServiceVersionKey.String(c.ServiceVersion),
semconv.ProcessPIDKey.Int(os.Getpid()),
semconv.ProcessCommandKey.String(os.Args[0]),
)
}
func mergeResource(c *Config) error {
var e error
if c.Resource, e = resource.Merge(getDefaultResource(c), c.Resource); e != nil {
return e
}
r := resource.Environment()
if c.Resource, e = resource.Merge(c.Resource, r); e != nil {
return e
}
var keyValues []attribute.KeyValue
for key, value := range c.resourceAttributes {
keyValues = append(keyValues, attribute.KeyValue{
Key: attribute.Key(key),
Value: attribute.StringValue(value),
})
}
newResource := resource.NewWithAttributes(semconv.SchemaURL, keyValues...)
if c.Resource, e = resource.Merge(c.Resource, newResource); e != nil {
return e
}
return nil
}
// 初始化Exporter,如果otlpEndpoint传入的值为 stdout,则默认把信息打印到标准输出用于调试
func (c *Config) initOtelExporter(otlpEndpoint string, insecure bool) (trace.SpanExporter, metric.Exporter, func(), error) {
var traceExporter trace.SpanExporter
var metricsExporter metric.Exporter
var err error
var exporterStop = func() {
if traceExporter != nil {
traceExporter.Shutdown(context.Background())
}
}
if otlpEndpoint == "stdout" {
// 使用Pretty的打印方式
traceExporter, err = stdouttrace.New(stdouttrace.WithPrettyPrint())
if err != nil {
return nil, nil, nil, err
}
enc := json.NewEncoder(os.Stdout)
metricsExporter, err = stdoutmetric.New(stdoutmetric.WithEncoder(enc))
} else if otlpEndpoint != "" {
headers := map[string]string{}
if c.Project != "" && c.InstanceID != "" {
headers = map[string]string{
slsProjectHeader: c.Project,
slsInstanceIDHeader: c.InstanceID,
slsAccessKeyIDHeader: c.AccessKeyID,
slsAccessKeySecretHeader: c.AccessKeySecret,
}
}
// 使用GRPC方式导出数据
traceSecureOption := otlpTraceGrpc.WithTLSCredentials(credentials.NewClientTLSFromCert(nil, ""))
if insecure {
traceSecureOption = otlpTraceGrpc.WithInsecure()
}
traceExporter, err = otlptrace.New(context.Background(),
otlpTraceGrpc.NewClient(otlpTraceGrpc.WithEndpoint(otlpEndpoint),
traceSecureOption,
otlpTraceGrpc.WithHeaders(headers),
otlpTraceGrpc.WithCompressor(gzip.Name)))
if err != nil {
return nil, nil, nil, err
}
metricSecureOption := otlpmetricgrpc.WithTLSCredentials(credentials.NewClientTLSFromCert(nil, ""))
if insecure {
metricSecureOption = otlpmetricgrpc.WithInsecure()
}
metricsExporter, err = otlpmetricgrpc.New(context.Background(), otlpmetricgrpc.WithEndpoint(otlpEndpoint),
metricSecureOption, otlpmetricgrpc.WithHeaders(headers), otlpmetricgrpc.WithCompressor(gzip.Name))
}
return traceExporter, metricsExporter, exporterStop, nil
}
// 初始化Metrics,默认30秒导出一次Metrics
// 默认该函数导出主机和Golang runtime基础指标
func (c *Config) initMetric(metricsExporter metric.Exporter, stop func()) error {
if metricsExporter == nil {
return nil
}
period, err := time.ParseDuration(c.MetricReportingPeriod)
if err != nil {
period = time.Second * 30
}
reader := metric.NewPeriodicReader(metricsExporter, metric.WithInterval(period))
meterProvider := metric.NewMeterProvider(
metric.WithReader(reader),
metric.WithResource(c.Resource))
otel.SetMeterProvider(meterProvider)
// 默认集成主机基础指标
if err := host.Start(host.WithMeterProvider(meterProvider)); err != nil {
return err
}
// 默认集成Golang runtime指标
err = runtime.Start(runtime.WithMeterProvider(meterProvider), runtime.WithMinimumReadMemStatsInterval(time.Second))
c.stop = append(c.stop, func() {
meterProvider.Shutdown(context.Background())
stop()
})
return err
}
// 初始化Traces,默认全量上传
func (c *Config) initTracer(traceExporter trace.SpanExporter, stop func(), config *Config) error {
if traceExporter == nil {
return nil
}
// 建议使用AlwaysSample全量上传Trace数据,若您的数据太多,可以使用sdktrace.ProbabilitySampler进行采样上传
tp := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(
traceExporter,
),
sdktrace.WithIDGenerator(config.IDGenerator),
sdktrace.WithResource(c.Resource),
)
otel.SetTracerProvider(tp)
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
c.stop = append(c.stop, func() {
tp.Shutdown(context.Background())
stop()
})
return nil
}
// IsValid check config and return error if config invalid
func (c *Config) IsValid() error {
if c.ServiceName == "" {
return errors.New("empty service name")
}
if c.ServiceVersion == "" {
return errors.New("empty service version")
}
if (strings.Contains(c.TraceExporterEndpoint, "log.aliyuncs.com") && c.TraceExporterEndpointInsecure) ||
(strings.Contains(c.MetricExporterEndpoint, "log.aliyuncs.com") && c.MetricExporterEndpointInsecure) {
return errors.New("insecure grpc is not allowed when send data to sls directly")
}
if strings.Contains(c.TraceExporterEndpoint, "log.aliyuncs.com") || strings.Contains(c.MetricExporterEndpoint, "log.aliyuncs.com") {
if c.Project == "" || c.InstanceID == "" || c.AccessKeyID == "" || c.AccessKeySecret == "" {
return errors.New("empty project, instanceID, accessKeyID or accessKeySecret when send data to sls directly")
}
if strings.ContainsAny(c.Project, "${}") ||
strings.ContainsAny(c.InstanceID, "${}") ||
strings.ContainsAny(c.AccessKeyID, "${}") ||
strings.ContainsAny(c.AccessKeySecret, "${}") {
return errors.New("invalid project, instanceID, accessKeyID or accessKeySecret when send data to sls directly, you should replace these parameters with actual values")
}
}
return nil
}
// NewConfig create a config
func NewConfig(opts ...Option) (*Config, error) {
var c Config
// 1. load env config
envError := envconfig.Process(context.Background(), &c)
if envError != nil {
return nil, envError
}
// 2. load code config
for _, opt := range opts {
opt(&c)
}
// 3. merge resource
parseEnvKeys(&c)
mergeResource(&c)
return &c, c.IsValid()
}
// Start 初始化OpenTelemetry SDK,需要把 ${endpoint} 替换为实际的地址
// 如果填写为stdout则为调试默认,数据将打印到标准输出
func Start(c *Config) error {
if c.errorHandler != nil {
otel.SetErrorHandler(c.errorHandler)
}
traceExporter, _, traceExpStop, err := c.initOtelExporter(c.TraceExporterEndpoint, c.TraceExporterEndpointInsecure)
if err != nil {
return err
}
_, metricExporter, metricExpStop, err := c.initOtelExporter(c.MetricExporterEndpoint, c.MetricExporterEndpointInsecure)
if err != nil {
return err
}
err = c.initTracer(traceExporter, traceExpStop, c)
if err != nil {
return err
}
err = c.initMetric(metricExporter, metricExpStop)
return err
}
// Shutdown 优雅关闭,将OpenTelemetry SDK内存中的数据发送到服务端
func Shutdown(c *Config) {
for _, stop := range c.stop {
stop()
}
}