From 685f691057ae2709d4590010e3338c75726bb768 Mon Sep 17 00:00:00 2001 From: kcross Date: Wed, 11 Dec 2024 21:57:41 +0000 Subject: [PATCH] feat(registry): added self contained registries so you can run a back to back test in process --- EmptyApplication.go | 29 +++++++ Makefile | 2 +- acceptor.go | 19 ++++- go.mod | 27 ++++--- go.sum | 73 +++++++++-------- initiator.go | 18 ++++- message_router.go | 19 ++++- message_router_test.go | 7 +- registry.go | 131 ++++++++++++++++++++++-------- service_test.go | 174 ++++++++++++++++++++++++++++++++++++++++ session_factory.go | 3 +- session_factory_test.go | 2 +- slog_log.go | 82 +++++++++++++++++++ 13 files changed, 490 insertions(+), 96 deletions(-) create mode 100644 EmptyApplication.go create mode 100644 service_test.go create mode 100644 slog_log.go diff --git a/EmptyApplication.go b/EmptyApplication.go new file mode 100644 index 000000000..00a3932c6 --- /dev/null +++ b/EmptyApplication.go @@ -0,0 +1,29 @@ +package quickfix + +type EmptyApplication struct{} + +var _ Application = EmptyApplication{} + +func (e EmptyApplication) OnCreate(_ SessionID) { +} + +func (e EmptyApplication) OnLogon(_ SessionID) { +} + +func (e EmptyApplication) OnLogout(_ SessionID) { +} + +func (e EmptyApplication) ToAdmin(_ *Message, _ SessionID) { +} + +func (e EmptyApplication) ToApp(_ *Message, _ SessionID) error { + return nil +} + +func (e EmptyApplication) FromAdmin(_ *Message, _ SessionID) MessageRejectError { + return nil +} + +func (e EmptyApplication) FromApp(_ *Message, _ SessionID) MessageRejectError { + return nil +} diff --git a/Makefile b/Makefile index 8d3c2c6cc..693fb2cad 100644 --- a/Makefile +++ b/Makefile @@ -14,7 +14,7 @@ vet: go vet `go list ./... | grep -v quickfix/gen` test: - MONGODB_TEST_CXN=mongodb://db:27017 go test -v -cover `go list ./... | grep -v quickfix/gen` + MONGODB_TEST_CXN=mongodb://db:27017 go test -v -race -timeout 20s -cover `go list ./... | grep -v quickfix/gen` linters-install: @golangci-lint --version >/dev/null 2>&1 || { \ diff --git a/acceptor.go b/acceptor.go index 2f9f6c48b..1833317fc 100644 --- a/acceptor.go +++ b/acceptor.go @@ -150,7 +150,7 @@ func (a *Acceptor) Stop() { a.sessionGroup.Wait() for sessionID := range a.sessions { - err := UnregisterSession(sessionID) + err := a.UnregisterSession(sessionID) if err != nil { return } @@ -167,8 +167,16 @@ func (a *Acceptor) RemoteAddr(sessionID SessionID) (net.Addr, bool) { return val, ok } +type AcceptorOption func(*Acceptor) + +func WithAcceptorRegistry(registry *Registry) AcceptorOption { + return func(a *Acceptor) { + a.sessionFactory.Registry = registry + } +} + // NewAcceptor creates and initializes a new Acceptor. -func NewAcceptor(app Application, storeFactory MessageStoreFactory, settings *Settings, logFactory LogFactory) (a *Acceptor, err error) { +func NewAcceptor(app Application, storeFactory MessageStoreFactory, settings *Settings, logFactory LogFactory, opts ...AcceptorOption) (a *Acceptor, err error) { a = &Acceptor{ app: app, storeFactory: storeFactory, @@ -178,6 +186,11 @@ func NewAcceptor(app Application, storeFactory MessageStoreFactory, settings *Se sessionHostPort: make(map[SessionID]int), listeners: make(map[string]net.Listener), } + a.sessionFactory.Registry = defaultRegistry + for _, opt := range opts { + opt(a) + } + if a.settings.GlobalSettings().HasSetting(config.DynamicSessions) { if a.dynamicSessions, err = settings.globalSettings.BoolSetting(config.DynamicSessions); err != nil { return @@ -386,7 +399,7 @@ LOOP: sessions[sessionID] = session go func() { session.run() - err := UnregisterSession(session.sessionID) + err := a.UnregisterSession(session.sessionID) if err != nil { a.globalLog.OnEventf("Unregister dynamic session %v failed: %v", session.sessionID, err) return diff --git a/go.mod b/go.mod index 8b36cf024..18570b233 100644 --- a/go.mod +++ b/go.mod @@ -4,29 +4,34 @@ go 1.21 require ( github.com/mattn/go-sqlite3 v1.14.22 - github.com/pires/go-proxyproto v0.7.0 + github.com/phayes/freeport v0.0.0-20220201140144-74d24b5ae9f5 + github.com/pires/go-proxyproto v0.8.0 github.com/pkg/errors v0.9.1 + github.com/quickfixgo/enum v0.1.0 + github.com/quickfixgo/field v0.1.0 + github.com/quickfixgo/fix44 v0.1.0 + github.com/quickfixgo/tag v0.1.0 github.com/shopspring/decimal v1.4.0 - github.com/stretchr/testify v1.8.4 - go.mongodb.org/mongo-driver v1.15.0 - golang.org/x/net v0.24.0 + github.com/stretchr/testify v1.10.0 + go.mongodb.org/mongo-driver v1.17.1 + golang.org/x/net v0.32.0 ) require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/klauspost/compress v1.15.12 // indirect - github.com/kr/text v0.2.0 // indirect - github.com/montanaflynn/stats v0.6.6 // indirect + github.com/kr/pretty v0.3.0 // indirect + github.com/montanaflynn/stats v0.7.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/stretchr/objx v0.5.0 // indirect + github.com/stretchr/objx v0.5.2 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/scram v1.1.2 // indirect github.com/xdg-go/stringprep v1.0.4 // indirect - github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a // indirect - golang.org/x/crypto v0.22.0 // indirect - golang.org/x/sync v0.1.0 // indirect - golang.org/x/text v0.14.0 // indirect + github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect + golang.org/x/crypto v0.30.0 // indirect + golang.org/x/sync v0.10.0 // indirect + golang.org/x/text v0.21.0 // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index a7e47992c..ce3a22fc0 100644 --- a/go.sum +++ b/go.sum @@ -1,68 +1,74 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/google/go-cmp v0.5.2 h1:X2ev0eStA3AbceY54o37/0PQ/UWqKEiiO2dKL5OPaFM= -github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/klauspost/compress v1.15.12 h1:YClS/PImqYbn+UILDnqxQCZ3RehC9N318SU3kElDUEM= github.com/klauspost/compress v1.15.12/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM= -github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= +github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU= github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= -github.com/montanaflynn/stats v0.6.6 h1:Duep6KMIDpY4Yo11iFsvyqJDyfzLF9+sndUKT+v64GQ= -github.com/montanaflynn/stats v0.6.6/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow= -github.com/pires/go-proxyproto v0.7.0 h1:IukmRewDQFWC7kfnb66CSomk2q/seBuilHBYFwyq0Hs= -github.com/pires/go-proxyproto v0.7.0/go.mod h1:Vz/1JPY/OACxWGQNIRY2BeyDmpoaWmEP40O9LbuiFR4= +github.com/montanaflynn/stats v0.7.1 h1:etflOAAHORrCC44V+aR6Ftzort912ZU+YLiSTuV8eaE= +github.com/montanaflynn/stats v0.7.1/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow= +github.com/phayes/freeport v0.0.0-20220201140144-74d24b5ae9f5 h1:Ii+DKncOVM8Cu1Hc+ETb5K+23HdAMvESYE3ZJ5b5cMI= +github.com/phayes/freeport v0.0.0-20220201140144-74d24b5ae9f5/go.mod h1:iIss55rKnNBTvrwdmkUpLnDpZoAHvWaiq5+iMmen4AE= +github.com/pires/go-proxyproto v0.8.0 h1:5unRmEAPbHXHuLjDg01CxJWf91cw3lKHc/0xzKpXEe0= +github.com/pires/go-proxyproto v0.8.0/go.mod h1:iknsfgnH8EkjrMeMyvfKByp9TiBZCKZM0jx2xmKqnVY= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/quickfixgo/enum v0.1.0 h1:TnCPOqxAWA5/IWp7lsvj97x7oyuHYgj3STBJlBzZGjM= +github.com/quickfixgo/enum v0.1.0/go.mod h1:65gdG2/8vr6uOYcjZBObVHMuTEYc5rr/+aKVWTrFIrQ= +github.com/quickfixgo/field v0.1.0 h1:JVO6fVD6Nkyy8e/ROYQtV/nQhMX/BStD5Lq7XIgYz2g= +github.com/quickfixgo/field v0.1.0/go.mod h1:Zu0qYmpj+gljlB2HgpUt9EcTIThs2lIQb8C57qbJr8o= +github.com/quickfixgo/fix44 v0.1.0 h1:g/rTl6mXDlG7iIMbY7zaPbHcj9N/B+tteOZ01yGzeSQ= +github.com/quickfixgo/fix44 v0.1.0/go.mod h1:d6Ia02Eq/JYgKCn/2V9FHxguAl1Alp/yu/xVpry82dA= +github.com/quickfixgo/tag v0.1.0 h1:R2A1Zf7CBE903+mOQlmTlfTmNZQz/yh7HunMbgcsqsA= +github.com/quickfixgo/tag v0.1.0/go.mod h1:l/drB1eO3PwN9JQTDC9Vt2EqOcaXk3kGJ+eeCQljvAI= +github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k= +github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k= github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= -github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= -github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= -github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= -github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= -github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a h1:fZHgsYlfvtyqToslyjUt3VOPF4J7aK/3MPcK7xp3PDk= -github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a/go.mod h1:ul22v+Nro/R083muKhosV54bj5niojjWZvU8xrevuH4= +github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 h1:ilQV1hzziu+LLM3zUTJ0trRztfwgjqKnBWNtSRkbmwM= +github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78/go.mod h1:aL8wCCfTfSfmXjznFBSZNN13rSJjlIOI1fUNAtF7rmI= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= -go.mongodb.org/mongo-driver v1.15.0 h1:rJCKC8eEliewXjZGf0ddURtl7tTVy1TK3bfl0gkUSLc= -go.mongodb.org/mongo-driver v1.15.0/go.mod h1:Vzb0Mk/pa7e6cWw85R4F/endUC3u0U9jGcNU603k65c= +go.mongodb.org/mongo-driver v1.17.1 h1:Wic5cJIwJgSpBhe3lx3+/RybR5PiYRMpVFgO7cOHyIM= +go.mongodb.org/mongo-driver v1.17.1/go.mod h1:wwWm/+BuOddhcq3n68LKRmgk2wXzmF6s0SFOa0GINL4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30= -golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M= +golang.org/x/crypto v0.30.0 h1:RwoQn3GkWiMkzlX562cLB7OxWvjH1L8xutO2WoJcRoY= +golang.org/x/crypto v0.30.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= -golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= -golang.org/x/net v0.24.0 h1:1PcaxkF854Fu3+lvBIx5SYn9wRlBzzcnHZSiaFFAb0w= -golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8= +golang.org/x/net v0.32.0 h1:ZqPmj8Kzc+Y6e0+skZsuACbx+wzMgo5MQsJh9Qd6aYI= +golang.org/x/net v0.32.0/go.mod h1:CwU0IoeOlnQQWJ6ioyFrfRuomB8GKF6KbYXZVyeXNfs= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= -golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= +golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -73,17 +79,16 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= -golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= -golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= +golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/initiator.go b/initiator.go index 18451477e..7325ff795 100644 --- a/initiator.go +++ b/initiator.go @@ -78,15 +78,23 @@ func (i *Initiator) Stop() { i.wg.Wait() for sessionID := range i.sessionSettings { - err := UnregisterSession(sessionID) + err := i.UnregisterSession(sessionID) if err != nil { return } } } +type InitiatorOption func(*Initiator) + +func WithInitiatorRegistry(registry *Registry) InitiatorOption { + return func(i *Initiator) { + i.sessionFactory.Registry = registry + } +} + // NewInitiator creates and initializes a new Initiator. -func NewInitiator(app Application, storeFactory MessageStoreFactory, appSettings *Settings, logFactory LogFactory) (*Initiator, error) { +func NewInitiator(app Application, storeFactory MessageStoreFactory, appSettings *Settings, logFactory LogFactory, opts ...InitiatorOption) (*Initiator, error) { i := &Initiator{ app: app, storeFactory: storeFactory, @@ -94,7 +102,11 @@ func NewInitiator(app Application, storeFactory MessageStoreFactory, appSettings sessionSettings: appSettings.SessionSettings(), logFactory: logFactory, sessions: make(map[SessionID]*session), - sessionFactory: sessionFactory{true}, + sessionFactory: sessionFactory{BuildInitiators: true, Registry: defaultRegistry}, + } + + for _, opt := range opts { + opt(i) } var err error diff --git a/message_router.go b/message_router.go index c640f3e4d..e372a3b5f 100644 --- a/message_router.go +++ b/message_router.go @@ -40,11 +40,24 @@ type MessageRoute func(msg *Message, sessionID SessionID) MessageRejectError // A MessageRouter is a mutex for MessageRoutes. type MessageRouter struct { routes map[routeKey]MessageRoute + *Registry +} +type RouterOption func(*MessageRouter) + +func WithMessageRouterRegistry(registry *Registry) RouterOption { + return func(r *MessageRouter) { + r.Registry = registry + } } // NewMessageRouter returns an initialized MessageRouter instance. -func NewMessageRouter() *MessageRouter { - return &MessageRouter{routes: make(map[routeKey]MessageRoute)} +func NewMessageRouter(opts ...RouterOption) *MessageRouter { + router := MessageRouter{routes: make(map[routeKey]MessageRoute)} + router.Registry = defaultRegistry + for _, opt := range opts { + opt(&router) + } + return &router } // AddRoute adds a route to the MessageRouter instance keyed to begin string and msgType. @@ -74,7 +87,7 @@ func (c MessageRouter) tryRoute(beginString string, msgType string, msg *Message if beginString == BeginStringFIXT11 && !isAdminMsg { var applVerID FIXString if err := msg.Header.GetField(tagApplVerID, &applVerID); err != nil { - session, _ := lookupSession(sessionID) + session, _ := c.lookupSession(sessionID) applVerID = FIXString(session.TargetDefaultApplicationVersionID()) } diff --git a/message_router_test.go b/message_router_test.go index b2e7b59f5..bf4961125 100644 --- a/message_router_test.go +++ b/message_router_test.go @@ -70,7 +70,7 @@ func (suite *MessageRouterTestSuite) givenTargetDefaultApplVerIDForSession(defau sessionID: sessionID, targetDefaultApplVerID: defaultApplVerID, } - suite.Nil(registerSession(s)) + suite.Nil(suite.registerSession(s)) } func (suite *MessageRouterTestSuite) givenAFIX42NewOrderSingle() { @@ -107,10 +107,7 @@ func (suite *MessageRouterTestSuite) resetRouter() { func (suite *MessageRouterTestSuite) SetupTest() { suite.resetRouter() - sessionsLock.Lock() - defer sessionsLock.Unlock() - - sessions = make(map[SessionID]*session) + suite.Registry = NewRegistry() suite.msg = NewMessage() } diff --git a/registry.go b/registry.go index 5f8e69fc2..b5ba5ca99 100644 --- a/registry.go +++ b/registry.go @@ -20,8 +20,21 @@ import ( "sync" ) -var sessionsLock sync.RWMutex -var sessions = make(map[SessionID]*session) +var defaultRegistry = NewRegistry() + +func SetDefaultRegistry(r *Registry) { + defaultRegistry = r +} + +func NewRegistry() *Registry { + return &Registry{sessions: make(map[SessionID]*session)} +} + +type Registry struct { + sessionsLock sync.RWMutex + sessions map[SessionID]*session +} + var errDuplicateSessionID = errors.New("Duplicate SessionID") var errUnknownSession = errors.New("Unknown session") @@ -31,7 +44,7 @@ type Messagable interface { } // Send determines the session to send Messagable using header fields BeginString, TargetCompID, SenderCompID. -func Send(m Messagable) (err error) { +func (r *Registry) Send(m Messagable) (err error) { msg := m.ToMessage() var beginString FIXString if err := msg.Header.GetField(tagBeginString, &beginString); err != nil { @@ -50,13 +63,13 @@ func Send(m Messagable) (err error) { sessionID := SessionID{BeginString: string(beginString), TargetCompID: string(targetCompID), SenderCompID: string(senderCompID)} - return SendToTarget(msg, sessionID) + return r.SendToTarget(msg, sessionID) } // SendToTarget sends a message based on the sessionID. Convenient for use in FromApp since it provides a session ID for incoming messages. -func SendToTarget(m Messagable, sessionID SessionID) error { +func (r *Registry) SendToTarget(m Messagable, sessionID SessionID) error { msg := m.ToMessage() - session, ok := lookupSession(sessionID) + session, ok := r.lookupSession(sessionID) if !ok { return errUnknownSession } @@ -65,8 +78,8 @@ func SendToTarget(m Messagable, sessionID SessionID) error { } // ResetSession resets session's sequence numbers. -func ResetSession(sessionID SessionID) error { - session, ok := lookupSession(sessionID) +func (r *Registry) ResetSession(sessionID SessionID) error { + session, ok := r.lookupSession(sessionID) if !ok { return errUnknownSession } @@ -81,12 +94,12 @@ func ResetSession(sessionID SessionID) error { } // UnregisterSession removes a session from the set of known sessions. -func UnregisterSession(sessionID SessionID) error { - sessionsLock.Lock() - defer sessionsLock.Unlock() +func (r *Registry) UnregisterSession(sessionID SessionID) error { + r.sessionsLock.Lock() + defer r.sessionsLock.Unlock() - if _, ok := sessions[sessionID]; ok { - delete(sessions, sessionID) + if _, ok := r.sessions[sessionID]; ok { + delete(r.sessions, sessionID) return nil } @@ -94,8 +107,8 @@ func UnregisterSession(sessionID SessionID) error { } // SetNextTargetMsgSeqNum set the next expected target message sequence number for the session matching the session id. -func SetNextTargetMsgSeqNum(sessionID SessionID, seqNum int) error { - session, ok := lookupSession(sessionID) +func (r *Registry) SetNextTargetMsgSeqNum(sessionID SessionID, seqNum int) error { + session, ok := r.lookupSession(sessionID) if !ok { return errUnknownSession } @@ -103,8 +116,8 @@ func SetNextTargetMsgSeqNum(sessionID SessionID, seqNum int) error { } // SetNextSenderMsgSeqNum sets the next outgoing message sequence number for the session matching the session id. -func SetNextSenderMsgSeqNum(sessionID SessionID, seqNum int) error { - session, ok := lookupSession(sessionID) +func (r *Registry) SetNextSenderMsgSeqNum(sessionID SessionID, seqNum int) error { + session, ok := r.lookupSession(sessionID) if !ok { return errUnknownSession } @@ -112,8 +125,8 @@ func SetNextSenderMsgSeqNum(sessionID SessionID, seqNum int) error { } // GetExpectedSenderNum retrieves the expected sender sequence number for the session matching the session id. -func GetExpectedSenderNum(sessionID SessionID) (int, error) { - session, ok := lookupSession(sessionID) +func (r *Registry) GetExpectedSenderNum(sessionID SessionID) (int, error) { + session, ok := r.lookupSession(sessionID) if !ok { return 0, errUnknownSession } @@ -121,8 +134,8 @@ func GetExpectedSenderNum(sessionID SessionID) (int, error) { } // GetExpectedTargetNum retrieves the next target sequence number for the session matching the session id. -func GetExpectedTargetNum(sessionID SessionID) (int, error) { - session, ok := lookupSession(sessionID) +func (r *Registry) GetExpectedTargetNum(sessionID SessionID) (int, error) { + session, ok := r.lookupSession(sessionID) if !ok { return 0, errUnknownSession } @@ -130,8 +143,8 @@ func GetExpectedTargetNum(sessionID SessionID) (int, error) { } // GetMessageStore returns the MessageStore interface for session matching the session id. -func GetMessageStore(sessionID SessionID) (MessageStore, error) { - session, ok := lookupSession(sessionID) +func (r *Registry) GetMessageStore(sessionID SessionID) (MessageStore, error) { + session, ok := r.lookupSession(sessionID) if !ok { return nil, errUnknownSession } @@ -139,30 +152,80 @@ func GetMessageStore(sessionID SessionID) (MessageStore, error) { } // GetLog returns the Log interface for session matching the session id. -func GetLog(sessionID SessionID) (Log, error) { - session, ok := lookupSession(sessionID) +func (r *Registry) GetLog(sessionID SessionID) (Log, error) { + session, ok := r.lookupSession(sessionID) if !ok { return nil, errUnknownSession } return session.log, nil } -func registerSession(s *session) error { - sessionsLock.Lock() - defer sessionsLock.Unlock() +func (r *Registry) registerSession(s *session) error { + r.sessionsLock.Lock() + defer r.sessionsLock.Unlock() - if _, ok := sessions[s.sessionID]; ok { + if _, ok := r.sessions[s.sessionID]; ok { return errDuplicateSessionID } - sessions[s.sessionID] = s + r.sessions[s.sessionID] = s return nil } -func lookupSession(sessionID SessionID) (s *session, ok bool) { - sessionsLock.RLock() - defer sessionsLock.RUnlock() +func (r *Registry) lookupSession(sessionID SessionID) (s *session, ok bool) { + r.sessionsLock.RLock() + defer r.sessionsLock.RUnlock() - s, ok = sessions[sessionID] + s, ok = r.sessions[sessionID] return } + +// Send determines the session to send Messagable using header fields BeginString, TargetCompID, SenderCompID. +func Send(m Messagable) (err error) { + return defaultRegistry.Send(m) +} + +// SendToTarget sends a message based on the sessionID. Convenient for use in FromApp since it provides a session ID for incoming messages. +func SendToTarget(m Messagable, sessionID SessionID) error { + return defaultRegistry.SendToTarget(m, sessionID) +} + +// ResetSession resets session's sequence numbers. +func ResetSession(sessionID SessionID) error { + return defaultRegistry.ResetSession(sessionID) +} + +// UnregisterSession removes a session from the set of known sessions. +func UnregisterSession(sessionID SessionID) error { + return defaultRegistry.UnregisterSession(sessionID) +} + +// SetNextTargetMsgSeqNum set the next expected target message sequence number for the session matching the session id. +func SetNextTargetMsgSeqNum(sessionID SessionID, seqNum int) error { + return defaultRegistry.SetNextTargetMsgSeqNum(sessionID, seqNum) +} + +// SetNextSenderMsgSeqNum sets the next outgoing message sequence number for the session matching the session id. +func SetNextSenderMsgSeqNum(sessionID SessionID, seqNum int) error { + return defaultRegistry.SetNextSenderMsgSeqNum(sessionID, seqNum) +} + +// GetExpectedSenderNum retrieves the expected sender sequence number for the session matching the session id. +func GetExpectedSenderNum(sessionID SessionID) (int, error) { + return defaultRegistry.GetExpectedSenderNum(sessionID) +} + +// GetExpectedTargetNum retrieves the next target sequence number for the session matching the session id. +func GetExpectedTargetNum(sessionID SessionID) (int, error) { + return defaultRegistry.GetExpectedTargetNum(sessionID) +} + +// GetMessageStore returns the MessageStore interface for session matching the session id. +func GetMessageStore(sessionID SessionID) (MessageStore, error) { + return defaultRegistry.GetMessageStore(sessionID) +} + +// GetLog returns the Log interface for session matching the session id. +func GetLog(sessionID SessionID) (Log, error) { + return defaultRegistry.GetLog(sessionID) +} diff --git a/service_test.go b/service_test.go new file mode 100644 index 000000000..a8fdf87af --- /dev/null +++ b/service_test.go @@ -0,0 +1,174 @@ +package quickfix_test + +import ( + "strconv" + "sync/atomic" + "testing" + "time" + + "github.com/phayes/freeport" + "github.com/quickfixgo/enum" + "github.com/quickfixgo/field" + report "github.com/quickfixgo/fix44/tradecapturereport" + ack "github.com/quickfixgo/fix44/tradecapturereportack" + "github.com/quickfixgo/quickfix" + "github.com/quickfixgo/tag" + "github.com/shopspring/decimal" + "github.com/stretchr/testify/require" +) + +type Client struct { + *quickfix.Initiator + quickfix.EmptyApplication + isLoggedIn atomic.Int32 + t *testing.T + *quickfix.MessageRouter + reports atomic.Int32 + sessionID quickfix.SessionID +} + +func (c *Client) OnLogon(_ quickfix.SessionID) { + c.isLoggedIn.Add(1) +} +func (c *Client) OnLogoff(_ quickfix.SessionID) { + c.isLoggedIn.Add(-1) +} + +func (c *Client) FromApp(msg *quickfix.Message, id quickfix.SessionID) quickfix.MessageRejectError { + err := c.Route(msg, id) + require.NoError(c.t, err) + return nil +} + +var _ quickfix.Application = &Client{} + +type Server struct { + *quickfix.Acceptor + quickfix.EmptyApplication + t *testing.T + isLoggedIn atomic.Int32 + *quickfix.MessageRouter + acks atomic.Int32 + sessionID quickfix.SessionID +} + +func (s *Server) OnLogon(_ quickfix.SessionID) { + s.isLoggedIn.Add(1) +} +func (s *Server) OnLogoff(_ quickfix.SessionID) { + s.isLoggedIn.Add(-1) +} +func (s *Server) FromApp(msg *quickfix.Message, id quickfix.SessionID) quickfix.MessageRejectError { + err := s.Route(msg, id) + require.NoError(s.t, err) + return nil +} + +var _ quickfix.Application = &Server{} + +func TestBackToBackTest(t *testing.T) { + port, err := freeport.GetFreePort() + require.NoError(t, err) + //This is to catch any default registry that might be used by getting a segv + quickfix.SetDefaultRegistry(nil) + s := NewServer(t, port).Start() + c := NewClient(t, port).Start() + t.Cleanup(c.Stop) + t.Cleanup(s.Stop) + + require.Eventually(t, func() bool { return c.isLoggedIn.Load() > 0 }, 10*time.Second, 100*time.Millisecond) + + m := report.New( + field.NewTradeReportID(strconv.Itoa(1)), + field.NewPreviouslyReported(false), + field.NewLastQty(decimal.New(1, 1), 2), + field.NewLastPx(decimal.New(1, 1), 2), + field.NewTradeDate(time.Now().Format("20060102")), + field.NewTransactTime(time.Now()), + ) + m.SetString(tag.Account, "account") + m.SetExecID("execid") + m.SetSymbol("symbol") + m.SetSendingTime(time.Now()) + m.SetOrderQty(decimal.New(1, 1), 2) + + err = s.SendToTarget(m, s.sessionID) + require.NoError(t, err) + + require.Eventually(t, func() bool { return s.acks.Load() > 0 }, 10*time.Second, 100*time.Millisecond) + require.Equal(t, int32(1), s.acks.Load()) + require.Equal(t, int32(1), c.isLoggedIn.Load()) + require.Equal(t, int32(1), s.isLoggedIn.Load()) + require.Equal(t, int32(1), c.reports.Load()) + +} + +func NewServer(t *testing.T, port int) *Server { + var err error + s := &Server{t: t} + settings := quickfix.NewSettings() + settings.GlobalSettings().Set("HeartBtInt", "1") + session := quickfix.NewSessionSettings() + session.Set("BeginString", "FIX.4.4") + session.Set("SocketAcceptPort", strconv.Itoa(port)) + session.Set("TargetCompID", "sender") + session.Set("SenderCompID", "target") + s.sessionID, err = settings.AddSession(session) + require.NoError(t, err) + logger := quickfix.NewSlogLogger() + logger.Name = "server" + registry := quickfix.NewRegistry() + + s.Acceptor, err = quickfix.NewAcceptor(s, quickfix.NewMemoryStoreFactory(), settings, logger, quickfix.WithAcceptorRegistry(registry)) + s.MessageRouter = quickfix.NewMessageRouter(quickfix.WithMessageRouterRegistry(registry)) + require.NoError(t, err) + s.AddRoute(ack.Route(s.OnAck)) + return s +} +func (s *Server) Start() *Server { + s.t.Logf("Starting server") + require.NoError(s.t, s.Acceptor.Start()) + return s +} + +func (s *Server) OnAck(_ ack.TradeCaptureReportAck, _ quickfix.SessionID) quickfix.MessageRejectError { + s.acks.Add(1) + return nil +} + +func NewClient(t *testing.T, port int) *Client { + var err error + c := &Client{t: t} + settings := quickfix.NewSettings() + session := quickfix.NewSessionSettings() + session.Set("BeginString", "FIX.4.4") + session.Set("SocketConnectPort", strconv.Itoa(port)) + session.Set("SocketConnectHost", "localhost") + session.Set("TargetCompID", "target") + session.Set("SenderCompID", "sender") + session.Set("HeartBtInt", "1") + c.sessionID, err = settings.AddSession(session) + require.NoError(t, err) + + logger := quickfix.NewSlogLogger() + logger.Name = "client" + registry := quickfix.NewRegistry() + c.Initiator, err = quickfix.NewInitiator(c, quickfix.NewMemoryStoreFactory(), settings, logger, quickfix.WithInitiatorRegistry(registry)) + require.NoError(t, err) + c.MessageRouter = quickfix.NewMessageRouter(quickfix.WithMessageRouterRegistry(registry)) + c.AddRoute(report.Route(c.OnReport)) + + return c +} +func (c *Client) Start() *Client { + c.t.Logf("Starting client") + require.NoError(c.t, c.Initiator.Start()) + return c +} + +func (c *Client) OnReport(_ report.TradeCaptureReport, id quickfix.SessionID) quickfix.MessageRejectError { + err := c.SendToTarget(ack.New(field.NewTradeReportID("1"), field.NewExecType(enum.ExecType_NEW)), id) + require.NoError(c.t, err) + c.reports.Add(1) + return nil +} diff --git a/session_factory.go b/session_factory.go index effe6ed35..02d5b9508 100644 --- a/session_factory.go +++ b/session_factory.go @@ -60,6 +60,7 @@ var applVerIDLookup = map[string]string{ type sessionFactory struct { // True if building sessions that initiate logon. BuildInitiators bool + *Registry } // Creates Session, associates with internal session registry. @@ -72,7 +73,7 @@ func (f sessionFactory) createSession( return } - if err = registerSession(session); err != nil { + if err = f.registerSession(session); err != nil { return } application.OnCreate(session.sessionID) diff --git a/session_factory_test.go b/session_factory_test.go index 349e7580c..35caaf4f2 100644 --- a/session_factory_test.go +++ b/session_factory_test.go @@ -484,7 +484,7 @@ func (s *SessionFactorySuite) TestDuplicateSession() { _, err = s.createSession(s.SessionID, s.MessageStoreFactory, s.SessionSettings, s.LogFactory, s.App) s.NotNil(err) s.Equal("Duplicate SessionID", err.Error()) - UnregisterSession(s.SessionID) + s.UnregisterSession(s.SessionID) _, err = s.createSession(s.SessionID, s.MessageStoreFactory, s.SessionSettings, s.LogFactory, s.App) s.Nil(err) } diff --git a/slog_log.go b/slog_log.go new file mode 100644 index 000000000..1c32db749 --- /dev/null +++ b/slog_log.go @@ -0,0 +1,82 @@ +package quickfix + +import ( + "fmt" + "log/slog" + "strings" +) + +type SlogLog struct { + Name string + *slog.Logger +} + +var _ LogFactory = &SlogLog{} +var _ Log = &SlogLog{} + +func (l *SlogLog) OnIncoming(s []byte) { + l.logMessage("<-"+l.Name, s) +} + +func (l *SlogLog) OnOutgoing(s []byte) { + l.logMessage("->"+l.Name, s) +} + +func (l *SlogLog) OnEvent(s string) { + slog.Info("->"+l.Name+":event", "event", s) +} + +func (l *SlogLog) OnEventf(format string, a ...interface{}) { + l.OnEvent(fmt.Sprintf(format, a...)) +} + +// NewLogger creates an instance of LogFactory that writes messages and events to stdout. +func NewSlogLogger() *SlogLog { + return &SlogLog{Logger: slog.Default()} +} + +func (l *SlogLog) Create() (Log, error) { + return l, nil +} + +func (l *SlogLog) CreateSessionLog(sessionID SessionID) (Log, error) { + return &SlogLog{Name: l.Name, Logger: l.Logger.With("session", sessionID.String())}, nil +} + +func (l *SlogLog) logMessage(msg string, data []byte) { + slog.Info(msg, "message", ToValues(data)) +} + +func ToValues(s []byte) slog.Value { + fields := AsFields(s) + var attrs []slog.Attr + for _, f := range fields { + attrs = append(attrs, slog.String(f.Tag, f.Value)) + } + return slog.GroupValue(attrs...) +} + +const sohCode = "\001" + +func SplitParts(msg []byte) []string { + return strings.Split(string(msg), sohCode) +} +func ToFields(splitMsg []string) []LField { + var fields []LField + for _, part := range splitMsg { + parts := strings.Split(part, "=") + if len(parts) == 2 { + fields = append(fields, LField{Tag: parts[0], Value: parts[1]}) + } + } + return fields +} + +type LField struct { + Tag string + Value string +} + +func AsFields(s []byte) []LField { + return ToFields(SplitParts(s)) +}