You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
132 lines
2.5 KiB
132 lines
2.5 KiB
package pipeline
|
|
|
|
import (
	"context"
	"reflect"
	"strconv"
	"sync"
	"testing"
	"time"

	"github.com/bilibili/kratos/pkg/net/metadata"
	xtime "github.com/bilibili/kratos/pkg/time"
)
|
|
|
|
func TestPipeline(t *testing.T) {
|
|
conf := &Config{
|
|
MaxSize: 3,
|
|
Interval: xtime.Duration(time.Millisecond * 20),
|
|
Buffer: 3,
|
|
Worker: 10,
|
|
}
|
|
type recv struct {
|
|
mirror string
|
|
ch int
|
|
values map[string][]interface{}
|
|
}
|
|
var runs []recv
|
|
do := func(c context.Context, ch int, values map[string][]interface{}) {
|
|
runs = append(runs, recv{
|
|
mirror: metadata.String(c, metadata.Mirror),
|
|
values: values,
|
|
ch: ch,
|
|
})
|
|
}
|
|
split := func(s string) int {
|
|
n, _ := strconv.Atoi(s)
|
|
return n
|
|
}
|
|
p := NewPipeline(conf)
|
|
p.Do = do
|
|
p.Split = split
|
|
p.Start()
|
|
p.Add(context.Background(), "1", 1)
|
|
p.Add(context.Background(), "1", 2)
|
|
p.Add(context.Background(), "11", 3)
|
|
p.Add(context.Background(), "2", 3)
|
|
time.Sleep(time.Millisecond * 60)
|
|
mirrorCtx := metadata.NewContext(context.Background(), metadata.MD{metadata.Mirror: "1"})
|
|
p.Add(mirrorCtx, "2", 3)
|
|
time.Sleep(time.Millisecond * 60)
|
|
p.SyncAdd(mirrorCtx, "5", 5)
|
|
time.Sleep(time.Millisecond * 60)
|
|
p.Close()
|
|
expt := []recv{
|
|
{
|
|
mirror: "",
|
|
ch: 1,
|
|
values: map[string][]interface{}{
|
|
"1": {1, 2},
|
|
"11": {3},
|
|
},
|
|
},
|
|
{
|
|
mirror: "",
|
|
ch: 2,
|
|
values: map[string][]interface{}{
|
|
"2": {3},
|
|
},
|
|
},
|
|
{
|
|
mirror: "1",
|
|
ch: 2,
|
|
values: map[string][]interface{}{
|
|
"2": {3},
|
|
},
|
|
},
|
|
{
|
|
mirror: "1",
|
|
ch: 5,
|
|
values: map[string][]interface{}{
|
|
"5": {5},
|
|
},
|
|
},
|
|
}
|
|
if !reflect.DeepEqual(runs, expt) {
|
|
t.Errorf("expect get %+v,\n got: %+v", expt, runs)
|
|
}
|
|
}
|
|
|
|
func TestPipelineSmooth(t *testing.T) {
|
|
conf := &Config{
|
|
MaxSize: 100,
|
|
Interval: xtime.Duration(time.Second),
|
|
Buffer: 100,
|
|
Worker: 10,
|
|
Smooth: true,
|
|
}
|
|
type result struct {
|
|
index int
|
|
ts time.Time
|
|
}
|
|
var results []result
|
|
do := func(c context.Context, index int, values map[string][]interface{}) {
|
|
results = append(results, result{
|
|
index: index,
|
|
ts: time.Now(),
|
|
})
|
|
}
|
|
split := func(s string) int {
|
|
n, _ := strconv.Atoi(s)
|
|
return n
|
|
}
|
|
p := NewPipeline(conf)
|
|
p.Do = do
|
|
p.Split = split
|
|
p.Start()
|
|
for i := 0; i < 10; i++ {
|
|
p.Add(context.Background(), strconv.Itoa(i), 1)
|
|
}
|
|
time.Sleep(time.Millisecond * 1500)
|
|
if len(results) != conf.Worker {
|
|
t.Errorf("expect results equal worker")
|
|
t.FailNow()
|
|
}
|
|
for i, r := range results {
|
|
if i > 0 {
|
|
if r.ts.Sub(results[i-1].ts) < time.Millisecond*20 {
|
|
t.Errorf("expect runs be smooth")
|
|
t.FailNow()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|