diff --git a/goldsmith.go b/goldsmith.go index 43d93fa..2782c1e 100644 --- a/goldsmith.go +++ b/goldsmith.go @@ -35,9 +35,9 @@ import ( type goldsmith struct { srcDir, dstDir string - stages []*stage - active int64 - stalled int64 + stages []*stage + active int64 + idle int64 refs map[string]bool refMtx sync.Mutex @@ -57,7 +57,7 @@ func (gs *goldsmith) queueFiles(target uint) { for path := range files { for { - if gs.active-gs.stalled >= int64(target) { + if gs.active-gs.idle >= int64(target) { time.Sleep(time.Millisecond) } else { break @@ -149,14 +149,15 @@ func (gs *goldsmith) referenceFile(path string) { } } -func (gs *goldsmith) fault(s *stage, step string, f *file, err error) { +func (gs *goldsmith) fault(name string, f *file, err error) { gs.faultMtx.Lock() defer gs.faultMtx.Unlock() color.Red("Fault Detected\n") - color.Yellow("\tPlugin:\t%s\n", color.WhiteString(s.name)) - color.Yellow("\tStep:\t%s\n", color.WhiteString(step)) - color.Yellow("\tFile:\t%s\n", color.WhiteString(f.path)) + color.Yellow("\tPlugin:\t%s\n", color.WhiteString(name)) + if f != nil { + color.Yellow("\tFile:\t%s\n", color.WhiteString(f.path)) + } color.Yellow("\tError:\t%s\n\n", color.WhiteString(err.Error())) gs.tainted = true diff --git a/stage.go b/stage.go index f166e32..9b483db 100644 --- a/stage.go +++ b/stage.go @@ -28,7 +28,6 @@ import ( ) type stage struct { - name string gs *goldsmith input, output chan *file } @@ -45,9 +44,13 @@ func newStage(gs *goldsmith) *stage { func (s *stage) chain(p Plugin) { defer close(s.output) - s.name = p.Name() - init, _ := p.(Initializer) + name, flags, err := p.Initialize(s) + if err != nil { + s.gs.fault(name, nil, err) + return + } + accept, _ := p.(Accepter) proc, _ := p.(Processor) fin, _ := p.(Finalizer) @@ -59,21 +62,13 @@ func (s *stage) chain(p Plugin) { ) dispatch := func(f *file) { - if fin == nil { - s.output <- f - } else { + if flags&PLUGIN_FLAG_BATCH == PLUGIN_FLAG_BATCH { + atomic.AddInt64(&s.gs.idle, 1) mtx.Lock() batch = append(batch, f) mtx.Unlock() - - atomic.AddInt64(&s.gs.stalled, 1) - } - } - - if init != nil { - if err := init.Initialize(s); err != nil { - s.gs.fault(s, "Initialization", nil, err) - return + } else { + s.output <- f } } @@ -89,7 +84,7 @@ func (s *stage) chain(p Plugin) { f.rewind() keep, err := proc.Process(s, f) if err != nil { - s.gs.fault(s, "Processing", f, err) + s.gs.fault(name, f, err) } else if keep { dispatch(f) } else { @@ -102,14 +97,14 @@ func (s *stage) chain(p Plugin) { wg.Wait() if fin != nil { - if err := fin.Finalize(s, batch); err != nil { - s.gs.fault(s, "Finalization", nil, err) + if err := fin.Finalize(s); err != nil { + s.gs.fault(name, nil, err) } + } - for _, f := range batch { - atomic.AddInt64(&s.gs.stalled, -1) - s.output <- f.(*file) - } + for _, f := range batch { + atomic.AddInt64(&s.gs.idle, -1) + s.output <- f.(*file) } } diff --git a/types.go b/types.go index 7fa045e..fb5033b 100644 --- a/types.go +++ b/types.go @@ -79,20 +79,20 @@ type Context interface { DstDir() string } +const ( + PLUGIN_FLAG_BATCH = 1 << iota +) + type Plugin interface { - Name() string + Initialize(ctx Context) (string, uint, error) } type Accepter interface { Accept(file File) bool } -type Initializer interface { - Initialize(ctx Context) error -} - type Finalizer interface { - Finalize(ctx Context, fs []File) error + Finalize(ctx Context) error } type Processor interface {