operations.go 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT
  3. package webrtc
  4. import (
  5. "container/list"
  6. "sync"
  7. )
// operation is a single unit of work for the operations queue: a function
// that takes no arguments and returns nothing.
type operation func()
// operations is a task executor. It keeps a queue of pending operations and
// the bookkeeping needed to run them.
type operations struct {
	// mu guards busy and ops.
	mu sync.Mutex
	// busy is set when an operation is enqueued and cleared by the runner
	// once it finds the queue empty.
	busy bool
	// ops holds the pending operations; they are pushed at the back and
	// popped from the front.
	ops *list.List
}
  16. func newOperations() *operations {
  17. return &operations{
  18. ops: list.New(),
  19. }
  20. }
  21. // Enqueue adds a new action to be executed. If there are no actions scheduled,
  22. // the execution will start immediately in a new goroutine.
  23. func (o *operations) Enqueue(op operation) {
  24. if op == nil {
  25. return
  26. }
  27. o.mu.Lock()
  28. running := o.busy
  29. o.ops.PushBack(op)
  30. o.busy = true
  31. o.mu.Unlock()
  32. if !running {
  33. go o.start()
  34. }
  35. }
  36. // IsEmpty checks if there are tasks in the queue
  37. func (o *operations) IsEmpty() bool {
  38. o.mu.Lock()
  39. defer o.mu.Unlock()
  40. return o.ops.Len() == 0
  41. }
  42. // Done blocks until all currently enqueued operations are finished executing.
  43. // For more complex synchronization, use Enqueue directly.
  44. func (o *operations) Done() {
  45. var wg sync.WaitGroup
  46. wg.Add(1)
  47. o.Enqueue(func() {
  48. wg.Done()
  49. })
  50. wg.Wait()
  51. }
  52. func (o *operations) pop() func() {
  53. o.mu.Lock()
  54. defer o.mu.Unlock()
  55. if o.ops.Len() == 0 {
  56. return nil
  57. }
  58. e := o.ops.Front()
  59. o.ops.Remove(e)
  60. if op, ok := e.Value.(operation); ok {
  61. return op
  62. }
  63. return nil
  64. }
  65. func (o *operations) start() {
  66. defer func() {
  67. o.mu.Lock()
  68. defer o.mu.Unlock()
  69. if o.ops.Len() == 0 {
  70. o.busy = false
  71. return
  72. }
  73. // either a new operation was enqueued while we
  74. // were busy, or an operation panicked
  75. go o.start()
  76. }()
  77. fn := o.pop()
  78. for fn != nil {
  79. fn()
  80. fn = o.pop()
  81. }
  82. }