From 803f98775bbf4c4e005b91167adb7bcb3376e61c Mon Sep 17 00:00:00 2001 From: Alex Yatskov Date: Thu, 17 Dec 2015 17:55:59 +0900 Subject: [PATCH] Detect stalled files --- goldsmith.go | 40 +++++++++++++++++++++------------------- stage.go | 4 +++- types.go | 8 +++++++- 3 files changed, 31 insertions(+), 21 deletions(-) diff --git a/goldsmith.go b/goldsmith.go index 4af6bca..f1611af 100644 --- a/goldsmith.go +++ b/goldsmith.go @@ -28,6 +28,7 @@ import ( "path/filepath" "sync" "sync/atomic" + "time" ) type stage struct { @@ -41,10 +42,11 @@ type goldsmith struct { stages []*stage refs map[string]bool mtx sync.Mutex - count int32 + busy int64 + stalled int64 } -func (gs *goldsmith) queueFiles() { +func (gs *goldsmith) queueFiles(target uint) { files := make(chan string) go scanDir(gs.srcDir, files, nil) @@ -54,6 +56,14 @@ func (gs *goldsmith) queueFiles() { defer close(s.output) for path := range files { + for { + if gs.busy-gs.stalled >= int64(target) { + time.Sleep(time.Millisecond) + } else { + break + } + } + relPath, err := filepath.Rel(gs.srcDir, path) if err != nil { panic(err) @@ -67,7 +77,7 @@ func (gs *goldsmith) queueFiles() { f.Close() } - s.output <- file + s.AddFile(file) } }() } @@ -117,7 +127,7 @@ func (gs *goldsmith) cleanupFiles() { func (gs *goldsmith) exportFile(file *File) { defer func() { file.Buff.Reset() - gs.decFiles() + atomic.AddInt64(&gs.busy, -1) }() if file.Err != nil { @@ -158,20 +168,8 @@ func (gs *goldsmith) refFile(path string) { } } -func (gs *goldsmith) incFiles() { - atomic.AddInt32(&gs.count, 1) -} - -func (gs *goldsmith) decFiles() { - atomic.AddInt32(&gs.count, -1) -} - func (gs *goldsmith) newStage() *stage { - s := &stage{ - gs: gs, - output: make(chan *File), - } - + s := &stage{gs: gs, output: make(chan *File)} if len(gs.stages) > 0 { s.input = gs.stages[len(gs.stages)-1].output } @@ -200,6 +198,7 @@ func (gs *goldsmith) chain(s *stage, p Plugin) { } else { mtx.Lock() batch = append(batch, f) + atomic.AddInt64(&s.gs.stalled, 1) mtx.Unlock() } } @@ -211,7 +210,9 @@ func (gs *goldsmith) chain(s *stage, p Plugin) { } for file := range s.input { - if file.Err != nil || proc == nil || (accept != nil && !accept.Accept(file)) { + if file.Err != nil || accept != nil && !accept.Accept(file) { + s.output <- file + } else if proc == nil { dispatch(file) } else { wg.Add(1) @@ -220,7 +221,7 @@ func (gs *goldsmith) chain(s *stage, p Plugin) { if proc.Process(s, f) { dispatch(f) } else { - gs.decFiles() + atomic.AddInt64(&gs.busy, -1) } }(file) } @@ -231,6 +232,7 @@ func (gs *goldsmith) chain(s *stage, p Plugin) { if fin != nil { if s.err = fin.Finalize(s, batch); s.err == nil { for _, file := range batch { + atomic.AddInt64(&s.gs.stalled, -1) s.output <- file } } diff --git a/stage.go b/stage.go index e55c0ab..9faa212 100644 --- a/stage.go +++ b/stage.go @@ -22,12 +22,14 @@ package goldsmith +import "sync/atomic" + func (s *stage) RefFile(path string) { s.gs.refFile(path) } func (s *stage) AddFile(file *File) { - s.gs.incFiles() + atomic.AddInt64(&s.gs.busy, 1) s.output <- file } diff --git a/types.go b/types.go index a52504e..e58f4b2 100644 --- a/types.go +++ b/types.go @@ -24,14 +24,20 @@ package goldsmith import "bytes" +const TargetFileCount = 32 + type Goldsmith interface { Chain(p Plugin) Goldsmith Complete() ([]*File, []error) } func New(srcDir, dstDir string) Goldsmith { + return NewThrottled(srcDir, dstDir, TargetFileCount) +} + +func NewThrottled(srcDir, dstDir string, targetFileCount uint) Goldsmith { gs := &goldsmith{srcDir: srcDir, dstDir: dstDir} - gs.queueFiles() + gs.queueFiles(targetFileCount) return gs }