You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
kratos/pkg/sync/pipeline/fanout/fanout.go

150 lines
2.8 KiB

package fanout
import (
"context"
"errors"
"runtime"
"sync"
"github.com/go-kratos/kratos/pkg/log"
"github.com/go-kratos/kratos/pkg/net/metadata"
"github.com/go-kratos/kratos/pkg/net/trace"
)
var (
// ErrFull chan full.
ErrFull = errors.New("fanout: chan full")
traceTags = []trace.Tag{
{Key: trace.TagSpanKind, Value: "background"},
{Key: trace.TagComponent, Value: "sync/pipeline/fanout"},
}
)
type options struct {
worker int
buffer int
}
// Option fanout option
type Option func(*options)
// Worker specifies the worker of fanout
func Worker(n int) Option {
if n <= 0 {
panic("fanout: worker should > 0")
}
return func(o *options) {
o.worker = n
}
}
// Buffer specifies the buffer of fanout
func Buffer(n int) Option {
if n <= 0 {
panic("fanout: buffer should > 0")
}
return func(o *options) {
o.buffer = n
}
}
type item struct {
f func(c context.Context)
ctx context.Context
}
// Fanout async consume data from chan.
type Fanout struct {
name string
ch chan item
options *options
waiter sync.WaitGroup
ctx context.Context
cancel func()
}
// New new a fanout struct.
func New(name string, opts ...Option) *Fanout {
if name == "" {
name = "anonymous"
}
o := &options{
worker: 1,
buffer: 1024,
}
for _, op := range opts {
op(o)
}
c := &Fanout{
ch: make(chan item, o.buffer),
name: name,
options: o,
}
c.ctx, c.cancel = context.WithCancel(context.Background())
c.waiter.Add(o.worker)
for i := 0; i < o.worker; i++ {
go c.proc()
}
return c
}
func (c *Fanout) proc() {
defer c.waiter.Done()
for {
select {
case t := <-c.ch:
wrapFunc(t.f)(t.ctx)
_metricChanSize.Set(float64(len(c.ch)), c.name)
_metricCount.Inc(c.name)
case <-c.ctx.Done():
return
}
}
}
func wrapFunc(f func(c context.Context)) (res func(context.Context)) {
res = func(ctx context.Context) {
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 64*1024)
buf = buf[:runtime.Stack(buf, false)]
log.Error("panic in fanout proc, err: %s, stack: %s", r, buf)
}
}()
f(ctx)
if tr, ok := trace.FromContext(ctx); ok {
tr.Finish(nil)
}
}
return
}
// Do save a callback func.
func (c *Fanout) Do(ctx context.Context, f func(ctx context.Context)) (err error) {
if f == nil || c.ctx.Err() != nil {
return c.ctx.Err()
}
nakeCtx := metadata.WithContext(ctx)
if tr, ok := trace.FromContext(ctx); ok {
tr = tr.Fork("", "Fanout:Do").SetTag(traceTags...)
nakeCtx = trace.NewContext(nakeCtx, tr)
}
select {
case c.ch <- item{f: f, ctx: nakeCtx}:
default:
err = ErrFull
}
_metricChanSize.Set(float64(len(c.ch)), c.name)
return
}
// Close close fanout
func (c *Fanout) Close() error {
if err := c.ctx.Err(); err != nil {
return err
}
c.cancel()
c.waiter.Wait()
return nil
}