Skip to content

Commit

Permalink
Use the list.List to save messages and events, avoid lost data, and r…
Browse files Browse the repository at this point in the history
…emove the msg_queue param
  • Loading branch information
mstmdev committed Dec 13, 2021
1 parent 6f838d4 commit c0f02c2
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 79 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,5 @@ gofs -src="rs://127.0.0.1:9016?mode=server&local_sync_disabled=true&path=./src&f
Start a remote disk client to sync files from remote disk server.

```bash
gofs -src="rs://127.0.0.1:9016?msg_queue=500" -target=./target
gofs -src="rs://127.0.0.1:9016" -target=./target
```
24 changes: 2 additions & 22 deletions core/vfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,15 @@ type VFS struct {
port int
server bool
fsServer string
msgQueue int
localSyncDisabled bool
}

const (
paramPath = "path"
paramMode = "mode"
paramFsServer = "fs_server"
paramMsgQueue = "msg_queue"
paramLocalSyncDisabled = "local_sync_disabled"
valueModeServer = "server"
valueDefaultMsgQueue = 500
valueLocalSyncIsDisabled = "true"
RemoteServerSchema = "rs://"
)
Expand Down Expand Up @@ -71,11 +68,6 @@ func (vfs *VFS) FsServer() string {
return vfs.fsServer
}

// MessageQueue receive message queue size
func (vfs *VFS) MessageQueue() int {
return vfs.msgQueue
}

// LocalSyncDisabled is local disk sync disabled
func (vfs *VFS) LocalSyncDisabled() bool {
return vfs.localSyncDisabled
Expand Down Expand Up @@ -106,15 +98,15 @@ func NewVFS(path string) VFS {
if strings.HasPrefix(lowerPath, RemoteServerSchema) {
// example of rs protocol to see README.md
vfs.fsType = RemoteDisk
_, vfs.host, vfs.port, vfs.path, vfs.server, vfs.fsServer, vfs.msgQueue, vfs.localSyncDisabled, err = parse(path)
_, vfs.host, vfs.port, vfs.path, vfs.server, vfs.fsServer, vfs.localSyncDisabled, err = parse(path)
}
if err != nil {
return NewEmptyVFS()
}
return vfs
}

func parse(path string) (scheme string, host string, port int, localPath string, isServer bool, fsServer string, msgQueue int, localSyncDisabled bool, err error) {
func parse(path string) (scheme string, host string, port int, localPath string, isServer bool, fsServer string, localSyncDisabled bool, err error) {
parseUrl, err := url.Parse(path)
if err != nil {
return
Expand All @@ -138,18 +130,6 @@ func parse(path string) (scheme string, host string, port int, localPath string,
}
}

defaultMsgQueue := valueDefaultMsgQueue
msgQueueStr := parseUrl.Query().Get(paramMsgQueue)
if len(msgQueueStr) > 0 {
msgQueue, err = strconv.Atoi(msgQueueStr)
if err != nil || msgQueue <= 0 {
// if error happened, reset to the default value
msgQueue = defaultMsgQueue
}
} else {
msgQueue = defaultMsgQueue
}

localSyncDisabledValue := parseUrl.Query().Get(paramLocalSyncDisabled)
if strings.ToLower(localSyncDisabledValue) == valueLocalSyncIsDisabled {
localSyncDisabled = true
Expand Down
3 changes: 3 additions & 0 deletions monitor/base_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ func (m *baseMonitor) processWrite() {
if wm != nil {
if (wm.count <= 2 && now-wm.last <= time.Second.Nanoseconds()) || (wm.count > 2 && now-wm.last <= 3*time.Second.Nanoseconds()) {
m.mu.Unlock()
go func() {
m.writeNotify <- true
}()
continue
}
go func() {
Expand Down
98 changes: 49 additions & 49 deletions monitor/fsnotify_monitor.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package monitor

import (
"container/list"
"errors"
"github.com/fsnotify/fsnotify"
"github.com/no-src/gofs/core"
Expand All @@ -10,13 +11,14 @@ import (
"github.com/no-src/log"
"io/fs"
"path/filepath"
"time"
)

type fsNotifyMonitor struct {
baseMonitor
watcher *fsnotify.Watcher
syncOnce bool
events chan fsnotify.Event
events *list.List
}

// NewFsNotifyMonitor create an instance of fsNotifyMonitor to monitor the disk change
Expand All @@ -33,7 +35,7 @@ func NewFsNotifyMonitor(syncer sync.Sync, retry retry.Retry, syncOnce bool) (m M
watcher: watcher,
syncOnce: syncOnce,
baseMonitor: newBaseMonitor(syncer, retry),
events: make(chan fsnotify.Event, 1000),
events: list.New(),
}
return m, nil
}
Expand Down Expand Up @@ -96,7 +98,7 @@ func (m *fsNotifyMonitor) listenEvents() error {
return errors.New("get fsnotify watch event failed")
}
log.Debug("notify received [%s] -> [%s]", event.Op.String(), event.Name)
m.events <- event
m.events.PushBack(event)
}
case err, ok := <-m.watcher.Errors:
{
Expand All @@ -119,56 +121,54 @@ func (m *fsNotifyMonitor) processEvents() error {
// touch => Create -> Chmod
// chmod => Chmod
for {
select {
case event, ok := <-m.events:
{
if !ok {
err := errors.New("get watch event failed")
log.Error(err, "processEvents get event error")
return err
}
log.Debug("process event => [%s] -> [%s]", event.Op.String(), event.Name)
if event.Op&fsnotify.Write == fsnotify.Write {
m.addWrite(event.Name)
} else if event.Op&fsnotify.Create == fsnotify.Create {
err := m.syncer.Create(event.Name)
if err == nil {
// if create a new dir, then monitor it
isDir, err := m.syncer.IsDir(event.Name)
if err == nil && isDir {
if err = m.monitor(event.Name); err != nil {
log.Error(err, "Create event execute monitor error => [%s]", event.Name)
}
}
if err == nil && (!isDir || (isDir && !util.IsWindows())) {
// rename a file, will not trigger the Write event
// rename a dir, will not trigger the Write event on Linux, but it will trigger the Write event for parent dir on Windows
// send a Write event manually
go func() {
log.Debug("prepare to send a Write event after Create event [%s]", event.Name)
m.events <- fsnotify.Event{
Name: event.Name,
Op: fsnotify.Write,
}
}()
}
}
} else if event.Op&fsnotify.Remove == fsnotify.Remove {
if err := m.syncer.Remove(event.Name); err != nil {
log.Error(err, "Remove event execute error => [%s]", event.Name)
}
} else if event.Op&fsnotify.Rename == fsnotify.Rename {
if err := m.syncer.Rename(event.Name); err != nil {
log.Error(err, "Rename event execute error => [%s]", event.Name)
}
} else if event.Op&fsnotify.Chmod == fsnotify.Chmod {
if err := m.syncer.Chmod(event.Name); err != nil {
log.Error(err, "Chmod event execute error => [%s]", event.Name)
element := m.events.Front()
if element == nil || element.Value == nil {
if element != nil {
m.events.Remove(element)
}
<-time.After(time.Second)
continue
}

event := element.Value.(fsnotify.Event)
if event.Op&fsnotify.Write == fsnotify.Write {
m.addWrite(event.Name)
} else if event.Op&fsnotify.Create == fsnotify.Create {
err := m.syncer.Create(event.Name)
if err == nil {
// if create a new dir, then monitor it
isDir, err := m.syncer.IsDir(event.Name)
if err == nil && isDir {
if err = m.monitor(event.Name); err != nil {
log.Error(err, "Create event execute monitor error => [%s]", event.Name)
}
}
break
if err == nil && (!isDir || (isDir && !util.IsWindows())) {
// rename a file, will not trigger the Write event
// rename a dir, will not trigger the Write event on Linux, but it will trigger the Write event for parent dir on Windows
// send a Write event manually
log.Debug("prepare to send a Write event after Create event [%s]", event.Name)
m.events.PushBack(fsnotify.Event{
Name: event.Name,
Op: fsnotify.Write,
})

}
}
} else if event.Op&fsnotify.Remove == fsnotify.Remove {
if err := m.syncer.Remove(event.Name); err != nil {
log.Error(err, "Remove event execute error => [%s]", event.Name)
}
} else if event.Op&fsnotify.Rename == fsnotify.Rename {
if err := m.syncer.Rename(event.Name); err != nil {
log.Error(err, "Rename event execute error => [%s]", event.Name)
}
} else if event.Op&fsnotify.Chmod == fsnotify.Chmod {
if err := m.syncer.Chmod(event.Name); err != nil {
log.Error(err, "Chmod event execute error => [%s]", event.Name)
}
}
m.events.Remove(element)
}
}

Expand Down
2 changes: 1 addition & 1 deletion monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func NewMonitor(syncer sync.Sync, retry retry.Retry, syncOnce bool, enableTLS bo
} else if src.Is(core.RemoteDisk) && src.Server() {
return NewFsNotifyMonitor(syncer, retry, syncOnce)
} else if src.Is(core.RemoteDisk) && !src.Server() {
return NewRemoteClientMonitor(syncer, retry, syncOnce, src.Host(), src.Port(), src.MessageQueue(), enableTLS, certFile, keyFile, users)
return NewRemoteClientMonitor(syncer, retry, syncOnce, src.Host(), src.Port(), enableTLS, certFile, keyFile, users)
}
return nil, fmt.Errorf("file system unsupported ! src=>%s", src.Type().String())

Expand Down
23 changes: 17 additions & 6 deletions monitor/remote_client_monitor.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package monitor

import (
"container/list"
"errors"
"fmt"
"github.com/no-src/gofs/auth"
Expand All @@ -12,13 +13,14 @@ import (
"github.com/no-src/log"
"net/url"
"strings"
"time"
)

type remoteClientMonitor struct {
baseMonitor
client tran.Client
closed bool
messages chan message
messages *list.List
syncOnce bool
currentUser *auth.HashUser
authorized bool
Expand All @@ -29,14 +31,14 @@ type message struct {
}

// NewRemoteClientMonitor create an instance of remoteClientMonitor to monitor the remote file change
func NewRemoteClientMonitor(syncer sync.Sync, retry retry.Retry, syncOnce bool, host string, port int, messageQueue int, enableTLS bool, certFile string, keyFile string, users []*auth.User) (Monitor, error) {
func NewRemoteClientMonitor(syncer sync.Sync, retry retry.Retry, syncOnce bool, host string, port int, enableTLS bool, certFile string, keyFile string, users []*auth.User) (Monitor, error) {
if syncer == nil {
err := errors.New("syncer can't be nil")
return nil, err
}
m := &remoteClientMonitor{
client: tran.NewClient(host, port, enableTLS),
messages: make(chan message, messageQueue),
messages: list.New(),
syncOnce: syncOnce,
baseMonitor: newBaseMonitor(syncer, retry),
}
Expand Down Expand Up @@ -175,17 +177,25 @@ func (m *remoteClientMonitor) Start() error {
}, fmt.Sprintf("client reconnect to %s:%d", m.client.Host(), m.client.Port()))
}
} else {
m.messages <- message{
m.messages.PushBack(message{
data: data,
}
})
}
}
return nil
}

func (m *remoteClientMonitor) processingMessage() {
for {
message := <-m.messages
element := m.messages.Front()
if element == nil || element.Value == nil {
if element != nil {
m.messages.Remove(element)
}
<-time.After(time.Second)
continue
}
message := element.Value.(message)
log.Info("client read request => %s", string(message.data))
var msg sync.Message
err := util.Unmarshal(message.data, &msg)
Expand Down Expand Up @@ -225,6 +235,7 @@ func (m *remoteClientMonitor) processingMessage() {
break
}
}
m.messages.Remove(element)
}
}

Expand Down

0 comments on commit c0f02c2

Please sign in to comment.