• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1package interactors
2
3import (
4	"sync"
5
6	"github.com/pkg/errors"
7)
8
// taskRunner runs batches of error-returning functions concurrently and
// funnels their errors through a single channel.
type taskRunner struct {
	// Embedded so Lock/Unlock are promoted onto the runner itself.
	sync.Mutex
	// Embedded so Add/Done/Wait are promoted onto the runner itself.
	sync.WaitGroup

	// errorChan carries errors reported by running tasks; it is nil until a
	// method assigns it.
	errorChan chan error
}
14
15func NewTaskRunner() *taskRunner {
16	return &taskRunner{}
17}
18
19func (t *taskRunner) ExecuteFunctionsAsync(functions []func() error) error {
20	t.Lock()
21	defer t.Unlock()
22	t.errorChan = make(chan error)
23	t.spawnTasksAsync(
24		t.syncErrorFnToAsync(functions),
25	)
26	go t.closeErrorChanOnComplete()
27	return t.breakOnError()
28}
29
30func (t *taskRunner) breakOnError() error {
31	return <-t.errorChan
32}
33
34func (t *taskRunner) syncErrorFnToAsync(functions []func() error) []func() {
35	transformed := make([]func(), len(functions))
36	for i, fn := range functions {
37		transformed[i] = t.redirectErrToChannel(fn)
38	}
39	return transformed
40}
41
42func (t *taskRunner) closeErrorChanOnComplete() {
43	t.Wait()
44	close(t.errorChan)
45}
46
47func (t *taskRunner) spawnTasksAsync(tasks []func()) {
48	t.Add(len(tasks))
49	for _, task := range tasks {
50		go task()
51	}
52}
53
54func (t *taskRunner) redirectErrToChannel(f func() error) func() {
55	return func() {
56		defer t.Done()
57		err := f()
58		if err != nil && t.errorChan != nil {
59			t.errorChan <- errors.Wrap(err, "Error redirected to channel")
60		}
61	}
62}
63