From 7954a85a2a40a9e10c599a8b9be2e85635a073bc Mon Sep 17 00:00:00 2001 From: Alex Yatskov Date: Fri, 18 Dec 2015 13:14:39 +0900 Subject: [PATCH] WIP --- file.go | 141 +++++++++++++++++++++++++++++++++++++++++++++++++++ goldsmith.go | 113 ++++++++++++++++------------------------- stage.go | 25 ++++++--- types.go | 35 ++++++------- 4 files changed, 220 insertions(+), 94 deletions(-) create mode 100644 file.go diff --git a/file.go b/file.go new file mode 100644 index 0000000..b4e97d8 --- /dev/null +++ b/file.go @@ -0,0 +1,141 @@ +/* + * Copyright (c) 2015 Alex Yatskov + * Author: Alex Yatskov + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package goldsmith + +import ( + "bytes" + "io" + "io/ioutil" + "os" + "path" +) + +type file struct { + path string + meta map[string]interface{} + + srcData *bytes.Reader + srcPath string +} + +func newFileFromData(path string, srcData []byte) *file { + return &file{ + path: path, + meta: make(map[string]interface{}), + srcData: bytes.NewReader(srcData), + } +} + +func newFileFromPath(path, srcPath string) *file { + return &file{ + path: path, + meta: make(map[string]interface{}), + srcPath: srcPath, + } +} + +func (f *file) rewind() { + if f.srcData != nil { + f.srcData.Seek(0, os.SEEK_SET) + } +} + +func (f *file) export(dstPath string) error { + if err := os.MkdirAll(path.Dir(dstPath), 0755); err != nil { + return err + } + + if err := f.cache(); err != nil { + return err + } + + f.rewind() + + fh, err := os.Create(dstPath) + if err != nil { + return err + } + defer fh.Close() + + var buff [1024]byte + for { + count, err := f.Read(buff[:]) + if err == io.EOF { + break + } else if err != nil { + return err + } + + if _, err := fh.Write(buff[:count]); err != nil { + return err + } + } + + return nil +} + +func (f *file) cache() error { + if f.srcData != nil { + return nil + } + + data, err := ioutil.ReadFile(f.srcPath) + if err != nil { + return err + } + + f.srcData = bytes.NewReader(data) + return nil +} + +func (f *file) Path() string { + return f.path +} + +func (f *file) Keys() (keys []string) { + for key := range f.meta { + keys = append(keys, key) + } + + return keys +} + +func (f *file) Value(key string, def interface{}) interface{} { + if value, ok := f.meta[key]; ok { + return value + } + + return def +} + +func (f *file) SetValue(key string, value interface{}) { + f.meta[key] = value +} + +func (f *file) Read(p []byte) (int, error) { + if err := f.cache(); err != nil { + return 0, err + } + + return f.srcData.Read(p) +} diff --git a/goldsmith.go b/goldsmith.go index 7dc8368..636ad30 100644 --- a/goldsmith.go +++ b/goldsmith.go @@ -23,21 +23,14 @@ package goldsmith import ( - "bytes" + "log" "os" - "path" "path/filepath" "sync" "sync/atomic" "time" ) -type stage struct { - gs *goldsmith - input, output chan *File - err error -} - type goldsmith struct { srcDir, dstDir string stages []*stage @@ -45,6 +38,12 @@ type goldsmith struct { mtx sync.Mutex active int64 stalled int64 + tainted bool +} + +func (gs *goldsmith) fault(s *stage, f *file, err error) { + log.Print("%s\t%s\t%s", s, f, err) + gs.tainted = true } func (gs *goldsmith) queueFiles(target uint) { @@ -70,15 +69,7 @@ func (gs *goldsmith) queueFiles(target uint) { panic(err) } - file := NewFile(relPath) - - var f *os.File - if f, file.Err = os.Open(path); file.Err == nil { - _, file.Err = file.Buff.ReadFrom(f) - f.Close() - } - - s.AddFile(file) + s.CopyFile(relPath, path) } }() } @@ -125,28 +116,16 @@ func (gs *goldsmith) cleanupFiles() { } } -func (gs *goldsmith) exportFile(file *File) { - defer func() { - file.Buff = *bytes.NewBuffer(nil) - atomic.AddInt64(&gs.active, -1) - }() +func (gs *goldsmith) exportFile(f *file) error { + defer atomic.AddInt64(&gs.active, -1) - if file.Err != nil { - return + absPath := filepath.Join(gs.dstDir, f.path) + if err := f.export(absPath); err != nil { + return err } - absPath := filepath.Join(gs.dstDir, file.Path) - if file.Err = os.MkdirAll(path.Dir(absPath), 0755); file.Err != nil { - return - } - - var f *os.File - if f, file.Err = os.Create(absPath); file.Err == nil { - defer f.Close() - if _, file.Err = f.Write(file.Buff.Bytes()); file.Err == nil { - gs.refFile(file.Path) - } - } + gs.refFile(f.path) + return nil } func (gs *goldsmith) refFile(path string) { @@ -170,7 +149,7 @@ func (gs *goldsmith) refFile(path string) { } func (gs *goldsmith) newStage() *stage { - s := &stage{gs: gs, output: make(chan *File)} + s := &stage{gs: gs, output: make(chan *file)} if len(gs.stages) > 0 { s.input = gs.stages[len(gs.stages)-1].output } @@ -190,53 +169,55 @@ func (gs *goldsmith) chain(s *stage, p Plugin) { var ( wg sync.WaitGroup mtx sync.Mutex - batch []*File + batch []File ) - dispatch := func(f *File) { + dispatch := func(f *file) { if fin == nil { s.output <- f } else { mtx.Lock() batch = append(batch, f) - atomic.AddInt64(&s.gs.stalled, 1) mtx.Unlock() + + atomic.AddInt64(&s.gs.stalled, 1) } } if init != nil { - if s.err = init.Initialize(s); s.err != nil { + if err := init.Initialize(s); err != nil { + gs.fault(s, nil, err) return } } - for file := range s.input { - if file.Err != nil || accept != nil && !accept.Accept(file) { - s.output <- file + for f := range s.input { + if accept != nil && !accept.Accept(f) { + s.output <- f } else if proc == nil { - dispatch(file) + dispatch(f) } else { wg.Add(1) - go func(f *File) { + go func(f *file) { defer wg.Done() - if proc.Process(s, f) { - dispatch(f) - } else { - f.Buff = *bytes.NewBuffer(nil) - atomic.AddInt64(&gs.active, -1) + if err := proc.Process(s, f); err != nil { + gs.fault(s, f, err) } - }(file) + dispatch(f) + }(f) } } wg.Wait() if fin != nil { - if s.err = fin.Finalize(s, batch); s.err == nil { - for _, file := range batch { - atomic.AddInt64(&s.gs.stalled, -1) - s.output <- file - } + if err := fin.Finalize(s, batch); err != nil { + gs.fault(s, nil, err) + } + + for _, f := range batch { + atomic.AddInt64(&s.gs.stalled, -1) + s.output <- f.(*file) } } } @@ -246,26 +227,20 @@ func (gs *goldsmith) Chain(p Plugin) Goldsmith { return gs } -func (gs *goldsmith) Complete() ([]*File, []error) { +func (gs *goldsmith) Complete() bool { s := gs.stages[len(gs.stages)-1] - var files []*File - for file := range s.output { - gs.exportFile(file) - files = append(files, file) + for f := range s.output { + gs.exportFile(f) } gs.cleanupFiles() - var errs []error - for _, s := range gs.stages { - if s.err != nil { - errs = append(errs, s.err) - } - } + tainted := gs.tainted gs.stages = nil gs.refs = nil + gs.tainted = false - return files, errs + return tainted } diff --git a/stage.go b/stage.go index 61bf6b2..76d934a 100644 --- a/stage.go +++ b/stage.go @@ -22,17 +22,30 @@ package goldsmith -import "sync/atomic" +import ( + "io" + "sync/atomic" +) + +type stage struct { + gs *goldsmith + input, output chan *file +} + +func (s *stage) NewFile(path string, r io.Reader) File { + atomic.AddInt64(&s.gs.active, 1) + return nil +} + +func (s *stage) CopyFile(dst, src string) File { + atomic.AddInt64(&s.gs.active, 1) + return nil +} func (s *stage) RefFile(path string) { s.gs.refFile(path) } -func (s *stage) AddFile(file *File) { - atomic.AddInt64(&s.gs.active, 1) - s.output <- file -} - func (s *stage) SrcDir() string { return s.gs.srcDir } diff --git a/types.go b/types.go index ffb02bd..f197c03 100644 --- a/types.go +++ b/types.go @@ -23,13 +23,13 @@ package goldsmith import ( - "bytes" + "io" "runtime" ) type Goldsmith interface { Chain(p Plugin) Goldsmith - Complete() ([]*File, []error) + Complete() bool } func New(srcDir, dstDir string) Goldsmith { @@ -42,32 +42,29 @@ func NewThrottled(srcDir, dstDir string, targetFileCount uint) Goldsmith { return gs } -type File struct { - Path string - Meta map[string]interface{} - Buff bytes.Buffer - Err error -} +type File interface { + Path() string -func NewFile(path string) *File { - return &File{ - Path: cleanPath(path), - Meta: make(map[string]interface{}), - } + Keys() []string + Value(key string, def interface{}) interface{} + SetValue(key string, value interface{}) + + Read(p []byte) (int, error) } type Context interface { + NewFile(path string, r io.Reader) File + CopyFile(dst, src string) File + RefFile(path string) + SrcDir() string DstDir() string - - AddFile(file *File) - RefFile(path string) } type Plugin interface{} type Accepter interface { - Accept(file *File) bool + Accept(file File) bool } type Initializer interface { @@ -75,9 +72,9 @@ type Initializer interface { } type Finalizer interface { - Finalize(ctx Context, files []*File) error + Finalize(ctx Context, fs []File) error } type Processor interface { - Process(ctx Context, file *File) bool + Process(ctx Context, f File) error }