diff --git a/broker/backend.go b/broker/backend.go index 09a21fe..881d890 100644 --- a/broker/backend.go +++ b/broker/backend.go @@ -81,6 +81,9 @@ type MemoryBackend struct { // A map of username and passwords that grant read and write access. Credentials map[string]string + // The Logger callback handles incoming log events. + Logger func(LogEvent, *Client, packet.Generic, *packet.Message, error) + activeClients map[string]*Client storedSessions map[string]*memorySession temporarySessions map[*Client]*memorySession @@ -455,6 +458,14 @@ func (m *MemoryBackend) Terminate(client *Client) error { return nil } +// Log will call the associated logger. +func (m *MemoryBackend) Log(event LogEvent, client *Client, pkt packet.Generic, msg *packet.Message, err error) { + // call logger if available + if m.Logger != nil { + m.Logger(event, client, pkt, msg, err) + } +} + // Close will close all active clients and close the backend. The return value // denotes if the timeout has been reached. func (m *MemoryBackend) Close(timeout time.Duration) bool { diff --git a/broker/client.go b/broker/client.go index 82fcc4b..5ab8e8b 100644 --- a/broker/client.go +++ b/broker/client.go @@ -12,6 +12,50 @@ import ( "gopkg.in/tomb.v2" ) +// LogEvent denotes the class of an event passed to the logger. +type LogEvent string + +const ( + // NewConnection is emitted when a client comes online. + NewConnection LogEvent = "new connection" + + // PacketReceived is emitted when a packet has been received. + PacketReceived LogEvent = "packet received" + + // MessagePublished is emitted after a message has been published. + MessagePublished LogEvent = "message published" + + // MessageAcknowledged is emitted after a message has been acknowledged. + MessageAcknowledged LogEvent = "message acknowledged" + + // MessageDequeued is emitted after a message has been dequeued. + MessageDequeued LogEvent = "message dequeued" + + // MessageForwarded is emitted after a message has been forwarded. + MessageForwarded LogEvent = "message forwarded" + + // PacketSent is emitted when a packet has been sent. + PacketSent LogEvent = "packet sent" + + // ClientDisconnected is emitted when a client disconnects cleanly. + ClientDisconnected LogEvent = "client disconnected" + + // TransportError is emitted when an underlying transport error occurs. + TransportError LogEvent = "transport error" + + // SessionError is emitted when a call to the session fails. + SessionError LogEvent = "session error" + + // BackendError is emitted when a call to the backend fails. + BackendError LogEvent = "backend error" + + // ClientError is emitted when the client violates the protocol. + ClientError LogEvent = "client error" + + // LostConnection is emitted when the connection has been terminated. + LostConnection LogEvent = "lost connection" +) + // A Session is used to get packet ids and persist incoming/outgoing packets. type Session interface { // NextID should return the next id for outgoing packets. @@ -118,6 +162,10 @@ type Backend interface { // that client as the broker will close the connection when the call // returns. Terminate(client *Client) error + + // Log is called multiple times during the lifecycle of a client see LogEvent + // for a list of all events. + Log(event LogEvent, client *Client, pkt packet.Generic, msg *packet.Message, err error) } // ErrExpectedConnect is returned when the first received packet is not a @@ -170,7 +218,6 @@ type Client struct { // read-only backend Backend - logger Logger conn transport.Conn // atomically written and read @@ -193,12 +240,11 @@ type Client struct { } // NewClient takes over a connection and returns a Client. -func NewClient(backend Backend, logger Logger, conn transport.Conn) *Client { +func NewClient(backend Backend, conn transport.Conn) *Client { // create client c := &Client{ state: clientConnecting, backend: backend, - logger: logger, conn: conn, done: make(chan struct{}), } @@ -255,7 +301,7 @@ func (c *Client) Closed() <-chan struct{} { // main processor func (c *Client) processor() error { - c.log(NewConnection, c, nil, nil, nil) + c.backend.Log(NewConnection, c, nil, nil, nil) // get first packet from connection pkt, err := c.conn.Receive() @@ -263,7 +309,7 @@ func (c *Client) processor() error { return c.die(TransportError, err) } - c.log(PacketReceived, c, pkt, nil, nil) + c.backend.Log(PacketReceived, c, pkt, nil, nil) // get connect connect, ok := pkt.(*packet.Connect) @@ -293,7 +339,7 @@ func (c *Client) processor() error { return c.die(TransportError, err) } - c.log(PacketReceived, c, pkt, nil, nil) + c.backend.Log(PacketReceived, c, pkt, nil, nil) // process packet err = c.processPacket(pkt) @@ -322,7 +368,7 @@ func (c *Client) dequeuer() error { return tomb.ErrDying } - c.log(MessageDequeued, c, nil, msg, nil) + c.backend.Log(MessageDequeued, c, nil, msg, nil) // queue message select { @@ -618,7 +664,7 @@ func (c *Client) processPublish(publish *packet.Publish) error { return c.die(BackendError, err) } - c.log(MessagePublished, c, nil, &publish.Message, nil) + c.backend.Log(MessagePublished, c, nil, &publish.Message, nil) } // handle qos 1 flow @@ -637,7 +683,7 @@ func (c *Client) processPublish(publish *packet.Publish) error { // publish message to others and queue puback if ack is called err := c.backend.Publish(c, &publish.Message, func() { - c.log(MessageAcknowledged, c, nil, &publish.Message, nil) + c.backend.Log(MessageAcknowledged, c, nil, &publish.Message, nil) select { case c.outgoing <- outgoing{pkt: puback}: @@ -648,7 +694,7 @@ func (c *Client) processPublish(publish *packet.Publish) error { return c.die(BackendError, err) } - c.log(MessagePublished, c, nil, &publish.Message, nil) + c.backend.Log(MessagePublished, c, nil, &publish.Message, nil) } // handle qos 2 flow @@ -739,7 +785,7 @@ func (c *Client) processPubrel(id packet.ID) error { // publish message to others and queue pubcomp if ack is called err = c.backend.Publish(c, &publish.Message, func() { - c.log(MessageAcknowledged, c, nil, &publish.Message, nil) + c.backend.Log(MessageAcknowledged, c, nil, &publish.Message, nil) select { case c.outgoing <- outgoing{pkt: pubcomp}: @@ -750,7 +796,7 @@ func (c *Client) processPubrel(id packet.ID) error { return c.die(BackendError, err) } - c.log(MessagePublished, c, nil, &publish.Message, nil) + c.backend.Log(MessagePublished, c, nil, &publish.Message, nil) return nil } @@ -766,7 +812,7 @@ func (c *Client) processDisconnect() error { // close underlying connection (triggers cleanup) c.conn.Close() - c.log(ClientDisconnected, c, nil, nil, nil) + c.backend.Log(ClientDisconnected, c, nil, nil, nil) return ErrClientDisconnected } @@ -797,7 +843,7 @@ func (c *Client) sendMessage(msg *packet.Message, ack Ack) error { if ack != nil { ack() - c.log(MessageAcknowledged, c, nil, msg, nil) + c.backend.Log(MessageAcknowledged, c, nil, msg, nil) } // send packet @@ -811,7 +857,7 @@ func (c *Client) sendMessage(msg *packet.Message, ack Ack) error { c.dequeueTokens <- struct{}{} } - c.log(MessageForwarded, c, nil, msg, nil) + c.backend.Log(MessageForwarded, c, nil, msg, nil) return nil } @@ -860,24 +906,17 @@ func (c *Client) send(pkt packet.Generic, buffered bool) error { return err } - c.log(PacketSent, c, pkt, nil, nil) + c.backend.Log(PacketSent, c, pkt, nil, nil) return nil } /* error handling and logging */ -// log a message -func (c *Client) log(event LogEvent, client *Client, pkt packet.Generic, msg *packet.Message, err error) { - if c.logger != nil { - c.logger(event, client, pkt, msg, err) - } -} - // used for closing and cleaning up from internal goroutines func (c *Client) die(event LogEvent, err error) error { - // report error - c.log(event, c, nil, nil, err) + // log error + c.backend.Log(event, c, nil, nil, err) // close connection if requested c.conn.Close() @@ -892,19 +931,19 @@ func (c *Client) cleanup() { // publish message to others err := c.backend.Publish(c, c.will, nil) if err != nil { - c.log(BackendError, c, nil, nil, err) + c.backend.Log(BackendError, c, nil, nil, err) } - c.log(MessagePublished, c, nil, c.will, nil) + c.backend.Log(MessagePublished, c, nil, c.will, nil) } // remove client from the queue if atomic.LoadUint32(&c.state) >= clientConnected { err := c.backend.Terminate(c) if err != nil { - c.log(BackendError, c, nil, nil, err) + c.backend.Log(BackendError, c, nil, nil, err) } } - c.log(LostConnection, c, nil, nil, nil) + c.backend.Log(LostConnection, c, nil, nil, nil) } diff --git a/broker/engine.go b/broker/engine.go index 2e1d88a..99a184f 100644 --- a/broker/engine.go +++ b/broker/engine.go @@ -6,67 +6,16 @@ import ( "sync" "time" - "github.com/256dpi/gomqtt/packet" "github.com/256dpi/gomqtt/transport" "gopkg.in/tomb.v2" ) -// LogEvent denotes the class of an event passed to the logger. -type LogEvent string - -const ( - // NewConnection is emitted when a client comes online. - NewConnection LogEvent = "new connection" - - // PacketReceived is emitted when a packet has been received. - PacketReceived LogEvent = "packet received" - - // MessagePublished is emitted after a message has been published. - MessagePublished LogEvent = "message published" - - // MessageAcknowledged is emitted after a message has been acknowledged. - MessageAcknowledged LogEvent = "message acknowledged" - - // MessageDequeued is emitted after a message has been dequeued. - MessageDequeued LogEvent = "message dequeued" - - // MessageForwarded is emitted after a message has been forwarded. - MessageForwarded LogEvent = "message forwarded" - - // PacketSent is emitted when a packet has been sent. - PacketSent LogEvent = "packet sent" - - // ClientDisconnected is emitted when a client disconnects cleanly. - ClientDisconnected LogEvent = "client disconnected" - - // TransportError is emitted when an underlying transport error occurs. - TransportError LogEvent = "transport error" - - // SessionError is emitted when a call to the session fails. - SessionError LogEvent = "session error" - - // BackendError is emitted when a call to the backend fails. - BackendError LogEvent = "backend error" - - // ClientError is emitted when the client violates the protocol. - ClientError LogEvent = "client error" - - // LostConnection is emitted when the connection has been terminated. - LostConnection LogEvent = "lost connection" -) - -// The Logger callback handles incoming log events. -type Logger func(LogEvent, *Client, packet.Generic, *packet.Message, error) - // The Engine handles incoming connections and connects them to the backend. type Engine struct { // The Backend that will passed to accepted clients. Backend Backend - // The logger that will be passed to accepted clients. - Logger Logger - // ConnectTimeout defines the timeout to receive the first packet. ConnectTimeout time.Duration @@ -150,7 +99,7 @@ func (e *Engine) Handle(conn transport.Conn) bool { conn.SetReadTimeout(e.ConnectTimeout) // handle client - NewClient(e.Backend, e.Logger, conn) + NewClient(e.Backend, conn) return true } diff --git a/cmd/gomqtt-membroker/main.go b/cmd/gomqtt-membroker/main.go index 90f41f6..ea99cbb 100644 --- a/cmd/gomqtt-membroker/main.go +++ b/cmd/gomqtt-membroker/main.go @@ -38,14 +38,11 @@ func main() { backend := broker.NewMemoryBackend() backend.SessionQueueSize = *sqz - engine := broker.NewEngine(backend) - engine.Accept(server) - var published int32 var forwarded int32 var clients int32 - engine.Logger = func(event broker.LogEvent, client *broker.Client, pkt packet.Generic, msg *packet.Message, err error) { + backend.Logger = func(event broker.LogEvent, client *broker.Client, pkt packet.Generic, msg *packet.Message, err error) { if event == broker.NewConnection { atomic.AddInt32(&clients, 1) } else if event == broker.MessagePublished { @@ -57,6 +54,9 @@ func main() { } } + engine := broker.NewEngine(backend) + engine.Accept(server) + go func() { for { <-time.After(1 * time.Second)