Skip to content

Commit

Permalink
Added error handling in sendworkers (compression threads) to avoid th…
Browse files Browse the repository at this point in the history
…e "send on closed channel" panic

Changed how hashback is closed down when Interrupt or Kill signals are received
Fixed a design mistake where recovery cache was not saved on panics
Debug web server now only listens to 127.0.0.1 to avoid firewall pop-ups on Windows
  • Loading branch information
fredli74 committed Jun 10, 2016
1 parent 0942a62 commit 2ccebe7
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 22 deletions.
34 changes: 23 additions & 11 deletions core/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type Client struct {

sendqueue []*sendQueueEntry

handlerSignal chan error
handlerErrorSignal chan error

dispatchChannel chan *messageDispatch
storeChannel chan *messageDispatch
Expand All @@ -75,8 +75,8 @@ func NewClient(conn net.Conn, account string, accesskey Byte128) *Client {
dispatchChannel: make(chan *messageDispatch, 1024),
storeChannel: make(chan *messageDispatch, 1),
}
client.handlerErrorSignal = make(chan error, 1)
client.wg.Add(1)
client.handlerSignal = make(chan error, 1)
go client.ioHandler()

{ // Say hello
Expand Down Expand Up @@ -104,8 +104,10 @@ func (c *Client) Paint(what string) {
}
}

func (c *Client) Close() {
c.dispatchAndWait(MsgTypeGoodbye, nil)
func (c *Client) Close(polite bool) {
if (polite) {
c.dispatchAndWait(MsgTypeGoodbye, nil)
}

c.dispatchMutex.Lock()
if !c.closing {
Expand Down Expand Up @@ -135,6 +137,17 @@ func (c *Client) sendQueue(what Byte128) {
if c.sendworkers < int32(runtime.NumCPU()) {
atomic.AddInt32(&c.sendworkers, 1)
go func() {
defer func() { // a panic was raised inside the goroutine (most likely the channel was closed)
if r := recover(); !c.closing && r != nil {
err, _ := <-c.handlerErrorSignal
if err != nil {
panic(err)
} else {
panic(r)
}
}
}()

for done := false; !done; {
var workItem *sendQueueEntry

Expand Down Expand Up @@ -243,9 +256,9 @@ func (c *Client) singleExchange(outgoing *messageDispatch) *ProtocolMessage {
func (c *Client) ioHandler() {
defer func() {
if r := recover(); !c.closing && r != nil { // a panic was raised inside the goroutine
fmt.Println("ioHandler", r)
c.handlerSignal <- r.(error)
close(c.handlerSignal)
// fmt.Println("ioHandler error:", r)
c.handlerErrorSignal <- r.(error)
close(c.handlerErrorSignal)
}

c.dispatchMutex.Lock()
Expand Down Expand Up @@ -283,8 +296,8 @@ func (c *Client) ioHandler() {
// dispatchMessage returns a result channel if a returnChannel was specified, otherwise it just returns nil
func (c *Client) dispatchMessage(msgType uint32, msgData interface{}, returnChannel chan interface{}) {
defer func() {
if r := recover(); !c.closing && r != nil { // a panic was raised inside the goroutine
err, _ := <-c.handlerSignal
if r := recover(); !c.closing && r != nil { // a panic was raised (most likely the channel was closed)
err, _ := <-c.handlerErrorSignal
if err != nil {
panic(err)
} else {
Expand Down Expand Up @@ -321,8 +334,7 @@ func (c *Client) dispatchAndWait(msgType uint32, msgData interface{}) interface{
return t.Data
}
}
fmt.Println("här", R)
case err := <-c.handlerSignal:
case err := <-c.handlerErrorSignal:
if err != nil {
panic(err)
}
Expand Down
28 changes: 17 additions & 11 deletions hashback/hashback.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,14 @@ func (session *BackupSession) Connect() *core.Client {
session.Client = client
return session.Client
}
func (session *BackupSession) Close() {
session.Client.Close()
func (session *BackupSession) Close(polite bool) {
if session.reference != nil {
session.reference.Close()
}
if session.Client != nil {
session.Client.Close(polite)
}

Debug("Disconnected from %s", session.ServerString)
}

Expand Down Expand Up @@ -341,7 +347,7 @@ func main() {

runtime.SetBlockProfileRate(1000)
go func() {
log.Println(http.ListenAndServe(":6060", nil))
log.Println(http.ListenAndServe("127.0.0.1:6060", nil))
}()

defer func() {
Expand Down Expand Up @@ -428,7 +434,7 @@ func main() {

cmd.Command("info", "", func() {
session.Connect()
defer session.Close()
defer session.Close(true)

info := session.Client.GetAccountInfo()
var hashbackEnabled bool = false
Expand Down Expand Up @@ -466,7 +472,7 @@ func main() {
}

session.Connect()
defer session.Close()
defer session.Close(true)

list := session.Client.ListDataset(cmd.Args[2])
if len(list.States) > 0 {
Expand Down Expand Up @@ -583,7 +589,7 @@ func main() {

func() {
session.Connect()
defer session.Close()
defer session.Close(true)
if latestBackup > 0 || intervalBackup == 0 {
session.State = &core.DatasetState{StateID: session.Client.SessionNonce}
session.Store(cmd.Args[2], cmd.Args[3:]...)
Expand Down Expand Up @@ -625,7 +631,7 @@ func main() {
}

session.Connect()
defer session.Close()
defer session.Close(true)

list := session.Client.ListDataset(cmd.Args[2])

Expand Down Expand Up @@ -669,7 +675,7 @@ func main() {
}

session.Connect()
defer session.Close()
defer session.Close(true)

list := session.Client.ListDataset(cmd.Args[2])

Expand Down Expand Up @@ -713,7 +719,7 @@ func main() {
}

session.Connect()
defer session.Close()
defer session.Close(true)

list := session.Client.ListDataset(cmd.Args[2])

Expand All @@ -739,8 +745,8 @@ func main() {
signal.Notify(signalchan, os.Kill)
go func() {
for range signalchan {
if session.reference != nil {
session.reference.Close()
if session != nil {
session.Close(false)
}
if lockFile != nil {
lockFile.Unlock()
Expand Down

0 comments on commit 2ccebe7

Please sign in to comment.