123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147 |
- // Forked from github.com/StefanKopieczek/gossip by @StefanKopieczek
- package util
- import (
- "fmt"
- "sync"
- "github.com/ghettovoice/gosip/log"
- )
- // The buffer size of the primitive input and output chans.
- const c_ELASTIC_CHANSIZE = 3
- // A dynamic channel that does not block on send, but has an unlimited buffer capacity.
- // ElasticChan uses a dynamic slice to buffer signals received on the input channel until
- // the output channel is ready to process them.
- type ElasticChan struct {
- In chan interface{}
- Out chan interface{}
- buffer []interface{}
- stopped bool
- done chan struct{}
- log log.Logger
- logMu sync.RWMutex
- }
- // Initialise the Elastic channel, and start the management goroutine.
- func (c *ElasticChan) Init() {
- c.In = make(chan interface{}, c_ELASTIC_CHANSIZE)
- c.Out = make(chan interface{}, c_ELASTIC_CHANSIZE)
- c.buffer = make([]interface{}, 0)
- c.done = make(chan struct{})
- }
- func (c *ElasticChan) Run() {
- go c.manage()
- }
- func (c *ElasticChan) Stop() {
- select {
- case <-c.done:
- return
- default:
- }
- logger := c.Log()
- if logger != nil {
- logger.Trace("stopping elastic chan...")
- }
- close(c.In)
- <-c.done
- if logger != nil {
- logger.Trace("elastic chan stopped")
- }
- }
- func (c *ElasticChan) Log() log.Logger {
- c.logMu.RLock()
- defer c.logMu.RUnlock()
- return c.log
- }
- func (c *ElasticChan) SetLog(logger log.Logger) {
- c.logMu.Lock()
- c.log = logger.
- WithPrefix("util.ElasticChan").
- WithFields(log.Fields{
- "elastic_chan_ptr": fmt.Sprintf("%p", c),
- })
- c.logMu.Unlock()
- }
- // Poll for input from one end of the channel and add it to the buffer.
- // Also poll sending buffered signals out over the output chan.
- // TODO: add cancel chan
- func (c *ElasticChan) manage() {
- defer close(c.done)
- loop:
- for {
- logger := c.Log()
- if len(c.buffer) > 0 {
- // The buffer has something in it, so try to send as well as
- // receive.
- // (Receive first in order to minimize blocked Send() calls).
- select {
- case in, ok := <-c.In:
- if !ok {
- if logger != nil {
- logger.Trace("elastic chan will dispose")
- }
- break loop
- }
- c.Log().Tracef("ElasticChan %p gets '%v'", c, in)
- c.buffer = append(c.buffer, in)
- case c.Out <- c.buffer[0]:
- c.Log().Tracef("ElasticChan %p sends '%v'", c, c.buffer[0])
- c.buffer = c.buffer[1:]
- }
- } else {
- // The buffer is empty, so there's nothing to send.
- // Just wait to receive.
- in, ok := <-c.In
- if !ok {
- if logger != nil {
- logger.Trace("elastic chan will dispose")
- }
- break loop
- }
- c.Log().Tracef("ElasticChan %p gets '%v'", c, in)
- c.buffer = append(c.buffer, in)
- }
- }
- c.dispose()
- }
- func (c *ElasticChan) dispose() {
- logger := c.Log()
- if logger != nil {
- logger.Trace("elastic chan disposing...")
- }
- for len(c.buffer) > 0 {
- select {
- case c.Out <- c.buffer[0]:
- c.buffer = c.buffer[1:]
- default:
- }
- }
- if logger != nil {
- logger.Trace("elastic chan disposed")
- }
- }
|