Simplification
This commit is contained in:
parent
7f3789a6d4
commit
2f684bb48d
10
file.go
10
file.go
@ -87,7 +87,7 @@ func (f *file) cache() error {
|
||||
return err
|
||||
}
|
||||
|
||||
f.Rewrite(data)
|
||||
f.reader = bytes.NewReader(data)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -99,10 +99,6 @@ func (f *file) Path() string {
|
||||
return f.path
|
||||
}
|
||||
|
||||
func (f *file) Rename(path string) {
|
||||
f.path = path
|
||||
}
|
||||
|
||||
func (f *file) Meta() map[string]interface{} {
|
||||
return f.meta
|
||||
}
|
||||
@ -128,7 +124,3 @@ func (f *file) WriteTo(w io.Writer) (int64, error) {
|
||||
|
||||
return f.reader.WriteTo(w)
|
||||
}
|
||||
|
||||
func (f *file) Rewrite(data []byte) {
|
||||
f.reader = bytes.NewReader(data)
|
||||
}
|
||||
|
45
goldsmith.go
45
goldsmith.go
@ -26,27 +26,20 @@ import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/fatih/color"
|
||||
)
|
||||
|
||||
type goldsmith struct {
|
||||
srcDir, dstDir string
|
||||
|
||||
stages []*stage
|
||||
active int64
|
||||
idle int64
|
||||
|
||||
refs map[string]bool
|
||||
refMtx sync.Mutex
|
||||
|
||||
tainted bool
|
||||
faultMtx sync.Mutex
|
||||
errors []error
|
||||
errorMtx sync.Mutex
|
||||
}
|
||||
|
||||
func (gs *goldsmith) queueFiles(target uint) {
|
||||
func (gs *goldsmith) queueFiles() {
|
||||
files := make(chan string)
|
||||
go scanDir(gs.srcDir, files, nil)
|
||||
|
||||
@ -54,16 +47,7 @@ func (gs *goldsmith) queueFiles(target uint) {
|
||||
|
||||
go func() {
|
||||
defer close(s.output)
|
||||
|
||||
for path := range files {
|
||||
for {
|
||||
if gs.active-gs.idle >= int64(target) {
|
||||
time.Sleep(time.Millisecond)
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
relPath, err := filepath.Rel(gs.srcDir, path)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
@ -115,8 +99,6 @@ func (gs *goldsmith) cleanupFiles() {
|
||||
}
|
||||
|
||||
func (gs *goldsmith) exportFile(f *file) error {
|
||||
defer atomic.AddInt64(&gs.active, -1)
|
||||
|
||||
absPath := filepath.Join(gs.dstDir, f.path)
|
||||
if err := f.export(absPath); err != nil {
|
||||
return err
|
||||
@ -142,19 +124,10 @@ func (gs *goldsmith) referenceFile(path string) {
|
||||
}
|
||||
}
|
||||
|
||||
func (gs *goldsmith) fault(name string, f *file, err error) {
|
||||
gs.faultMtx.Lock()
|
||||
defer gs.faultMtx.Unlock()
|
||||
|
||||
color.Red("Fault Detected\n")
|
||||
color.Yellow("\tPlugin:\t%s\n", color.WhiteString(name))
|
||||
color.Yellow("\tError:\t%s\n\n", color.WhiteString(err.Error()))
|
||||
if f != nil {
|
||||
color.Yellow("\tFile:\t%s\n", color.WhiteString(f.path))
|
||||
}
|
||||
|
||||
gs.tainted = true
|
||||
|
||||
func (gs *goldsmith) fault(f *file, err error) {
|
||||
gs.errorMtx.Lock()
|
||||
gs.errors = append(gs.errors, &Error{f.path, err})
|
||||
gs.errorMtx.Unlock()
|
||||
}
|
||||
|
||||
//
|
||||
@ -167,12 +140,12 @@ func (gs *goldsmith) Chain(p Plugin) Goldsmith {
|
||||
return gs
|
||||
}
|
||||
|
||||
func (gs *goldsmith) Complete() bool {
|
||||
func (gs *goldsmith) Complete() []error {
|
||||
s := gs.stages[len(gs.stages)-1]
|
||||
for f := range s.output {
|
||||
gs.exportFile(f)
|
||||
}
|
||||
|
||||
gs.cleanupFiles()
|
||||
return gs.tainted
|
||||
return gs.errors
|
||||
}
|
||||
|
64
stage.go
64
stage.go
@ -23,8 +23,8 @@
|
||||
package goldsmith
|
||||
|
||||
import (
|
||||
"runtime"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
type stage struct {
|
||||
@ -45,67 +45,42 @@ func newStage(gs *goldsmith) *stage {
|
||||
func (s *stage) chain(p Plugin) {
|
||||
defer close(s.output)
|
||||
|
||||
name, flags, err := p.Initialize()
|
||||
if err != nil {
|
||||
s.gs.fault(name, nil, err)
|
||||
return
|
||||
}
|
||||
|
||||
init, _ := p.(Initializer)
|
||||
accept, _ := p.(Accepter)
|
||||
proc, _ := p.(Processor)
|
||||
fin, _ := p.(Finalizer)
|
||||
|
||||
var (
|
||||
wg sync.WaitGroup
|
||||
mtx sync.Mutex
|
||||
batch []File
|
||||
)
|
||||
|
||||
dispatch := func(f *file) {
|
||||
if flags&PLUGIN_FLAG_BATCH == PLUGIN_FLAG_BATCH {
|
||||
atomic.AddInt64(&s.gs.idle, 1)
|
||||
mtx.Lock()
|
||||
batch = append(batch, f)
|
||||
mtx.Unlock()
|
||||
} else {
|
||||
s.output <- f
|
||||
if init != nil {
|
||||
if err := init.Initialize(); err != nil {
|
||||
s.gs.fault(nil, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
for f := range s.input {
|
||||
if accept != nil && !accept.Accept(f) {
|
||||
s.output <- f
|
||||
} else if proc == nil {
|
||||
dispatch(f)
|
||||
} else {
|
||||
var wg sync.WaitGroup
|
||||
for i := 0; i < runtime.NumCPU(); i++ {
|
||||
wg.Add(1)
|
||||
go func(f *file) {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
f.rewind()
|
||||
keep, err := proc.Process(s, f)
|
||||
if err != nil {
|
||||
s.gs.fault(name, f, err)
|
||||
} else if keep {
|
||||
dispatch(f)
|
||||
for f := range s.input {
|
||||
if proc == nil || accept != nil && !accept.Accept(f) {
|
||||
s.output <- f
|
||||
} else {
|
||||
atomic.AddInt64(&s.gs.active, -1)
|
||||
}
|
||||
}(f)
|
||||
f.rewind()
|
||||
if err := proc.Process(s, f); err != nil {
|
||||
s.gs.fault(f, err)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
if fin != nil {
|
||||
if err := fin.Finalize(s); err != nil {
|
||||
s.gs.fault(name, nil, err)
|
||||
s.gs.fault(nil, err)
|
||||
}
|
||||
}
|
||||
|
||||
for _, f := range batch {
|
||||
atomic.AddInt64(&s.gs.idle, -1)
|
||||
s.output <- f.(*file)
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
@ -113,7 +88,6 @@ func (s *stage) chain(p Plugin) {
|
||||
//
|
||||
|
||||
func (s *stage) DispatchFile(f File) {
|
||||
atomic.AddInt64(&s.gs.active, 1)
|
||||
s.output <- f.(*file)
|
||||
}
|
||||
|
||||
|
26
types.go
26
types.go
@ -25,12 +25,11 @@ package goldsmith
|
||||
import (
|
||||
"bytes"
|
||||
"io"
|
||||
"runtime"
|
||||
)
|
||||
|
||||
type Goldsmith interface {
|
||||
Chain(p Plugin) Goldsmith
|
||||
Complete() bool
|
||||
Complete() []error
|
||||
}
|
||||
|
||||
func New(srcDir, dstDir string) Goldsmith {
|
||||
@ -40,14 +39,16 @@ func New(srcDir, dstDir string) Goldsmith {
|
||||
refs: make(map[string]bool),
|
||||
}
|
||||
|
||||
gs.queueFiles(uint(runtime.NumCPU()))
|
||||
gs.queueFiles()
|
||||
return gs
|
||||
}
|
||||
|
||||
type File interface {
|
||||
Path() string
|
||||
|
||||
Meta() map[string]interface{}
|
||||
Apply(m map[string]interface{})
|
||||
|
||||
Read(p []byte) (int, error)
|
||||
WriteTo(w io.Writer) (int64, error)
|
||||
}
|
||||
@ -76,12 +77,17 @@ type Context interface {
|
||||
DstDir() string
|
||||
}
|
||||
|
||||
const (
|
||||
PLUGIN_FLAG_BATCH = 1 << iota
|
||||
)
|
||||
type Error struct {
|
||||
path string
|
||||
err error
|
||||
}
|
||||
|
||||
func (e *Error) Error() string {
|
||||
return e.err.Error()
|
||||
}
|
||||
|
||||
type Initializer interface {
|
||||
Initialize() (string, uint, error)
|
||||
Initialize() error
|
||||
}
|
||||
|
||||
type Accepter interface {
|
||||
@ -93,9 +99,7 @@ type Finalizer interface {
|
||||
}
|
||||
|
||||
type Processor interface {
|
||||
Process(ctx Context, f File) (bool, error)
|
||||
Process(ctx Context, f File) error
|
||||
}
|
||||
|
||||
type Plugin interface {
|
||||
Initializer
|
||||
}
|
||||
type Plugin interface{}
|
||||
|
Loading…
Reference in New Issue
Block a user