diff --git a/README.md b/README.md index 21572580..6ed05146 100644 --- a/README.md +++ b/README.md @@ -51,5 +51,5 @@ gofs -src="rs://127.0.0.1:9016?mode=server&local_sync_disabled=true&path=./src&f Start a remote disk client to sync files from remote disk server. ```bash -gofs -src="rs://127.0.0.1:9016?msg_queue=500" -target=./target +gofs -src="rs://127.0.0.1:9016" -target=./target ``` \ No newline at end of file diff --git a/core/vfs.go b/core/vfs.go index 1fef1b50..06b13d2f 100644 --- a/core/vfs.go +++ b/core/vfs.go @@ -15,7 +15,6 @@ type VFS struct { port int server bool fsServer string - msgQueue int localSyncDisabled bool } @@ -23,10 +22,8 @@ const ( paramPath = "path" paramMode = "mode" paramFsServer = "fs_server" - paramMsgQueue = "msg_queue" paramLocalSyncDisabled = "local_sync_disabled" valueModeServer = "server" - valueDefaultMsgQueue = 500 valueLocalSyncIsDisabled = "true" RemoteServerSchema = "rs://" ) @@ -71,11 +68,6 @@ func (vfs *VFS) FsServer() string { return vfs.fsServer } -// MessageQueue receive message queue size -func (vfs *VFS) MessageQueue() int { - return vfs.msgQueue -} - // LocalSyncDisabled is local disk sync disabled func (vfs *VFS) LocalSyncDisabled() bool { return vfs.localSyncDisabled @@ -106,7 +98,7 @@ func NewVFS(path string) VFS { if strings.HasPrefix(lowerPath, RemoteServerSchema) { // example of rs protocol to see README.md vfs.fsType = RemoteDisk - _, vfs.host, vfs.port, vfs.path, vfs.server, vfs.fsServer, vfs.msgQueue, vfs.localSyncDisabled, err = parse(path) + _, vfs.host, vfs.port, vfs.path, vfs.server, vfs.fsServer, vfs.localSyncDisabled, err = parse(path) } if err != nil { return NewEmptyVFS() @@ -114,7 +106,7 @@ func NewVFS(path string) VFS { return vfs } -func parse(path string) (scheme string, host string, port int, localPath string, isServer bool, fsServer string, msgQueue int, localSyncDisabled bool, err error) { +func parse(path string) (scheme string, host string, port int, localPath string, isServer bool, fsServer string, localSyncDisabled bool, err error) { parseUrl, err := url.Parse(path) if err != nil { return @@ -138,18 +130,6 @@ func parse(path string) (scheme string, host string, port int, localPath string, } } - defaultMsgQueue := valueDefaultMsgQueue - msgQueueStr := parseUrl.Query().Get(paramMsgQueue) - if len(msgQueueStr) > 0 { - msgQueue, err = strconv.Atoi(msgQueueStr) - if err != nil || msgQueue <= 0 { - // if error happened, reset to the default value - msgQueue = defaultMsgQueue - } - } else { - msgQueue = defaultMsgQueue - } - localSyncDisabledValue := parseUrl.Query().Get(paramLocalSyncDisabled) if strings.ToLower(localSyncDisabledValue) == valueLocalSyncIsDisabled { localSyncDisabled = true diff --git a/monitor/base_monitor.go b/monitor/base_monitor.go index 1c79e965..f604a243 100644 --- a/monitor/base_monitor.go +++ b/monitor/base_monitor.go @@ -63,6 +63,9 @@ func (m *baseMonitor) processWrite() { if wm != nil { if (wm.count <= 2 && now-wm.last <= time.Second.Nanoseconds()) || (wm.count > 2 && now-wm.last <= 3*time.Second.Nanoseconds()) { m.mu.Unlock() + go func() { + m.writeNotify <- true + }() continue } go func() { diff --git a/monitor/fsnotify_monitor.go b/monitor/fsnotify_monitor.go index 6902919b..49cceb19 100644 --- a/monitor/fsnotify_monitor.go +++ b/monitor/fsnotify_monitor.go @@ -1,6 +1,7 @@ package monitor import ( + "container/list" "errors" "github.com/fsnotify/fsnotify" "github.com/no-src/gofs/core" @@ -10,13 +11,14 @@ import ( "github.com/no-src/log" "io/fs" "path/filepath" + "time" ) type fsNotifyMonitor struct { baseMonitor watcher *fsnotify.Watcher syncOnce bool - events chan fsnotify.Event + events *list.List } // NewFsNotifyMonitor create an instance of fsNotifyMonitor to monitor the disk change @@ -33,7 +35,7 @@ func NewFsNotifyMonitor(syncer sync.Sync, retry retry.Retry, syncOnce bool) (m M watcher: watcher, syncOnce: syncOnce, baseMonitor: newBaseMonitor(syncer, retry), - events: make(chan fsnotify.Event, 1000), + events: list.New(), } return m, nil } @@ -96,7 +98,7 @@ func (m *fsNotifyMonitor) listenEvents() error { return errors.New("get fsnotify watch event failed") } log.Debug("notify received [%s] -> [%s]", event.Op.String(), event.Name) - m.events <- event + m.events.PushBack(event) } case err, ok := <-m.watcher.Errors: { @@ -119,56 +121,54 @@ func (m *fsNotifyMonitor) processEvents() error { // touch => Create -> Chmod // chmod => Chmod for { - select { - case event, ok := <-m.events: - { - if !ok { - err := errors.New("get watch event failed") - log.Error(err, "processEvents get event error") - return err - } - log.Debug("process event => [%s] -> [%s]", event.Op.String(), event.Name) - if event.Op&fsnotify.Write == fsnotify.Write { - m.addWrite(event.Name) - } else if event.Op&fsnotify.Create == fsnotify.Create { - err := m.syncer.Create(event.Name) - if err == nil { - // if create a new dir, then monitor it - isDir, err := m.syncer.IsDir(event.Name) - if err == nil && isDir { - if err = m.monitor(event.Name); err != nil { - log.Error(err, "Create event execute monitor error => [%s]", event.Name) - } - } - if err == nil && (!isDir || (isDir && !util.IsWindows())) { - // rename a file, will not trigger the Write event - // rename a dir, will not trigger the Write event on Linux, but it will trigger the Write event for parent dir on Windows - // send a Write event manually - go func() { - log.Debug("prepare to send a Write event after Create event [%s]", event.Name) - m.events <- fsnotify.Event{ - Name: event.Name, - Op: fsnotify.Write, - } - }() - } - } - } else if event.Op&fsnotify.Remove == fsnotify.Remove { - if err := m.syncer.Remove(event.Name); err != nil { - log.Error(err, "Remove event execute error => [%s]", event.Name) - } - } else if event.Op&fsnotify.Rename == fsnotify.Rename { - if err := m.syncer.Rename(event.Name); err != nil { - log.Error(err, "Rename event execute error => [%s]", event.Name) - } - } else if event.Op&fsnotify.Chmod == fsnotify.Chmod { - if err := m.syncer.Chmod(event.Name); err != nil { - log.Error(err, "Chmod event execute error => [%s]", event.Name) + element := m.events.Front() + if element == nil || element.Value == nil { + if element != nil { + m.events.Remove(element) + } + <-time.After(time.Second) + continue + } + + event := element.Value.(fsnotify.Event) + if event.Op&fsnotify.Write == fsnotify.Write { + m.addWrite(event.Name) + } else if event.Op&fsnotify.Create == fsnotify.Create { + err := m.syncer.Create(event.Name) + if err == nil { + // if create a new dir, then monitor it + isDir, err := m.syncer.IsDir(event.Name) + if err == nil && isDir { + if err = m.monitor(event.Name); err != nil { + log.Error(err, "Create event execute monitor error => [%s]", event.Name) } } - break + if err == nil && (!isDir || (isDir && !util.IsWindows())) { + // rename a file, will not trigger the Write event + // rename a dir, will not trigger the Write event on Linux, but it will trigger the Write event for parent dir on Windows + // send a Write event manually + log.Debug("prepare to send a Write event after Create event [%s]", event.Name) + m.events.PushBack(fsnotify.Event{ + Name: event.Name, + Op: fsnotify.Write, + }) + + } + } + } else if event.Op&fsnotify.Remove == fsnotify.Remove { + if err := m.syncer.Remove(event.Name); err != nil { + log.Error(err, "Remove event execute error => [%s]", event.Name) + } + } else if event.Op&fsnotify.Rename == fsnotify.Rename { + if err := m.syncer.Rename(event.Name); err != nil { + log.Error(err, "Rename event execute error => [%s]", event.Name) + } + } else if event.Op&fsnotify.Chmod == fsnotify.Chmod { + if err := m.syncer.Chmod(event.Name); err != nil { + log.Error(err, "Chmod event execute error => [%s]", event.Name) } } + m.events.Remove(element) } } diff --git a/monitor/monitor.go b/monitor/monitor.go index 2727d2d6..52002d12 100644 --- a/monitor/monitor.go +++ b/monitor/monitor.go @@ -27,7 +27,7 @@ func NewMonitor(syncer sync.Sync, retry retry.Retry, syncOnce bool, enableTLS bo } else if src.Is(core.RemoteDisk) && src.Server() { return NewFsNotifyMonitor(syncer, retry, syncOnce) } else if src.Is(core.RemoteDisk) && !src.Server() { - return NewRemoteClientMonitor(syncer, retry, syncOnce, src.Host(), src.Port(), src.MessageQueue(), enableTLS, certFile, keyFile, users) + return NewRemoteClientMonitor(syncer, retry, syncOnce, src.Host(), src.Port(), enableTLS, certFile, keyFile, users) } return nil, fmt.Errorf("file system unsupported ! src=>%s", src.Type().String()) diff --git a/monitor/remote_client_monitor.go b/monitor/remote_client_monitor.go index 021a9095..84ef6632 100644 --- a/monitor/remote_client_monitor.go +++ b/monitor/remote_client_monitor.go @@ -1,6 +1,7 @@ package monitor import ( + "container/list" "errors" "fmt" "github.com/no-src/gofs/auth" @@ -12,13 +13,14 @@ import ( "github.com/no-src/log" "net/url" "strings" + "time" ) type remoteClientMonitor struct { baseMonitor client tran.Client closed bool - messages chan message + messages *list.List syncOnce bool currentUser *auth.HashUser authorized bool @@ -29,14 +31,14 @@ type message struct { } // NewRemoteClientMonitor create an instance of remoteClientMonitor to monitor the remote file change -func NewRemoteClientMonitor(syncer sync.Sync, retry retry.Retry, syncOnce bool, host string, port int, messageQueue int, enableTLS bool, certFile string, keyFile string, users []*auth.User) (Monitor, error) { +func NewRemoteClientMonitor(syncer sync.Sync, retry retry.Retry, syncOnce bool, host string, port int, enableTLS bool, certFile string, keyFile string, users []*auth.User) (Monitor, error) { if syncer == nil { err := errors.New("syncer can't be nil") return nil, err } m := &remoteClientMonitor{ client: tran.NewClient(host, port, enableTLS), - messages: make(chan message, messageQueue), + messages: list.New(), syncOnce: syncOnce, baseMonitor: newBaseMonitor(syncer, retry), } @@ -175,9 +177,9 @@ func (m *remoteClientMonitor) Start() error { }, fmt.Sprintf("client reconnect to %s:%d", m.client.Host(), m.client.Port())) } } else { - m.messages <- message{ + m.messages.PushBack(message{ data: data, - } + }) } } return nil @@ -185,7 +187,15 @@ func (m *remoteClientMonitor) Start() error { func (m *remoteClientMonitor) processingMessage() { for { - message := <-m.messages + element := m.messages.Front() + if element == nil || element.Value == nil { + if element != nil { + m.messages.Remove(element) + } + <-time.After(time.Second) + continue + } + message := element.Value.(message) log.Info("client read request => %s", string(message.data)) var msg sync.Message err := util.Unmarshal(message.data, &msg) @@ -225,6 +235,7 @@ func (m *remoteClientMonitor) processingMessage() { break } } + m.messages.Remove(element) } }