This commit is contained in:
Alex Yatskov 2015-12-19 20:51:23 +09:00
parent 5aae6ed383
commit bb80029ef9
3 changed files with 32 additions and 36 deletions

View File

@ -37,7 +37,7 @@ type goldsmith struct {
stages []*stage
active int64
stalled int64
idle int64
refs map[string]bool
refMtx sync.Mutex
@ -57,7 +57,7 @@ func (gs *goldsmith) queueFiles(target uint) {
for path := range files {
for {
if gs.active-gs.stalled >= int64(target) {
if gs.active-gs.idle >= int64(target) {
time.Sleep(time.Millisecond)
} else {
break
@ -149,14 +149,15 @@ func (gs *goldsmith) referenceFile(path string) {
}
}
func (gs *goldsmith) fault(s *stage, step string, f *file, err error) {
func (gs *goldsmith) fault(name string, f *file, err error) {
gs.faultMtx.Lock()
defer gs.faultMtx.Unlock()
color.Red("Fault Detected\n")
color.Yellow("\tPlugin:\t%s\n", color.WhiteString(s.name))
color.Yellow("\tStep:\t%s\n", color.WhiteString(step))
color.Yellow("\tPlugin:\t%s\n", color.WhiteString(name))
if f != nil {
color.Yellow("\tFile:\t%s\n", color.WhiteString(f.path))
}
color.Yellow("\tError:\t%s\n\n", color.WhiteString(err.Error()))
gs.tainted = true

View File

@ -28,7 +28,6 @@ import (
)
type stage struct {
name string
gs *goldsmith
input, output chan *file
}
@ -45,9 +44,13 @@ func newStage(gs *goldsmith) *stage {
func (s *stage) chain(p Plugin) {
defer close(s.output)
s.name = p.Name()
init, _ := p.(Initializer)
name, flags, err := p.Initialize(s)
if err != nil {
s.gs.fault(name, nil, err)
return
}
accept, _ := p.(Accepter)
proc, _ := p.(Processor)
fin, _ := p.(Finalizer)
@ -59,21 +62,13 @@ func (s *stage) chain(p Plugin) {
)
dispatch := func(f *file) {
if fin == nil {
s.output <- f
} else {
if flags&PLUGIN_FLAG_BATCH == PLUGIN_FLAG_BATCH {
atomic.AddInt64(&s.gs.idle, 1)
mtx.Lock()
batch = append(batch, f)
mtx.Unlock()
atomic.AddInt64(&s.gs.stalled, 1)
}
}
if init != nil {
if err := init.Initialize(s); err != nil {
s.gs.fault(s, "Initialization", nil, err)
return
} else {
s.output <- f
}
}
@ -89,7 +84,7 @@ func (s *stage) chain(p Plugin) {
f.rewind()
keep, err := proc.Process(s, f)
if err != nil {
s.gs.fault(s, "Processing", f, err)
s.gs.fault(name, f, err)
} else if keep {
dispatch(f)
} else {
@ -102,15 +97,15 @@ func (s *stage) chain(p Plugin) {
wg.Wait()
if fin != nil {
if err := fin.Finalize(s, batch); err != nil {
s.gs.fault(s, "Finalization", nil, err)
if err := fin.Finalize(s); err != nil {
s.gs.fault(name, nil, err)
}
}
for _, f := range batch {
atomic.AddInt64(&s.gs.stalled, -1)
atomic.AddInt64(&s.gs.idle, -1)
s.output <- f.(*file)
}
}
}
//

View File

@ -79,20 +79,20 @@ type Context interface {
DstDir() string
}
const (
PLUGIN_FLAG_BATCH = 1 << iota
)
type Plugin interface {
Name() string
Initialize(ctx Context) (string, uint, error)
}
type Accepter interface {
Accept(file File) bool
}
type Initializer interface {
Initialize(ctx Context) error
}
type Finalizer interface {
Finalize(ctx Context, fs []File) error
Finalize(ctx Context) error
}
type Processor interface {