Skip to content

Commit

Permalink
moved logging to backend
Browse files Browse the repository at this point in the history
  • Loading branch information
256dpi committed Aug 1, 2018
1 parent 800649c commit 53258d6
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 85 deletions.
11 changes: 11 additions & 0 deletions broker/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ type MemoryBackend struct {
// A map of username and passwords that grant read and write access.
Credentials map[string]string

// The Logger callback handles incoming log events.
Logger func(LogEvent, *Client, packet.Generic, *packet.Message, error)

activeClients map[string]*Client
storedSessions map[string]*memorySession
temporarySessions map[*Client]*memorySession
Expand Down Expand Up @@ -455,6 +458,14 @@ func (m *MemoryBackend) Terminate(client *Client) error {
return nil
}

// Log will call the associated logger.
func (m *MemoryBackend) Log(event LogEvent, client *Client, pkt packet.Generic, msg *packet.Message, err error) {
// call logger if available
if m.Logger != nil {
m.Logger(event, client, pkt, msg, err)
}
}

// Close will close all active clients and close the backend. The return value
// denotes if the timeout has been reached.
func (m *MemoryBackend) Close(timeout time.Duration) bool {
Expand Down
97 changes: 68 additions & 29 deletions broker/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,50 @@ import (
"gopkg.in/tomb.v2"
)

// LogEvent denotes the class of an event passed to the logger.
type LogEvent string

const (
// NewConnection is emitted when a client comes online.
NewConnection LogEvent = "new connection"

// PacketReceived is emitted when a packet has been received.
PacketReceived LogEvent = "packet received"

// MessagePublished is emitted after a message has been published.
MessagePublished LogEvent = "message published"

// MessageAcknowledged is emitted after a message has been acknowledged.
MessageAcknowledged LogEvent = "message acknowledged"

// MessageDequeued is emitted after a message has been dequeued.
MessageDequeued LogEvent = "message dequeued"

// MessageForwarded is emitted after a message has been forwarded.
MessageForwarded LogEvent = "message forwarded"

// PacketSent is emitted when a packet has been sent.
PacketSent LogEvent = "packet sent"

// ClientDisconnected is emitted when a client disconnects cleanly.
ClientDisconnected LogEvent = "client disconnected"

// TransportError is emitted when an underlying transport error occurs.
TransportError LogEvent = "transport error"

// SessionError is emitted when a call to the session fails.
SessionError LogEvent = "session error"

// BackendError is emitted when a call to the backend fails.
BackendError LogEvent = "backend error"

// ClientError is emitted when the client violates the protocol.
ClientError LogEvent = "client error"

// LostConnection is emitted when the connection has been terminated.
LostConnection LogEvent = "lost connection"
)

// A Session is used to get packet ids and persist incoming/outgoing packets.
type Session interface {
// NextID should return the next id for outgoing packets.
Expand Down Expand Up @@ -118,6 +162,10 @@ type Backend interface {
// that client as the broker will close the connection when the call
// returns.
Terminate(client *Client) error

// Log is called multiple times during the lifecycle of a client see LogEvent
// for a list of all events.
Log(event LogEvent, client *Client, pkt packet.Generic, msg *packet.Message, err error)
}

// ErrExpectedConnect is returned when the first received packet is not a
Expand Down Expand Up @@ -170,7 +218,6 @@ type Client struct {

// read-only
backend Backend
logger Logger
conn transport.Conn

// atomically written and read
Expand All @@ -193,12 +240,11 @@ type Client struct {
}

// NewClient takes over a connection and returns a Client.
func NewClient(backend Backend, logger Logger, conn transport.Conn) *Client {
func NewClient(backend Backend, conn transport.Conn) *Client {
// create client
c := &Client{
state: clientConnecting,
backend: backend,
logger: logger,
conn: conn,
done: make(chan struct{}),
}
Expand Down Expand Up @@ -255,15 +301,15 @@ func (c *Client) Closed() <-chan struct{} {

// main processor
func (c *Client) processor() error {
c.log(NewConnection, c, nil, nil, nil)
c.backend.Log(NewConnection, c, nil, nil, nil)

// get first packet from connection
pkt, err := c.conn.Receive()
if err != nil {
return c.die(TransportError, err)
}

c.log(PacketReceived, c, pkt, nil, nil)
c.backend.Log(PacketReceived, c, pkt, nil, nil)

// get connect
connect, ok := pkt.(*packet.Connect)
Expand Down Expand Up @@ -293,7 +339,7 @@ func (c *Client) processor() error {
return c.die(TransportError, err)
}

c.log(PacketReceived, c, pkt, nil, nil)
c.backend.Log(PacketReceived, c, pkt, nil, nil)

// process packet
err = c.processPacket(pkt)
Expand Down Expand Up @@ -322,7 +368,7 @@ func (c *Client) dequeuer() error {
return tomb.ErrDying
}

c.log(MessageDequeued, c, nil, msg, nil)
c.backend.Log(MessageDequeued, c, nil, msg, nil)

// queue message
select {
Expand Down Expand Up @@ -618,7 +664,7 @@ func (c *Client) processPublish(publish *packet.Publish) error {
return c.die(BackendError, err)
}

c.log(MessagePublished, c, nil, &publish.Message, nil)
c.backend.Log(MessagePublished, c, nil, &publish.Message, nil)
}

// handle qos 1 flow
Expand All @@ -637,7 +683,7 @@ func (c *Client) processPublish(publish *packet.Publish) error {

// publish message to others and queue puback if ack is called
err := c.backend.Publish(c, &publish.Message, func() {
c.log(MessageAcknowledged, c, nil, &publish.Message, nil)
c.backend.Log(MessageAcknowledged, c, nil, &publish.Message, nil)

select {
case c.outgoing <- outgoing{pkt: puback}:
Expand All @@ -648,7 +694,7 @@ func (c *Client) processPublish(publish *packet.Publish) error {
return c.die(BackendError, err)
}

c.log(MessagePublished, c, nil, &publish.Message, nil)
c.backend.Log(MessagePublished, c, nil, &publish.Message, nil)
}

// handle qos 2 flow
Expand Down Expand Up @@ -739,7 +785,7 @@ func (c *Client) processPubrel(id packet.ID) error {

// publish message to others and queue pubcomp if ack is called
err = c.backend.Publish(c, &publish.Message, func() {
c.log(MessageAcknowledged, c, nil, &publish.Message, nil)
c.backend.Log(MessageAcknowledged, c, nil, &publish.Message, nil)

select {
case c.outgoing <- outgoing{pkt: pubcomp}:
Expand All @@ -750,7 +796,7 @@ func (c *Client) processPubrel(id packet.ID) error {
return c.die(BackendError, err)
}

c.log(MessagePublished, c, nil, &publish.Message, nil)
c.backend.Log(MessagePublished, c, nil, &publish.Message, nil)

return nil
}
Expand All @@ -766,7 +812,7 @@ func (c *Client) processDisconnect() error {
// close underlying connection (triggers cleanup)
c.conn.Close()

c.log(ClientDisconnected, c, nil, nil, nil)
c.backend.Log(ClientDisconnected, c, nil, nil, nil)

return ErrClientDisconnected
}
Expand Down Expand Up @@ -797,7 +843,7 @@ func (c *Client) sendMessage(msg *packet.Message, ack Ack) error {
if ack != nil {
ack()

c.log(MessageAcknowledged, c, nil, msg, nil)
c.backend.Log(MessageAcknowledged, c, nil, msg, nil)
}

// send packet
Expand All @@ -811,7 +857,7 @@ func (c *Client) sendMessage(msg *packet.Message, ack Ack) error {
c.dequeueTokens <- struct{}{}
}

c.log(MessageForwarded, c, nil, msg, nil)
c.backend.Log(MessageForwarded, c, nil, msg, nil)

return nil
}
Expand Down Expand Up @@ -860,24 +906,17 @@ func (c *Client) send(pkt packet.Generic, buffered bool) error {
return err
}

c.log(PacketSent, c, pkt, nil, nil)
c.backend.Log(PacketSent, c, pkt, nil, nil)

return nil
}

/* error handling and logging */

// log a message
func (c *Client) log(event LogEvent, client *Client, pkt packet.Generic, msg *packet.Message, err error) {
if c.logger != nil {
c.logger(event, client, pkt, msg, err)
}
}

// used for closing and cleaning up from internal goroutines
func (c *Client) die(event LogEvent, err error) error {
// report error
c.log(event, c, nil, nil, err)
// log error
c.backend.Log(event, c, nil, nil, err)

// close connection if requested
c.conn.Close()
Expand All @@ -892,19 +931,19 @@ func (c *Client) cleanup() {
// publish message to others
err := c.backend.Publish(c, c.will, nil)
if err != nil {
c.log(BackendError, c, nil, nil, err)
c.backend.Log(BackendError, c, nil, nil, err)
}

c.log(MessagePublished, c, nil, c.will, nil)
c.backend.Log(MessagePublished, c, nil, c.will, nil)
}

// remove client from the queue
if atomic.LoadUint32(&c.state) >= clientConnected {
err := c.backend.Terminate(c)
if err != nil {
c.log(BackendError, c, nil, nil, err)
c.backend.Log(BackendError, c, nil, nil, err)
}
}

c.log(LostConnection, c, nil, nil, nil)
c.backend.Log(LostConnection, c, nil, nil, nil)
}
53 changes: 1 addition & 52 deletions broker/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,67 +6,16 @@ import (
"sync"
"time"

"github.com/256dpi/gomqtt/packet"
"github.com/256dpi/gomqtt/transport"

"gopkg.in/tomb.v2"
)

// LogEvent denotes the class of an event passed to the logger.
type LogEvent string

const (
// NewConnection is emitted when a client comes online.
NewConnection LogEvent = "new connection"

// PacketReceived is emitted when a packet has been received.
PacketReceived LogEvent = "packet received"

// MessagePublished is emitted after a message has been published.
MessagePublished LogEvent = "message published"

// MessageAcknowledged is emitted after a message has been acknowledged.
MessageAcknowledged LogEvent = "message acknowledged"

// MessageDequeued is emitted after a message has been dequeued.
MessageDequeued LogEvent = "message dequeued"

// MessageForwarded is emitted after a message has been forwarded.
MessageForwarded LogEvent = "message forwarded"

// PacketSent is emitted when a packet has been sent.
PacketSent LogEvent = "packet sent"

// ClientDisconnected is emitted when a client disconnects cleanly.
ClientDisconnected LogEvent = "client disconnected"

// TransportError is emitted when an underlying transport error occurs.
TransportError LogEvent = "transport error"

// SessionError is emitted when a call to the session fails.
SessionError LogEvent = "session error"

// BackendError is emitted when a call to the backend fails.
BackendError LogEvent = "backend error"

// ClientError is emitted when the client violates the protocol.
ClientError LogEvent = "client error"

// LostConnection is emitted when the connection has been terminated.
LostConnection LogEvent = "lost connection"
)

// The Logger callback handles incoming log events.
type Logger func(LogEvent, *Client, packet.Generic, *packet.Message, error)

// The Engine handles incoming connections and connects them to the backend.
type Engine struct {
// The Backend that will passed to accepted clients.
Backend Backend

// The logger that will be passed to accepted clients.
Logger Logger

// ConnectTimeout defines the timeout to receive the first packet.
ConnectTimeout time.Duration

Expand Down Expand Up @@ -150,7 +99,7 @@ func (e *Engine) Handle(conn transport.Conn) bool {
conn.SetReadTimeout(e.ConnectTimeout)

// handle client
NewClient(e.Backend, e.Logger, conn)
NewClient(e.Backend, conn)

return true
}
Expand Down
8 changes: 4 additions & 4 deletions cmd/gomqtt-membroker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,11 @@ func main() {
backend := broker.NewMemoryBackend()
backend.SessionQueueSize = *sqz

engine := broker.NewEngine(backend)
engine.Accept(server)

var published int32
var forwarded int32
var clients int32

engine.Logger = func(event broker.LogEvent, client *broker.Client, pkt packet.Generic, msg *packet.Message, err error) {
backend.Logger = func(event broker.LogEvent, client *broker.Client, pkt packet.Generic, msg *packet.Message, err error) {
if event == broker.NewConnection {
atomic.AddInt32(&clients, 1)
} else if event == broker.MessagePublished {
Expand All @@ -57,6 +54,9 @@ func main() {
}
}

engine := broker.NewEngine(backend)
engine.Accept(server)

go func() {
for {
<-time.After(1 * time.Second)
Expand Down

0 comments on commit 53258d6

Please sign in to comment.