elasticchan.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. // Forked from github.com/StefanKopieczek/gossip by @StefanKopieczek
  2. package util
  3. import (
  4. "fmt"
  5. "sync"
  6. "github.com/ghettovoice/gosip/log"
  7. )
  8. // The buffer size of the primitive input and output chans.
  9. const c_ELASTIC_CHANSIZE = 3
  10. // A dynamic channel that does not block on send, but has an unlimited buffer capacity.
  11. // ElasticChan uses a dynamic slice to buffer signals received on the input channel until
  12. // the output channel is ready to process them.
  13. type ElasticChan struct {
  14. In chan interface{}
  15. Out chan interface{}
  16. buffer []interface{}
  17. stopped bool
  18. done chan struct{}
  19. log log.Logger
  20. logMu sync.RWMutex
  21. }
  22. // Initialise the Elastic channel, and start the management goroutine.
  23. func (c *ElasticChan) Init() {
  24. c.In = make(chan interface{}, c_ELASTIC_CHANSIZE)
  25. c.Out = make(chan interface{}, c_ELASTIC_CHANSIZE)
  26. c.buffer = make([]interface{}, 0)
  27. c.done = make(chan struct{})
  28. }
  29. func (c *ElasticChan) Run() {
  30. go c.manage()
  31. }
  32. func (c *ElasticChan) Stop() {
  33. select {
  34. case <-c.done:
  35. return
  36. default:
  37. }
  38. logger := c.Log()
  39. if logger != nil {
  40. logger.Trace("stopping elastic chan...")
  41. }
  42. close(c.In)
  43. <-c.done
  44. if logger != nil {
  45. logger.Trace("elastic chan stopped")
  46. }
  47. }
  48. func (c *ElasticChan) Log() log.Logger {
  49. c.logMu.RLock()
  50. defer c.logMu.RUnlock()
  51. return c.log
  52. }
  53. func (c *ElasticChan) SetLog(logger log.Logger) {
  54. c.logMu.Lock()
  55. c.log = logger.
  56. WithPrefix("util.ElasticChan").
  57. WithFields(log.Fields{
  58. "elastic_chan_ptr": fmt.Sprintf("%p", c),
  59. })
  60. c.logMu.Unlock()
  61. }
  62. // Poll for input from one end of the channel and add it to the buffer.
  63. // Also poll sending buffered signals out over the output chan.
  64. // TODO: add cancel chan
  65. func (c *ElasticChan) manage() {
  66. defer close(c.done)
  67. loop:
  68. for {
  69. logger := c.Log()
  70. if len(c.buffer) > 0 {
  71. // The buffer has something in it, so try to send as well as
  72. // receive.
  73. // (Receive first in order to minimize blocked Send() calls).
  74. select {
  75. case in, ok := <-c.In:
  76. if !ok {
  77. if logger != nil {
  78. logger.Trace("elastic chan will dispose")
  79. }
  80. break loop
  81. }
  82. c.Log().Tracef("ElasticChan %p gets '%v'", c, in)
  83. c.buffer = append(c.buffer, in)
  84. case c.Out <- c.buffer[0]:
  85. c.Log().Tracef("ElasticChan %p sends '%v'", c, c.buffer[0])
  86. c.buffer = c.buffer[1:]
  87. }
  88. } else {
  89. // The buffer is empty, so there's nothing to send.
  90. // Just wait to receive.
  91. in, ok := <-c.In
  92. if !ok {
  93. if logger != nil {
  94. logger.Trace("elastic chan will dispose")
  95. }
  96. break loop
  97. }
  98. c.Log().Tracef("ElasticChan %p gets '%v'", c, in)
  99. c.buffer = append(c.buffer, in)
  100. }
  101. }
  102. c.dispose()
  103. }
  104. func (c *ElasticChan) dispose() {
  105. logger := c.Log()
  106. if logger != nil {
  107. logger.Trace("elastic chan disposing...")
  108. }
  109. for len(c.buffer) > 0 {
  110. select {
  111. case c.Out <- c.buffer[0]:
  112. c.buffer = c.buffer[1:]
  113. default:
  114. }
  115. }
  116. if logger != nil {
  117. logger.Trace("elastic chan disposed")
  118. }
  119. }