diff --git a/file.go b/file.go index 2d91d0c..d038cc3 100644 --- a/file.go +++ b/file.go @@ -153,3 +153,7 @@ func (f *file) Rewrite(data []byte) { func (f *file) Bytes() []byte { return f.srcData } + +func (f *file) Meta() map[string]interface{} { + return f.meta +} diff --git a/goldsmith.go b/goldsmith.go index d257df5..559fe13 100644 --- a/goldsmith.go +++ b/goldsmith.go @@ -31,11 +31,6 @@ import ( "time" ) -type stage struct { - name string - input, output chan *file -} - type goldsmith struct { srcDir, dstDir string refs map[string]bool @@ -50,7 +45,7 @@ func (gs *goldsmith) queueFiles(target uint) { files := make(chan string) go scanDir(gs.srcDir, files, nil) - s := gs.newStage("Goldsmith") + s := newStage(gs) go func() { defer close(s.output) @@ -69,7 +64,7 @@ func (gs *goldsmith) queueFiles(target uint) { panic(err) } - gs.CopyFile(relPath, path) + s.AddFile(NewFileFromPath(relPath, path)) } }() } @@ -124,127 +119,11 @@ func (gs *goldsmith) exportFile(f *file) error { return err } - gs.RefFile(f.path) + gs.refFile(f.path) return nil } -func (gs *goldsmith) newStage(name string) *stage { - s := &stage{ - name: name, - output: make(chan *file), - } - - if len(gs.stages) > 0 { - s.input = gs.stages[len(gs.stages)-1].output - } - - gs.stages = append(gs.stages, s) - return s -} - -func (gs *goldsmith) chain(s *stage, p Plugin) { - defer close(s.output) - - init, _ := p.(Initializer) - accept, _ := p.(Accepter) - proc, _ := p.(Processor) - fin, _ := p.(Finalizer) - - var ( - wg sync.WaitGroup - mtx sync.Mutex - batch []File - ) - - dispatch := func(f *file) { - if fin == nil { - s.output <- f - } else { - mtx.Lock() - batch = append(batch, f) - mtx.Unlock() - - atomic.AddInt64(&gs.stalled, 1) - } - } - - if init != nil { - if err := init.Initialize(gs); err != nil { - gs.fault(s, nil, err) - return - } - } - - for f := range s.input { - if accept != nil && !accept.Accept(f) { - s.output <- f - } else if proc == nil { - dispatch(f) - } else { - wg.Add(1) - go func(f *file) { - defer wg.Done() - if err := proc.Process(gs, f); err != nil { - gs.fault(s, f, err) - } - dispatch(f) - }(f) - } - } - - wg.Wait() - - if fin != nil { - if err := fin.Finalize(gs, batch); err != nil { - gs.fault(s, nil, err) - } - - for _, f := range batch { - atomic.AddInt64(&gs.stalled, -1) - s.output <- f.(*file) - } - } -} - -func (gs *goldsmith) fault(s *stage, f *file, err error) { - log.Printf("%s\t%s\t%s", s.name, f.path, err) - gs.tainted = true -} - -// -// Goldsmith Implementation -// - -func (gs *goldsmith) Chain(p Plugin) Goldsmith { - go gs.chain(gs.newStage(p.Name()), p) - return gs -} - -func (gs *goldsmith) Complete() bool { - s := gs.stages[len(gs.stages)-1] - for f := range s.output { - gs.exportFile(f) - } - - gs.cleanupFiles() - return gs.tainted -} - -// -// Context Implementation -// - -func (gs *goldsmith) NewFile(path string, data []byte) File { - atomic.AddInt64(&gs.active, 1) - return newFileFromData(path, data) -} - -func (gs *goldsmith) CopyFile(dst, src string) File { - atomic.AddInt64(&gs.active, 1) - return newFileFromPath(dst, src) -} - -func (gs *goldsmith) RefFile(path string) { +func (gs *goldsmith) refFile(path string) { gs.mtx.Lock() defer gs.mtx.Unlock() @@ -264,10 +143,27 @@ func (gs *goldsmith) RefFile(path string) { } } -func (gs *goldsmith) SrcDir() string { - return gs.srcDir +func (gs *goldsmith) fault(s *stage, f *file, err error) { + log.Printf("%s\t%s\t%s", s.name, f.path, err) + gs.tainted = true } -func (gs *goldsmith) DstDir() string { - return gs.dstDir +// +// Goldsmith Implementation +// + +func (gs *goldsmith) Chain(p Plugin) Goldsmith { + s := newStage(gs) + go s.chain(p) + return gs +} + +func (gs *goldsmith) Complete() bool { + s := gs.stages[len(gs.stages)-1] + for f := range s.output { + gs.exportFile(f) + } + + gs.cleanupFiles() + return gs.tainted } diff --git a/stage.go b/stage.go new file mode 100644 index 0000000..88debbd --- /dev/null +++ b/stage.go @@ -0,0 +1,129 @@ +/* + * Copyright (c) 2015 Alex Yatskov + * Author: Alex Yatskov + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package goldsmith + +import ( + "sync" + "sync/atomic" +) + +type stage struct { + name string + gs *goldsmith + input, output chan *file +} + +func newStage(gs *goldsmith) *stage { + s := &stage{gs: gs, output: make(chan *file)} + if len(gs.stages) > 0 { + s.input = gs.stages[len(gs.stages)-1].output + } + + gs.stages = append(gs.stages, s) + return s +} + +func (s *stage) chain(p Plugin) { + defer close(s.output) + + init, _ := p.(Initializer) + accept, _ := p.(Accepter) + proc, _ := p.(Processor) + fin, _ := p.(Finalizer) + + var ( + wg sync.WaitGroup + mtx sync.Mutex + batch []File + ) + + dispatch := func(f *file) { + if fin == nil { + s.output <- f + } else { + mtx.Lock() + batch = append(batch, f) + mtx.Unlock() + + atomic.AddInt64(&s.gs.stalled, 1) + } + } + + if init != nil { + if err := init.Initialize(s); err != nil { + s.gs.fault(s, nil, err) + return + } + } + + for f := range s.input { + if accept != nil && !accept.Accept(f) { + s.output <- f + } else if proc == nil { + dispatch(f) + } else { + wg.Add(1) + go func(f *file) { + defer wg.Done() + if err := proc.Process(s, f); err != nil { + s.gs.fault(s, f, err) + } + dispatch(f) + }(f) + } + } + + wg.Wait() + + if fin != nil { + if err := fin.Finalize(s, batch); err != nil { + s.gs.fault(s, nil, err) + } + + for _, f := range batch { + atomic.AddInt64(&s.gs.stalled, -1) + s.output <- f.(*file) + } + } +} + +// +// Context Implementation +// + +func (s *stage) AddFile(f File) { + atomic.AddInt64(&s.gs.active, 1) + s.output <- f.(*file) +} + +func (s *stage) RefFile(path string) { + s.gs.refFile(path) +} + +func (s *stage) SrcDir() string { + return s.gs.srcDir +} + +func (s *stage) DstDir() string { + return s.gs.dstDir +} diff --git a/types.go b/types.go index 3b3e071..9c6292d 100644 --- a/types.go +++ b/types.go @@ -60,11 +60,19 @@ type File interface { Rewrite(data []byte) Bytes() []byte + Meta() map[string]interface{} +} + +func NewFileFromData(path string, srcData []byte) File { + return newFileFromData(path, srcData) +} + +func NewFileFromPath(path, srcPath string) File { + return newFileFromPath(path, srcPath) } type Context interface { - NewFile(path string, data []byte) File - CopyFile(dst, src string) File + AddFile(f File) RefFile(path string) SrcDir() string