Skip to content

Commit

Permalink
Merge pull request #49 from metaworking/master
Browse files Browse the repository at this point in the history
v0.7 release
  • Loading branch information
indiest authored Sep 21, 2023
2 parents 0b6c24a + e05ea1a commit 5188b23
Show file tree
Hide file tree
Showing 16 changed files with 1,912 additions and 1,419 deletions.
307 changes: 167 additions & 140 deletions examples/channeld-ue-tps/tpspb/tps.pb.go

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions examples/channeld-ue-tps/tpspb/tps.proto
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,10 @@ message TestRepChannelData {
map<uint32, unrealpb.PlayerControllerState> playerControllerStates = 7;
map<uint32, unrealpb.ActorComponentState> actorComponentStates = 8;
map<uint32, unrealpb.SceneComponentState> sceneComponentStates = 9;
TestRepGameState testGameState = 10;
map<uint32, TestRepPlayerControllerState> testRepPlayerControllerStates = 11;
map<uint32, TestNPCState> testNPCStates = 12;
map<uint32, unrealpb.StaticMeshComponentState> staticMeshComponentStates = 10;
TestRepGameState testGameState = 20;
map<uint32, TestRepPlayerControllerState> testRepPlayerControllerStates = 21;
map<uint32, TestNPCState> testNPCStates = 22;
}

/* No need to create a new ChannelData that contains the subset of fields of TestRepChannelData.
Expand Down
151 changes: 89 additions & 62 deletions pkg/channeld/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type ConnectionInChannel interface {
Close()
IsClosing() bool
Send(ctx MessageContext)
// Returns the subscription instance if successfully subscribed, and true if the subscription already exists.
// Returns the subscription instance if successfully subscribed, and true if subscription message should be sent.
SubscribeToChannel(ch *Channel, options *channeldpb.ChannelSubscriptionOptions) (*ChannelSubscription, bool)
UnsubscribeFromChannel(ch *Channel) (*channeldpb.ChannelSubscriptionOptions, error)
sendSubscribed(ctx MessageContext, ch *Channel, connToSub ConnectionInChannel, stubId uint32, subOptions *channeldpb.ChannelSubscriptionOptions)
Expand All @@ -61,13 +61,15 @@ type ConnectionInChannel interface {
}

type Channel struct {
id common.ChannelId
channelType channeldpb.ChannelType
state ChannelState
id common.ChannelId
channelType channeldpb.ChannelType
state ChannelState
// DO NOT use this field directly, use GetOwner() and SetOwner() instead.
ownerConnection ConnectionInChannel
ownerLock sync.RWMutex
subscribedConnections map[ConnectionInChannel]*ChannelSubscription
// Lock for reading all the subscribed connection outside the channel
connectionsLock sync.RWMutex
// Lock for sub/unsub outside the channel. Read lock: tickConnections, tickData(fan-out), Broadcast, GetAllConnections.
subLock sync.RWMutex
// Read-only property, e.g. name
metadata string
data *ChannelData
Expand Down Expand Up @@ -152,8 +154,9 @@ func createChannelWithId(channelId common.ChannelId, t channeldpb.ChannelType, o
id: channelId,
channelType: t,
ownerConnection: owner,
ownerLock: sync.RWMutex{},
subscribedConnections: make(map[ConnectionInChannel]*ChannelSubscription),
connectionsLock: sync.RWMutex{},
subLock: sync.RWMutex{},
/* Channel data is not created by default. See handleCreateChannel().
data: ReflectChannelData(t, nil),
*/
Expand Down Expand Up @@ -296,6 +299,7 @@ func (ch *Channel) PutMessageContext(ctx MessageContext, handler MessageHandlerF
if ch.IsRemoving() {
return
}

ch.inMsgQueue <- channelMessage{ctx: ctx, handler: handler}
}

Expand All @@ -316,7 +320,7 @@ func (ch *Channel) PutMessageInternal(msgType channeldpb.MessageType, msg common
ch.inMsgQueue <- channelMessage{ctx: MessageContext{
MsgType: msgType,
Msg: msg,
Connection: ch.ownerConnection,
Connection: ch.GetOwner(),
Channel: ch,
Broadcast: 0,
StubId: 0,
Expand Down Expand Up @@ -354,9 +358,10 @@ func (ch *Channel) Tick() {

ch.tickMessages(tickStart)

ch.subLock.RLock()
ch.tickData(ch.GetTime())

ch.tickConnections()
ch.subLock.RUnlock()

tickDuration := time.Since(tickStart)
channelTickDuration.WithLabelValues(ch.channelType.String()).Set(float64(tickDuration) / float64(time.Millisecond))
Expand Down Expand Up @@ -391,52 +396,44 @@ func (ch *Channel) tickMessages(tickStart time.Time) {
}

func (ch *Channel) tickConnections() {
defer func() {
ch.connectionsLock.RUnlock()
}()
ch.connectionsLock.RLock()
// defer func() {
// ch.subLock.RUnlock()
// }()
// ch.subLock.RLock()

for conn := range ch.subscribedConnections {
if conn.IsClosing() {
// Unsub the connection from the channel
delete(ch.subscribedConnections, conn)
conn.Logger().Info("removed subscription of a disconnected endpoint", zap.Uint32("channelId", uint32(ch.id)))
if ownerConn, ok := ch.ownerConnection.(*Connection); ok && conn != nil {
if ownerConn == conn {
// Reset the owner if it's removed
ch.ownerConnection = nil
if ch.channelType == channeldpb.ChannelType_GLOBAL {
Event_GlobalChannelUnpossessed.Broadcast(struct{}{})
}
conn.Logger().Info("found removed ownner connection of channel", zap.Uint32("channelId", uint32(ch.id)))
if GlobalSettings.GetChannelSettings(ch.channelType).RemoveChannelAfterOwnerRemoved {
atomic.AddInt32(&ch.removing, 1)
/* Let the GLOBAL channel handles the channel remove
// Send RemoveChannelMessage to all subscribed connections
ch.Broadcast(MessageContext{
MsgType: channeldpb.MessageType_REMOVE_CHANNEL,
Msg: &channeldpb.RemoveChannelMessage{
ChannelId: uint32(ch.id),
},
Broadcast: uint32(channeldpb.BroadcastType_ALL_BUT_OWNER),
StubId: 0,
ChannelId: uint32(ch.id),
})
RemoveChannel(ch)
*/
if ch.GetOwner() == conn {
// Reset the owner if it's removed
ch.SetOwner(nil)
if ch.channelType == channeldpb.ChannelType_GLOBAL {
Event_GlobalChannelUnpossessed.Broadcast(struct{}{})
}
conn.Logger().Info("found removed ownner connection of channel", zap.Uint32("channelId", uint32(ch.id)))
if GlobalSettings.GetChannelSettings(ch.channelType).RemoveChannelAfterOwnerRemoved {
atomic.AddInt32(&ch.removing, 1)

// DO NOT remove the GLOBAL channel!
if ch != globalChannel {
// Only the GLOBAL channel can handle the channel removal
globalChannel.PutMessage(&channeldpb.RemoveChannelMessage{
ChannelId: uint32(ch.id),
}, handleRemoveChannel, nil, &channeldpb.MessagePack{
Broadcast: 0,
StubId: 0,
ChannelId: uint32(GlobalChannelId),
})

ch.Logger().Info("removing channel after the owner is removed")
return
}
} else if conn != nil {
ch.ownerConnection.sendUnsubscribed(MessageContext{}, ch, conn.(*Connection), 0)

ch.Logger().Info("removing channel after the owner is removed")
return
}
} else if conn != nil {
if ownerConn := ch.GetOwner(); ownerConn != nil {
ownerConn.sendUnsubscribed(MessageContext{}, ch, conn.(*Connection), 0)
}
}
}
Expand All @@ -445,9 +442,9 @@ func (ch *Channel) tickConnections() {

func (ch *Channel) Broadcast(ctx MessageContext) {
defer func() {
ch.connectionsLock.RUnlock()
ch.subLock.RUnlock()
}()
ch.connectionsLock.RLock()
ch.subLock.RLock()

for conn := range ch.subscribedConnections {
//c := GetConnection(connId)
Expand All @@ -457,7 +454,7 @@ func (ch *Channel) Broadcast(ctx MessageContext) {
if channeldpb.BroadcastType_ALL_BUT_SENDER.Check(ctx.Broadcast) && conn == ctx.Connection {
continue
}
if channeldpb.BroadcastType_ALL_BUT_OWNER.Check(ctx.Broadcast) && conn == ch.ownerConnection {
if channeldpb.BroadcastType_ALL_BUT_OWNER.Check(ctx.Broadcast) && conn == ch.GetOwner() {
continue
}
if channeldpb.BroadcastType_ALL_BUT_CLIENT.Check(ctx.Broadcast) && conn.GetConnectionType() == channeldpb.ConnectionType_CLIENT {
Expand All @@ -473,9 +470,9 @@ func (ch *Channel) Broadcast(ctx MessageContext) {
// Goroutine-safe read of the subscribed connections
func (ch *Channel) GetAllConnections() map[ConnectionInChannel]struct{} {
defer func() {
ch.connectionsLock.RUnlock()
ch.subLock.RUnlock()
}()
ch.connectionsLock.RLock()
ch.subLock.RLock()

conns := make(map[ConnectionInChannel]struct{})
for conn := range ch.subscribedConnections {
Expand All @@ -487,10 +484,10 @@ func (ch *Channel) GetAllConnections() map[ConnectionInChannel]struct{} {
// Return true if the connection can 1)remove; 2)sub/unsub another connection to/from; the channel.
func (c *Connection) HasAuthorityOver(ch *Channel) bool {
// The global owner has authority over everything.
if globalChannel.ownerConnection == c {
if globalChannel.GetOwner() == c {
return true
}
if ch.ownerConnection == c {
if ch.GetOwner() == c {
return true
}
return false
Expand All @@ -505,29 +502,59 @@ func (ch *Channel) Logger() *Logger {
}

func (ch *Channel) HasOwner() bool {
conn, ok := ch.ownerConnection.(*Connection)
return ok && conn != nil && !conn.IsClosing()
conn := ch.GetOwner()
return conn != nil && !conn.IsClosing()
}

func (chA *Channel) IsSameOwner(chB *Channel) bool {
return chA.HasOwner() && chB.HasOwner() && chA.ownerConnection == chB.ownerConnection
connA := chA.GetOwner()
return connA != nil && !connA.IsClosing() && connA == chB.GetOwner()
}

func (ch *Channel) SendToOwner(msgType uint32, msg common.Message) {
if !ch.HasOwner() {
ch.Logger().Warn("channel has no owner to send message", zap.Uint32("msgType", msgType))
return
func (ch *Channel) SendMessageToOwner(msgType uint32, msg common.Message) bool {
conn := ch.GetOwner()
if conn != nil && !conn.IsClosing() {
conn.Send(MessageContext{
MsgType: channeldpb.MessageType(msgType),
Msg: msg,
ChannelId: uint32(ch.id),
Broadcast: 0,
StubId: 0,
})
return true
}
ch.ownerConnection.Send(MessageContext{
MsgType: channeldpb.MessageType(msgType),
Msg: msg,
ChannelId: uint32(ch.id),
Broadcast: 0,
StubId: 0,
})

return false
}

func (ch *Channel) SendToOwner(ctx MessageContext) bool {
conn := ch.GetOwner()
if conn != nil && !conn.IsClosing() {
conn.Send(ctx)
return true
}

return false
}

// Implementation for ConnectionInChannel interface
func (c *Connection) IsNil() bool {
return c == nil
}

func (c *Channel) GetOwner() ConnectionInChannel {
c.ownerLock.RLock()
defer c.ownerLock.RUnlock()

return c.ownerConnection
}

func (c *Channel) SetOwner(conn ConnectionInChannel) {
c.ownerLock.Lock()
defer c.ownerLock.Unlock()

// Race condition:
// 1. set to nil when the owner unsubscribes from the entity channel.
// 2. set to dst server conn when the entity of the channel is handed over to the dst server.
c.ownerConnection = conn
}
4 changes: 2 additions & 2 deletions pkg/channeld/channel_acl.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,13 @@ func (ch *Channel) CheckACL(c ConnectionInChannel, accessType ChannelAccessType)
return false, ErrNoneAccess

case ChannelAccessLevel_OwnerOnly:
if ch.ownerConnection == c {
if ch.GetOwner() == c {
return true, nil
} else {
return false, ErrOwnerOnlyAccess
}
case ChannelAccessLevel_OwnerAndGlobalOwner:
if ch.ownerConnection == c || globalChannel.ownerConnection == c {
if ch.GetOwner() == c || globalChannel.GetOwner() == c {
return true, nil
} else {
return false, ErrOwnerAndGlobalOwnerAccess
Expand Down
46 changes: 37 additions & 9 deletions pkg/channeld/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,25 @@ func (s *queuedMessagePackSender) Send(c *Connection, ctx MessageContext) {
return
}

c.sendQueue <- &channeldpb.MessagePack{
mp := &channeldpb.MessagePack{
ChannelId: ctx.ChannelId,
Broadcast: ctx.Broadcast,
StubId: ctx.StubId,
MsgType: uint32(ctx.MsgType),
MsgBody: msgBody,
}

// Check the message pack size before adding to the queue
size := proto.Size(mp)
if size >= MaxPacketSize-PacketHeaderSize {
c.logger.Warn("failed to send the message and its size exceeds the limit", zap.Int("size", size))
return
}

// Double check
if !c.IsClosing() {
c.sendQueue <- mp
}
}

type Connection struct {
Expand All @@ -82,6 +94,7 @@ type Connection struct {
// writer *bufio.Writer
sender MessageSender
sendQueue chan *channeldpb.MessagePack //MessageContext
oversizedMsgPack *channeldpb.MessagePack
pit string
fsm *fsm.FiniteStateMachine
fsmDisallowedCounter int
Expand Down Expand Up @@ -366,7 +379,7 @@ func (c *Connection) receive() {
if err != nil {
switch err := err.(type) {
case *net.OpError:
c.Logger().Warn("read bytes",
c.Logger().Info("net op error",
zap.String("op", err.Op),
zap.String("remoteAddr", c.conn.RemoteAddr().String()),
zap.Error(err),
Expand Down Expand Up @@ -429,6 +442,12 @@ func readSize(tag []byte) int {
}

func (c *Connection) readPacket(bufPos *int) (*channeldpb.Packet, error) {
if c.readPos-*bufPos < PacketHeaderSize {
// Unfinished header
fragmentedPacketCount.WithLabelValues(c.connectionType.String()).Inc()
return nil, nil
}

tag := c.readBuffer[*bufPos : *bufPos+PacketHeaderSize]

packetSize := readSize(tag)
Expand Down Expand Up @@ -517,10 +536,13 @@ func (c *Connection) isPacketRecordingEnabled() bool {
func (c *Connection) receiveMessage(mp *channeldpb.MessagePack) {
channel := GetChannel(common.ChannelId(mp.ChannelId))
if channel == nil {
c.Logger().Warn("can't find channel",
zap.Uint32("channelId", mp.ChannelId),
zap.Uint32("msgType", mp.MsgType),
)
// Sub to/unsub from a removed channel is allowed
if mp.MsgType != uint32(channeldpb.MessageType_SUB_TO_CHANNEL) && mp.MsgType != uint32(channeldpb.MessageType_UNSUB_FROM_CHANNEL) {
c.Logger().Warn("can't find channel",
zap.Uint32("channelId", mp.ChannelId),
zap.Uint32("msgType", mp.MsgType),
)
}
return
}

Expand Down Expand Up @@ -598,6 +620,13 @@ func (c *Connection) flush() {
p := channeldpb.Packet{Messages: make([]*channeldpb.MessagePack, 0, len(c.sendQueue))}
size := 0

// Add the oversided message pack first if any
if c.oversizedMsgPack != nil {
p.Messages = append(p.Messages, c.oversizedMsgPack)
c.oversizedMsgPack = nil
// No need to check the packet size now, as each message pack is already checked before adding to the queue.
}

// For now we don't limit the message numbers per packet
for len(c.sendQueue) > 0 {
mp := <-c.sendQueue
Expand All @@ -615,9 +644,8 @@ func (c *Connection) flush() {
// Revert adding the message that causes the oversize
p.Messages = p.Messages[:len(p.Messages)-1]

// Put the message back to the queue
// FIXME: order may matter
c.sendQueue <- mp
// Store the message pack that causes the overside
c.oversizedMsgPack = mp
break
}

Expand Down
Loading

0 comments on commit 5188b23

Please sign in to comment.