Skip to content

Commit

Permalink
Custom inner events channel to receive fsnotify's Watcher.Events
Browse files Browse the repository at this point in the history
  • Loading branch information
mstmdev committed Dec 11, 2021
1 parent 3f91e47 commit 569e9f2
Showing 1 changed file with 36 additions and 14 deletions.
50 changes: 36 additions & 14 deletions monitor/fsnotify_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type fsNotifyMonitor struct {
baseMonitor
watcher *fsnotify.Watcher
syncOnce bool
events chan fsnotify.Event
}

// NewFsNotifyMonitor create an instance of fsNotifyMonitor to monitor the disk change
Expand All @@ -31,6 +32,7 @@ func NewFsNotifyMonitor(syncer sync.Sync, retry retry.Retry, syncOnce bool) (m M
watcher: watcher,
syncOnce: syncOnce,
baseMonitor: newBaseMonitor(syncer, retry),
events: make(chan fsnotify.Event, 1000),
}
return m, nil
}
Expand Down Expand Up @@ -79,7 +81,35 @@ func (m *fsNotifyMonitor) Start() error {

go m.processWrite()
go m.startSyncWrite()
go m.processEvents()

return m.listenEvents()
}

func (m *fsNotifyMonitor) listenEvents() error {
for {
select {
case event, ok := <-m.watcher.Events:
{
if !ok {
return errors.New("get fsnotify watch event failed")
}
log.Debug("notify received [%s] -> [%s]", event.Op.String(), event.Name)
m.events <- event
}
case err, ok := <-m.watcher.Errors:
{
if !ok {
return errors.New("get watch error failed")
}
log.Error(err, "watcher error")
}

}
}
}

func (m *fsNotifyMonitor) processEvents() error {
// action trigger
// cp => Create -> Write
// mv => Rename -> Create +->Write
Expand All @@ -89,12 +119,14 @@ func (m *fsNotifyMonitor) Start() error {
// chmod => Chmod
for {
select {
case event, ok := <-m.watcher.Events:
case event, ok := <-m.events:
{
if !ok {
return errors.New("get watch event failed")
err := errors.New("get watch event failed")
log.Error(err, "processEvents get event error")
return err
}
log.Debug("notify received [%s] -> [%s]", event.Op.String(), event.Name)
log.Debug("process event => [%s] -> [%s]", event.Op.String(), event.Name)
if event.Op&fsnotify.Write == fsnotify.Write {
m.addWrite(event.Name)
} else if event.Op&fsnotify.Create == fsnotify.Create {
Expand All @@ -112,7 +144,7 @@ func (m *fsNotifyMonitor) Start() error {
// send a Write event manually
go func() {
log.Debug("prepare to send a Write event after Create event [%s]", event.Name)
m.watcher.Events <- fsnotify.Event{
m.events <- fsnotify.Event{
Name: event.Name,
Op: fsnotify.Write,
}
Expand All @@ -134,16 +166,6 @@ func (m *fsNotifyMonitor) Start() error {
}
break
}

case err, ok := <-m.watcher.Errors:
{
if !ok {
return errors.New("get watch error failed")
}
log.Error(err, "watcher error")
break
}

}
}
}
Expand Down

0 comments on commit 569e9f2

Please sign in to comment.