1package interactors 2 3import ( 4 "sync" 5 6 "github.com/pkg/errors" 7) 8 9type taskRunner struct { 10 errorChan chan error 11 sync.Mutex 12 sync.WaitGroup 13} 14 15func NewTaskRunner() *taskRunner { 16 return &taskRunner{} 17} 18 19func (t *taskRunner) ExecuteFunctionsAsync(functions []func() error) error { 20 t.Lock() 21 defer t.Unlock() 22 t.errorChan = make(chan error) 23 t.spawnTasksAsync( 24 t.syncErrorFnToAsync(functions), 25 ) 26 go t.closeErrorChanOnComplete() 27 return t.breakOnError() 28} 29 30func (t *taskRunner) breakOnError() error { 31 return <-t.errorChan 32} 33 34func (t *taskRunner) syncErrorFnToAsync(functions []func() error) []func() { 35 transformed := make([]func(), len(functions)) 36 for i, fn := range functions { 37 transformed[i] = t.redirectErrToChannel(fn) 38 } 39 return transformed 40} 41 42func (t *taskRunner) closeErrorChanOnComplete() { 43 t.Wait() 44 close(t.errorChan) 45} 46 47func (t *taskRunner) spawnTasksAsync(tasks []func()) { 48 t.Add(len(tasks)) 49 for _, task := range tasks { 50 go task() 51 } 52} 53 54func (t *taskRunner) redirectErrToChannel(f func() error) func() { 55 return func() { 56 defer t.Done() 57 err := f() 58 if err != nil && t.errorChan != nil { 59 t.errorChan <- errors.Wrap(err, "Error redirected to channel") 60 } 61 } 62} 63