Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New network connection #1547

Merged
merged 43 commits into from
Dec 28, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
470cae5
TaskGroup goroutine manager added in pkg/execution package.
alexeykiselev Nov 14, 2024
561302a
Networking package with a new connection handler Session added.
alexeykiselev Nov 15, 2024
3d5e202
Logger interface removed from networking package. Standard slog packa…
alexeykiselev Nov 18, 2024
60d0178
WIP. Simple connection replaced with NetClient.
alexeykiselev Nov 25, 2024
eb14f66
Merge branch 'master' into new-network-connection
alexeykiselev Nov 25, 2024
948cc56
Fixed NetClient closing issue.
alexeykiselev Nov 25, 2024
67f4b85
Redundant log removed.
alexeykiselev Nov 25, 2024
d44fa7f
Move save int conversion to safecast lib.
alexeykiselev Nov 25, 2024
65414fa
Merge branch 'master' into new-network-connection
alexeykiselev Nov 27, 2024
7bab250
Merge branch 'master' into new-network-connection
nickeskov Nov 28, 2024
df01e60
Fix data race error in 'networking_test' package
nickeskov Nov 28, 2024
9705342
Merge branch 'master' into new-network-connection
nickeskov Dec 3, 2024
70a8c34
Merge branch 'master' into new-network-connection
alexeykiselev Dec 6, 2024
00a9ebe
Replace atomic.Uint32 with atomic.Bool and use CompareAndSwap there i…
alexeykiselev Dec 10, 2024
63a0305
Assertions added.
alexeykiselev Dec 10, 2024
5219227
Simplified closing and close logic in NetClient.
alexeykiselev Dec 11, 2024
ff41cf7
Prepare for new timer in Go 1.23
alexeykiselev Dec 11, 2024
e2f697f
Move constant into function were it used.
alexeykiselev Dec 12, 2024
edd942a
Merge branch 'master' into new-network-connection
alexeykiselev Dec 12, 2024
f832683
Better way to prevent from running multiple receiveLoops.
alexeykiselev Dec 13, 2024
c2ad101
Better data emptyness checks.
alexeykiselev Dec 13, 2024
3aa8a85
Better read error handling.
alexeykiselev Dec 13, 2024
c08bace
Use constructor.
alexeykiselev Dec 13, 2024
abced7f
Wrap heavy logging into log level checks.
alexeykiselev Dec 14, 2024
64bf7d9
Merge branch 'master' into new-network-connection
alexeykiselev Dec 14, 2024
294cf66
Merge branch 'master' into new-network-connection
alexeykiselev Dec 14, 2024
57b9ffb
Session configuration accepts slog handler to set up logging.
alexeykiselev Dec 16, 2024
14420bc
Close error channel on sending data successfully.
alexeykiselev Dec 16, 2024
412377f
Better error handling while reading.
alexeykiselev Dec 16, 2024
52a893e
Fine error assertions.
alexeykiselev Dec 16, 2024
77633f3
Fix blinking test.
alexeykiselev Dec 16, 2024
8747fc4
Merge branch 'master' into new-network-connection
alexeykiselev Dec 18, 2024
df77a57
Better configuration handling.
alexeykiselev Dec 19, 2024
dc78383
Merge branch 'master' into new-network-connection
alexeykiselev Dec 19, 2024
7b3fffb
Fixed blinking test TestCloseParentContext. Wait group added to wait …
alexeykiselev Dec 19, 2024
d2e2646
Better test workflow. Better wait group naming.
alexeykiselev Dec 19, 2024
3d8f38d
Merge branch 'master' into new-network-connection
alexeykiselev Dec 20, 2024
9599840
Fix deadlock in test by introducing wait group instead of sleep.
alexeykiselev Dec 20, 2024
b5d5173
Merge branch 'master' into new-network-connection
alexeykiselev Dec 24, 2024
16a32cb
Merge branch 'master' into new-network-connection
alexeykiselev Dec 26, 2024
63ade4e
Internal sendPacket reimplemented using io.Reader. Data restoration f…
alexeykiselev Dec 27, 2024
78579ca
Itest network client handler updated.
alexeykiselev Dec 27, 2024
de29ff8
Changed the way OnReceive passes the receiveBuffer. Test updated.
alexeykiselev Dec 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion pkg/networking/handler.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package networking

import "io"

// Handler is an interface for handling new messages, handshakes and session close events.
type Handler interface {
// OnReceive fired on new message received.
OnReceive(*Session, []byte)
OnReceive(*Session, io.Reader)

// OnHandshake fired on new Handshake received.
OnHandshake(*Session, Handshake)
Expand Down
20 changes: 11 additions & 9 deletions pkg/networking/mocks/handler.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions pkg/networking/mocks/header.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions pkg/networking/mocks/protocol.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

57 changes: 17 additions & 40 deletions pkg/networking/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,8 @@ func (s *Session) waitForSend(data []byte) error {
if s.logger.Enabled(s.ctx, slog.LevelDebug) {
s.logger.Debug("Sending data", "data", base64.StdEncoding.EncodeToString(data))
}
ready := &sendPacket{data: data, err: errCh}
select {
case s.sendCh <- ready:
case s.sendCh <- newSendPacket(data, errCh):
s.logger.Debug("Data written into send channel")
case <-s.ctx.Done():
s.logger.Debug("Session shutdown while sending data")
Expand All @@ -174,24 +173,6 @@ func (s *Session) waitForSend(data []byte) error {
return ErrConnectionWriteTimeout
}

dataCopy := func() {
if len(data) == 0 {
return // An empty data is ignored.
}

// In the event of session shutdown or connection write timeout, we need to prevent `send` from reading
// the body buffer after returning from this function since the caller may re-use the underlying array.
ready.mu.Lock()
defer ready.mu.Unlock()

if ready.data == nil {
return // data was already copied in `send`.
}
newData := make([]byte, len(data))
copy(newData, data)
ready.data = newData
}

select {
case err, ok := <-errCh:
if !ok {
Expand All @@ -201,11 +182,9 @@ func (s *Session) waitForSend(data []byte) error {
s.logger.Debug("Error sending data", "error", err)
return err
case <-s.ctx.Done():
dataCopy()
s.logger.Debug("Session shutdown while waiting send error")
return ErrSessionShutdown
case <-timer.C:
dataCopy()
s.logger.Debug("Connection write timeout while waiting send error")
return ErrConnectionWriteTimeout
}
Expand All @@ -224,22 +203,16 @@ func (s *Session) sendLoop() error {

case packet := <-s.sendCh:
packet.mu.Lock()
_, rErr := dataBuf.ReadFrom(packet.r)
if rErr != nil {
packet.mu.Unlock()
s.logger.Error("Failed to copy data into buffer", "error", rErr)
s.asyncSendErr(packet.err, rErr)
return rErr
}
if s.logger.Enabled(s.ctx, slog.LevelDebug) {
s.logger.Debug("Sending data to connection",
"data", base64.StdEncoding.EncodeToString(packet.data))
}
if len(packet.data) != 0 {
// Copy the data into the buffer to avoid holding a mutex lock during the writing.
_, err := dataBuf.Write(packet.data)
if err != nil {
packet.data = nil
packet.mu.Unlock()
s.logger.Error("Failed to copy data into buffer", "error", err)
s.asyncSendErr(packet.err, err)
return err // TODO: Do we need to return here?
}
s.logger.Debug("Data copied into buffer")
packet.data = nil
"data", base64.StdEncoding.EncodeToString(dataBuf.Bytes()))
}
packet.mu.Unlock()

Expand Down Expand Up @@ -375,7 +348,7 @@ func (s *Session) readMessagePayload(hdr Header, conn io.Reader) error {
s.logger.Debug("Invoking OnReceive handler", "message",
base64.StdEncoding.EncodeToString(s.receiveBuffer.Bytes()))
}
s.config.handler.OnReceive(s, s.receiveBuffer.Bytes()) // Invoke OnReceive handler.
s.config.handler.OnReceive(s, bytes.NewReader(s.receiveBuffer.Bytes())) // Invoke OnReceive handler.
nickeskov marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

Expand Down Expand Up @@ -405,9 +378,13 @@ func (s *Session) keepaliveLoop() error {

// sendPacket is used to send data.
type sendPacket struct {
mu sync.Mutex // Protects data from unsafe reads.
data []byte
err chan<- error
mu sync.Mutex // Protects data from unsafe reads.
r io.Reader
err chan<- error
}

func newSendPacket(data []byte, ch chan<- error) *sendPacket {
return &sendPacket{r: bytes.NewReader(data), err: ch}
}

// asyncSendErr is used to try an async send of an error.
Expand Down
6 changes: 4 additions & 2 deletions pkg/networking/session_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package networking_test

import (
"bytes"
"context"
"encoding/binary"
"errors"
Expand Down Expand Up @@ -55,7 +56,8 @@ func TestSuccessfulSession(t *testing.T) {
require.NoError(t, wErr)
assert.Equal(t, 5, n)
})
sc2 := serverHandler.On("OnReceive", ss, encodeMessage("Hello session")).Once().Return()
sc2 := serverHandler.On("OnReceive", ss, bytes.NewReader(encodeMessage("Hello session"))).
Once().Return()
sc2.NotBefore(sc1).
Run(func(_ mock.Arguments) {
n, wErr := ss.Write(encodeMessage("Hi"))
Expand All @@ -73,7 +75,7 @@ func TestSuccessfulSession(t *testing.T) {
require.NoError(t, wErr)
assert.Equal(t, 17, n)
})
cl2 := clientHandler.On("OnReceive", cs, encodeMessage("Hi")).Once().Return()
cl2 := clientHandler.On("OnReceive", cs, bytes.NewReader(encodeMessage("Hi"))).Once().Return()
cl2.NotBefore(cl1).
Run(func(_ mock.Arguments) {
cWG.Done()
Expand Down
Loading