M goldsmith.go => goldsmith.go +28 -17
@@ 70,8 70,8 @@ func (gs *goldsmith) scan(srcDir string) {
var f *os.File
if f, file.Err = os.Open(match); file.Err == nil {
- defer f.Close()
_, file.Err = file.Buff.ReadFrom(f)
+ f.Close()
}
s.output <- file
@@ 86,40 86,51 @@ func (gs *goldsmith) makeStage() stage {
return s
}
-func (gs *goldsmith) chainSingle(ts ChainerSingle) {
- s := gs.makeStage()
+func (gs *goldsmith) chainSingle(s stage, cs ChainerSingle) {
+ defer close(s.output)
var wg sync.WaitGroup
for file := range s.input {
wg.Add(1)
go func(f File) {
- s.output <- ts.ChainSingle(f)
- wg.Done()
+ defer wg.Done()
+ if f.Err == nil {
+ s.output <- cs.ChainSingle(f)
+ } else {
+ s.output <- f
+ }
}(file)
}
- go func() {
- wg.Wait()
- close(s.output)
- }()
+ wg.Wait()
}
-func (gs *goldsmith) chainMultiple(tm ChainerMultiple) {
- s := gs.makeStage()
- tm.ChainMultiple(s.input, s.output)
+func (gs *goldsmith) chainMultiple(s stage, cm ChainerMultiple) {
+ filtered := make(chan File)
+ defer close(filtered)
+
+ go cm.ChainMultiple(filtered, s.output)
+
+ for file := range s.input {
+ if file.Err == nil {
+ filtered <- file
+ } else {
+ s.output <- file
+ }
+ }
}
-func (gs *goldsmith) Chain(chain interface{}, err error) Goldsmith {
+func (gs *goldsmith) Chain(ctx *Context) Goldsmith {
if gs.err != nil {
return gs
}
- if gs.err = err; gs.err != nil {
- switch t := chain.(type) {
+ if gs.err = ctx.Err; gs.err != nil {
+ switch c := ctx.Chainer.(type) {
case ChainerSingle:
- gs.chainSingle(t)
+ go gs.chainSingle(gs.makeStage(), c)
case ChainerMultiple:
- gs.chainMultiple(t)
+ go gs.chainMultiple(gs.makeStage(), c)
}
}
M types.go => types.go +6 -1
@@ 25,7 25,7 @@ package goldsmith
import "bytes"
type Goldsmith interface {
- Chain(task interface{}, err error) Goldsmith
+ Chain(ctx *Context) Goldsmith
Complete(dstDir string) ([]File, error)
}
@@ 43,3 43,8 @@ type File struct {
Buff bytes.Buffer
Err error
}
+
+type Context struct {
+ Chainer interface{}
+ Err error
+}