package pool import ( "context" "io" "sync" "time" ) var _ Pool = &Slice{} // Slice . type Slice struct { // New is an application supplied function for creating and configuring a // item. // // The item returned from new must not be in a special state // (subscribed to pubsub channel, transaction started, ...). New func(ctx context.Context) (io.Closer, error) stop func() // stop cancels the item opener. // mu protects fields defined below. mu sync.Mutex freeItem []*item itemRequests map[uint64]chan item nextRequest uint64 // Next key to use in itemRequests. active int // number of opened and pending open items // Used to signal the need for new items // a goroutine running itemOpener() reads on this chan and // maybeOpenNewItems sends on the chan (one send per needed item) // It is closed during db.Close(). The close tells the itemOpener // goroutine to exit. openerCh chan struct{} closed bool cleanerCh chan struct{} // Config pool configuration conf *Config } // NewSlice creates a new pool. func NewSlice(c *Config) *Slice { // check Config if c == nil || c.Active < c.Idle { panic("config nil or Idle Must <= Active") } ctx, cancel := context.WithCancel(context.Background()) // new pool p := &Slice{ conf: c, stop: cancel, itemRequests: make(map[uint64]chan item), openerCh: make(chan struct{}, 1000000), } p.startCleanerLocked(time.Duration(c.IdleTimeout)) go p.itemOpener(ctx) return p } // Reload reload config. func (p *Slice) Reload(c *Config) error { p.mu.Lock() //p.startCleanerLocked(time.Duration(c.IdleTimeout)) p.setActive(c.Active) p.setIdle(c.Idle) p.conf = c p.mu.Unlock() return nil } // Get returns a newly-opened or cached *item. func (p *Slice) Get(ctx context.Context) (io.Closer, error) { p.mu.Lock() if p.closed { p.mu.Unlock() return nil, ErrPoolClosed } idleTimeout := time.Duration(p.conf.IdleTimeout) // Prefer a free item, if possible. numFree := len(p.freeItem) for numFree > 0 { i := p.freeItem[0] copy(p.freeItem, p.freeItem[1:]) p.freeItem = p.freeItem[:numFree-1] p.mu.Unlock() if i.expired(idleTimeout) { i.close() p.mu.Lock() p.release() } else { return i.c, nil } numFree = len(p.freeItem) } // Out of free items or we were asked not to use one. If we're not // allowed to open any more items, make a request and wait. if p.conf.Active > 0 && p.active >= p.conf.Active { // check WaitTimeout and return directly if p.conf.WaitTimeout == 0 && !p.conf.Wait { p.mu.Unlock() return nil, ErrPoolExhausted } // Make the item channel. It's buffered so that the // itemOpener doesn't block while waiting for the req to be read. req := make(chan item, 1) reqKey := p.nextRequestKeyLocked() p.itemRequests[reqKey] = req wt := p.conf.WaitTimeout p.mu.Unlock() // reset context timeout if wt > 0 { var cancel func() _, ctx, cancel = wt.Shrink(ctx) defer cancel() } // Timeout the item request with the context. select { case <-ctx.Done(): // Remove the item request and ensure no value has been sent // on it after removing. p.mu.Lock() delete(p.itemRequests, reqKey) p.mu.Unlock() return nil, ctx.Err() case ret, ok := <-req: if !ok { return nil, ErrPoolClosed } if ret.expired(idleTimeout) { ret.close() p.mu.Lock() p.release() } else { return ret.c, nil } } } p.active++ // optimistically p.mu.Unlock() c, err := p.New(ctx) if err != nil { p.mu.Lock() p.release() p.mu.Unlock() return nil, err } return c, nil } // Put adds a item to the p's free pool. // err is optionally the last error that occurred on this item. func (p *Slice) Put(ctx context.Context, c io.Closer, forceClose bool) error { p.mu.Lock() defer p.mu.Unlock() if forceClose { p.release() return c.Close() } added := p.putItemLocked(c) if !added { p.active-- return c.Close() } return nil } // Satisfy a item or put the item in the idle pool and return true // or return false. // putItemLocked will satisfy a item if there is one, or it will // return the *item to the freeItem list if err == nil and the idle // item limit will not be exceeded. // If err != nil, the value of i is ignored. // If err == nil, then i must not equal nil. // If a item was fulfilled or the *item was placed in the // freeItem list, then true is returned, otherwise false is returned. func (p *Slice) putItemLocked(c io.Closer) bool { if p.closed { return false } if p.conf.Active > 0 && p.active > p.conf.Active { return false } i := item{ c: c, createdAt: nowFunc(), } if l := len(p.itemRequests); l > 0 { var req chan item var reqKey uint64 for reqKey, req = range p.itemRequests { break } delete(p.itemRequests, reqKey) // Remove from pending requests. req <- i return true } else if !p.closed && p.maxIdleItemsLocked() > len(p.freeItem) { p.freeItem = append(p.freeItem, &i) return true } return false } // Runs in a separate goroutine, opens new item when requested. func (p *Slice) itemOpener(ctx context.Context) { for { select { case <-ctx.Done(): return case <-p.openerCh: p.openNewItem(ctx) } } } func (p *Slice) maybeOpenNewItems() { numRequests := len(p.itemRequests) if p.conf.Active > 0 { numCanOpen := p.conf.Active - p.active if numRequests > numCanOpen { numRequests = numCanOpen } } for numRequests > 0 { p.active++ // optimistically numRequests-- if p.closed { return } p.openerCh <- struct{}{} } } // openNewItem one new item func (p *Slice) openNewItem(ctx context.Context) { // maybeOpenNewConnctions has already executed p.active++ before it sent // on p.openerCh. This function must execute p.active-- if the // item fails or is closed before returning. c, err := p.New(ctx) p.mu.Lock() defer p.mu.Unlock() if err != nil { p.release() return } if !p.putItemLocked(c) { p.active-- c.Close() } } // setIdle sets the maximum number of items in the idle // item pool. // // If MaxOpenConns is greater than 0 but less than the new IdleConns // then the new IdleConns will be reduced to match the MaxOpenConns limit // // If n <= 0, no idle items are retained. func (p *Slice) setIdle(n int) { if n > 0 { p.conf.Idle = n } else { // No idle items. p.conf.Idle = -1 } // Make sure maxIdle doesn't exceed maxOpen if p.conf.Active > 0 && p.maxIdleItemsLocked() > p.conf.Active { p.conf.Idle = p.conf.Active } var closing []*item idleCount := len(p.freeItem) maxIdle := p.maxIdleItemsLocked() if idleCount > maxIdle { closing = p.freeItem[maxIdle:] p.freeItem = p.freeItem[:maxIdle] } for _, c := range closing { c.close() } } // setActive sets the maximum number of open items to the database. // // If IdleConns is greater than 0 and the new MaxOpenConns is less than // IdleConns, then IdleConns will be reduced to match the new // MaxOpenConns limit // // If n <= 0, then there is no limit on the number of open items. // The default is 0 (unlimited). func (p *Slice) setActive(n int) { p.conf.Active = n if n < 0 { p.conf.Active = 0 } syncIdle := p.conf.Active > 0 && p.maxIdleItemsLocked() > p.conf.Active if syncIdle { p.setIdle(n) } } // startCleanerLocked starts itemCleaner if needed. func (p *Slice) startCleanerLocked(d time.Duration) { if d <= 0 { // if set 0, staleCleaner() will return directly return } if d < time.Duration(p.conf.IdleTimeout) && p.cleanerCh != nil { select { case p.cleanerCh <- struct{}{}: default: } } // run only one, clean stale items. if p.cleanerCh == nil { p.cleanerCh = make(chan struct{}, 1) go p.staleCleaner(time.Duration(p.conf.IdleTimeout)) } } func (p *Slice) staleCleaner(d time.Duration) { const minInterval = 100 * time.Millisecond if d < minInterval { d = minInterval } t := time.NewTimer(d) for { select { case <-t.C: case <-p.cleanerCh: // maxLifetime was changed or db was closed. } p.mu.Lock() d = time.Duration(p.conf.IdleTimeout) if p.closed || d <= 0 { p.mu.Unlock() return } expiredSince := nowFunc().Add(-d) var closing []*item for i := 0; i < len(p.freeItem); i++ { c := p.freeItem[i] if c.createdAt.Before(expiredSince) { closing = append(closing, c) p.active-- last := len(p.freeItem) - 1 p.freeItem[i] = p.freeItem[last] p.freeItem[last] = nil p.freeItem = p.freeItem[:last] i-- } } p.mu.Unlock() for _, c := range closing { c.close() } if d < minInterval { d = minInterval } t.Reset(d) } } // nextRequestKeyLocked returns the next item request key. // It is assumed that nextRequest will not overflow. func (p *Slice) nextRequestKeyLocked() uint64 { next := p.nextRequest p.nextRequest++ return next } const defaultIdleItems = 2 func (p *Slice) maxIdleItemsLocked() int { n := p.conf.Idle switch { case n == 0: return defaultIdleItems case n < 0: return 0 default: return n } } func (p *Slice) release() { p.active-- p.maybeOpenNewItems() } // Close close pool. func (p *Slice) Close() error { p.mu.Lock() if p.closed { p.mu.Unlock() return nil } if p.cleanerCh != nil { close(p.cleanerCh) } var err error for _, i := range p.freeItem { i.close() } p.freeItem = nil p.closed = true for _, req := range p.itemRequests { close(req) } p.mu.Unlock() p.stop() return err }