From c4dbfc62ccc04e596a14bb2efe7829603f9b0102 Mon Sep 17 00:00:00 2001 From: thekingofworld <904852632@qq.com> Date: Thu, 6 Aug 2020 18:16:15 +0800 Subject: [PATCH 1/6] command: register support topic/channel state --- command.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/command.go b/command.go index 80e47436..c7ff4284 100644 --- a/command.go +++ b/command.go @@ -103,11 +103,13 @@ func Auth(secret string) (*Command, error) { return &Command{[]byte("AUTH"), nil, []byte(secret)}, nil } -// Register creates a new Command to add a topic/channel for the connected nsqd -func Register(topic string, channel string) *Command { - params := [][]byte{[]byte(topic)} - if len(channel) > 0 { - params = append(params, []byte(channel)) +// Register creates a new Command to add a topic/channel with state for the connected nsqd +func Register(topic string, topicPaused int, channel string, channelPaused int) *Command { + params := [][]byte{ + []byte(topic), + []byte(channel), + []byte(strconv.Itoa(topicPaused)), + []byte(strconv.Itoa(channelPaused)), } return &Command{[]byte("REGISTER"), params, nil} } From 4aae6e898cf14f07b88f90d74df7b7eb910efcb1 Mon Sep 17 00:00:00 2001 From: thekingofworld <904852632@qq.com> Date: Thu, 6 Aug 2020 23:34:42 +0800 Subject: [PATCH 2/6] backwards compatible --- command.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/command.go b/command.go index c7ff4284..ba5d3b4b 100644 --- a/command.go +++ b/command.go @@ -104,12 +104,13 @@ func Auth(secret string) (*Command, error) { } // Register creates a new Command to add a topic/channel with state for the connected nsqd -func Register(topic string, topicPaused int, channel string, channelPaused int) *Command { - params := [][]byte{ - []byte(topic), - []byte(channel), - []byte(strconv.Itoa(topicPaused)), - []byte(strconv.Itoa(channelPaused)), +func Register(topic string, channel string, paused ...int) *Command { + params := [][]byte{[]byte(topic), []byte(channel)} + if len(paused) > 0 { + params = append(params, []byte(strconv.Itoa(paused[0]))) //topic isPaused + if len(paused) > 1 { + params = append(params, []byte(strconv.Itoa(paused[1]))) //channel isPaused + } } return &Command{[]byte("REGISTER"), params, nil} } From 1a51bfdcc43584345c6f2f20548c92783eaca0bf Mon Sep 17 00:00:00 2001 From: thekingofworld <904852632@qq.com> Date: Sun, 9 Aug 2020 16:14:58 +0800 Subject: [PATCH 3/6] command: add command[SyncState] that is sent whenever a topic/channel pause state changes. --- command.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/command.go b/command.go index ba5d3b4b..7355e18c 100644 --- a/command.go +++ b/command.go @@ -124,6 +124,14 @@ func UnRegister(topic string, channel string) *Command { return &Command{[]byte("UNREGISTER"), params, nil} } +func SyncState(topic string, channel string, state int) *Command { + params := [][]byte{[]byte(strconv.Itoa(state)), []byte(topic)} + if len(channel) > 0 { + params = append(params, []byte(channel)) + } + return &Command{[]byte("SYNCSTATE"), params, nil} +} + // Ping creates a new Command to keep-alive the state of all the // announced topic/channels for a given client func Ping() *Command { From 33656774e764d874ae6140d6073770244a8df1ac Mon Sep 17 00:00:00 2001 From: thekingofworld <904852632@qq.com> Date: Sun, 9 Aug 2020 16:17:36 +0800 Subject: [PATCH 4/6] add comment --- command.go | 1 + 1 file changed, 1 insertion(+) diff --git a/command.go b/command.go index 7355e18c..6cf95bf9 100644 --- a/command.go +++ b/command.go @@ -124,6 +124,7 @@ func UnRegister(topic string, channel string) *Command { return &Command{[]byte("UNREGISTER"), params, nil} } +// SyncState creates a new Command to sync a topic/channel pause state changes. func SyncState(topic string, channel string, state int) *Command { params := [][]byte{[]byte(strconv.Itoa(state)), []byte(topic)} if len(channel) > 0 { From 2ff7c0388dc90279d0b2cede4e32afa4adc8e9aa Mon Sep 17 00:00:00 2001 From: thekingofworld <904852632@qq.com> Date: Mon, 10 Aug 2020 21:34:34 +0800 Subject: [PATCH 5/6] command: Register & SyncState param state type change to json, make more extendability and readability --- command.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/command.go b/command.go index 6cf95bf9..09024dd0 100644 --- a/command.go +++ b/command.go @@ -104,12 +104,12 @@ func Auth(secret string) (*Command, error) { } // Register creates a new Command to add a topic/channel with state for the connected nsqd -func Register(topic string, channel string, paused ...int) *Command { +func Register(topic string, channel string, states ...[]byte) *Command { params := [][]byte{[]byte(topic), []byte(channel)} - if len(paused) > 0 { - params = append(params, []byte(strconv.Itoa(paused[0]))) //topic isPaused - if len(paused) > 1 { - params = append(params, []byte(strconv.Itoa(paused[1]))) //channel isPaused + if len(states) > 0 { + params = append(params, states[0]) //topic state + if len(states) > 1 { + params = append(params, states[1]) //channel state } } return &Command{[]byte("REGISTER"), params, nil} @@ -124,9 +124,9 @@ func UnRegister(topic string, channel string) *Command { return &Command{[]byte("UNREGISTER"), params, nil} } -// SyncState creates a new Command to sync a topic/channel pause state changes. -func SyncState(topic string, channel string, state int) *Command { - params := [][]byte{[]byte(strconv.Itoa(state)), []byte(topic)} +// SyncState creates a new Command to sync a topic/channel state changes, like paused, unpaused +func SyncState(topic string, channel string, state []byte) *Command { + params := [][]byte{state, []byte(topic)} if len(channel) > 0 { params = append(params, []byte(channel)) } From 015554cb0b901959c23bd142f28a3ba963e26fb6 Mon Sep 17 00:00:00 2001 From: thekingofworld <904852632@qq.com> Date: Sat, 15 Aug 2020 16:08:34 +0800 Subject: [PATCH 6/6] command: SyncState change param type to map & remove Register extra state param. --- command.go | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/command.go b/command.go index 09024dd0..563a1105 100644 --- a/command.go +++ b/command.go @@ -103,14 +103,11 @@ func Auth(secret string) (*Command, error) { return &Command{[]byte("AUTH"), nil, []byte(secret)}, nil } -// Register creates a new Command to add a topic/channel with state for the connected nsqd -func Register(topic string, channel string, states ...[]byte) *Command { - params := [][]byte{[]byte(topic), []byte(channel)} - if len(states) > 0 { - params = append(params, states[0]) //topic state - if len(states) > 1 { - params = append(params, states[1]) //channel state - } +// Register creates a new Command to add a topic/channel for the connected nsqd +func Register(topic string, channel string) *Command { + params := [][]byte{[]byte(topic)} + if len(channel) > 0 { + params = append(params, []byte(channel)) } return &Command{[]byte("REGISTER"), params, nil} } @@ -125,12 +122,16 @@ func UnRegister(topic string, channel string) *Command { } // SyncState creates a new Command to sync a topic/channel state changes, like paused, unpaused -func SyncState(topic string, channel string, state []byte) *Command { - params := [][]byte{state, []byte(topic)} +func SyncState(topic string, channel string, js map[string]interface{}) (*Command, error) { + body, err := json.Marshal(js) + if err != nil { + return nil, err + } + params := [][]byte{[]byte(topic)} if len(channel) > 0 { params = append(params, []byte(channel)) } - return &Command{[]byte("SYNCSTATE"), params, nil} + return &Command{[]byte("SYNCSTATE"), params, body}, nil } // Ping creates a new Command to keep-alive the state of all the