diff --git a/file.go b/file.go index 15b502b..1ddc0dd 100644 --- a/file.go +++ b/file.go @@ -87,7 +87,7 @@ func (f *file) cache() error { return err } - f.Rewrite(data) + f.reader = bytes.NewReader(data) return nil } @@ -99,10 +99,6 @@ func (f *file) Path() string { return f.path } -func (f *file) Rename(path string) { - f.path = path -} - func (f *file) Meta() map[string]interface{} { return f.meta } @@ -128,7 +124,3 @@ func (f *file) WriteTo(w io.Writer) (int64, error) { return f.reader.WriteTo(w) } - -func (f *file) Rewrite(data []byte) { - f.reader = bytes.NewReader(data) -} diff --git a/goldsmith.go b/goldsmith.go index e11dc70..8877751 100644 --- a/goldsmith.go +++ b/goldsmith.go @@ -26,27 +26,20 @@ import ( "os" "path/filepath" "sync" - "sync/atomic" - "time" - - "github.com/fatih/color" ) type goldsmith struct { srcDir, dstDir string - - stages []*stage - active int64 - idle int64 + stages []*stage refs map[string]bool refMtx sync.Mutex - tainted bool - faultMtx sync.Mutex + errors []error + errorMtx sync.Mutex } -func (gs *goldsmith) queueFiles(target uint) { +func (gs *goldsmith) queueFiles() { files := make(chan string) go scanDir(gs.srcDir, files, nil) @@ -54,16 +47,7 @@ func (gs *goldsmith) queueFiles(target uint) { go func() { defer close(s.output) - for path := range files { - for { - if gs.active-gs.idle >= int64(target) { - time.Sleep(time.Millisecond) - } else { - break - } - } - relPath, err := filepath.Rel(gs.srcDir, path) if err != nil { panic(err) @@ -115,8 +99,6 @@ func (gs *goldsmith) cleanupFiles() { } func (gs *goldsmith) exportFile(f *file) error { - defer atomic.AddInt64(&gs.active, -1) - absPath := filepath.Join(gs.dstDir, f.path) if err := f.export(absPath); err != nil { return err @@ -142,19 +124,10 @@ func (gs *goldsmith) referenceFile(path string) { } } -func (gs *goldsmith) fault(name string, f *file, err error) { - gs.faultMtx.Lock() - defer gs.faultMtx.Unlock() - - color.Red("Fault Detected\n") - color.Yellow("\tPlugin:\t%s\n", color.WhiteString(name)) - color.Yellow("\tError:\t%s\n\n", color.WhiteString(err.Error())) - if f != nil { - color.Yellow("\tFile:\t%s\n", color.WhiteString(f.path)) - } - - gs.tainted = true - +func (gs *goldsmith) fault(f *file, err error) { + gs.errorMtx.Lock() + gs.errors = append(gs.errors, &Error{f.path, err}) + gs.errorMtx.Unlock() } // @@ -167,12 +140,12 @@ func (gs *goldsmith) Chain(p Plugin) Goldsmith { return gs } -func (gs *goldsmith) Complete() bool { +func (gs *goldsmith) Complete() []error { s := gs.stages[len(gs.stages)-1] for f := range s.output { gs.exportFile(f) } gs.cleanupFiles() - return gs.tainted + return gs.errors } diff --git a/stage.go b/stage.go index df2ca1f..3537083 100644 --- a/stage.go +++ b/stage.go @@ -23,8 +23,8 @@ package goldsmith import ( + "runtime" "sync" - "sync/atomic" ) type stage struct { @@ -45,67 +45,42 @@ func newStage(gs *goldsmith) *stage { func (s *stage) chain(p Plugin) { defer close(s.output) - name, flags, err := p.Initialize() - if err != nil { - s.gs.fault(name, nil, err) - return - } - + init, _ := p.(Initializer) accept, _ := p.(Accepter) proc, _ := p.(Processor) fin, _ := p.(Finalizer) - var ( - wg sync.WaitGroup - mtx sync.Mutex - batch []File - ) - - dispatch := func(f *file) { - if flags&PLUGIN_FLAG_BATCH == PLUGIN_FLAG_BATCH { - atomic.AddInt64(&s.gs.idle, 1) - mtx.Lock() - batch = append(batch, f) - mtx.Unlock() - } else { - s.output <- f + if init != nil { + if err := init.Initialize(); err != nil { + s.gs.fault(nil, err) + return } } - for f := range s.input { - if accept != nil && !accept.Accept(f) { - s.output <- f - } else if proc == nil { - dispatch(f) - } else { - wg.Add(1) - go func(f *file) { - defer wg.Done() - f.rewind() - keep, err := proc.Process(s, f) - if err != nil { - s.gs.fault(name, f, err) - } else if keep { - dispatch(f) + var wg sync.WaitGroup + for i := 0; i < runtime.NumCPU(); i++ { + wg.Add(1) + go func() { + defer wg.Done() + for f := range s.input { + if proc == nil || accept != nil && !accept.Accept(f) { + s.output <- f } else { - atomic.AddInt64(&s.gs.active, -1) + f.rewind() + if err := proc.Process(s, f); err != nil { + s.gs.fault(f, err) + } } - }(f) - } + } + }() } - wg.Wait() if fin != nil { if err := fin.Finalize(s); err != nil { - s.gs.fault(name, nil, err) + s.gs.fault(nil, err) } } - - for _, f := range batch { - atomic.AddInt64(&s.gs.idle, -1) - s.output <- f.(*file) - } } // @@ -113,7 +88,6 @@ func (s *stage) chain(p Plugin) { // func (s *stage) DispatchFile(f File) { - atomic.AddInt64(&s.gs.active, 1) s.output <- f.(*file) } diff --git a/types.go b/types.go index 6008515..f35f66b 100644 --- a/types.go +++ b/types.go @@ -25,12 +25,11 @@ package goldsmith import ( "bytes" "io" - "runtime" ) type Goldsmith interface { Chain(p Plugin) Goldsmith - Complete() bool + Complete() []error } func New(srcDir, dstDir string) Goldsmith { @@ -40,14 +39,16 @@ func New(srcDir, dstDir string) Goldsmith { refs: make(map[string]bool), } - gs.queueFiles(uint(runtime.NumCPU())) + gs.queueFiles() return gs } type File interface { Path() string + Meta() map[string]interface{} Apply(m map[string]interface{}) + Read(p []byte) (int, error) WriteTo(w io.Writer) (int64, error) } @@ -76,12 +77,17 @@ type Context interface { DstDir() string } -const ( - PLUGIN_FLAG_BATCH = 1 << iota -) +type Error struct { + path string + err error +} + +func (e *Error) Error() string { + return e.err.Error() +} type Initializer interface { - Initialize() (string, uint, error) + Initialize() error } type Accepter interface { @@ -93,9 +99,7 @@ type Finalizer interface { } type Processor interface { - Process(ctx Context, f File) (bool, error) + Process(ctx Context, f File) error } -type Plugin interface { - Initializer -} +type Plugin interface{}