Skip to content

Commit

Permalink
Merge pull request #101 from BoostryJP/p2p-message-atom
Browse files Browse the repository at this point in the history
p2p: Use atomic types
  • Loading branch information
YoshihitoAso authored Feb 18, 2025
2 parents d896ee1 + 4125add commit e883d29
Show file tree
Hide file tree
Showing 10 changed files with 20 additions and 19 deletions.
2 changes: 1 addition & 1 deletion p2p/discover/v4_udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ func (t *UDPv4) RequestENR(n *enode.Node) (*enode.Node, error) {
return nil, err
}
if respN.ID() != n.ID() {
return nil, fmt.Errorf("invalid ID in response record")
return nil, errors.New("invalid ID in response record")
}
if respN.Seq() < n.Seq() {
return n, nil // response record is older
Expand Down
2 changes: 1 addition & 1 deletion p2p/discover/v5_udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ func (t *UDPv5) verifyResponseNode(c *callV5, r *enr.Record, distances []uint, s
}
}
if _, ok := seen[node.ID()]; ok {
return nil, fmt.Errorf("duplicate record")
return nil, errors.New("duplicate record")
}
seen[node.ID()] = struct{}{}
return node, nil
Expand Down
4 changes: 2 additions & 2 deletions p2p/discover/v5wire/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,11 +346,11 @@ func (c *Codec) makeHandshakeAuth(toID enode.ID, addr string, challenge *Whoarey
// key is part of the ID nonce signature.
var remotePubkey = new(ecdsa.PublicKey)
if err := challenge.Node.Load((*enode.Secp256k1)(remotePubkey)); err != nil {
return nil, nil, fmt.Errorf("can't find secp256k1 key for recipient")
return nil, nil, errors.New("can't find secp256k1 key for recipient")
}
ephkey, err := c.sc.ephemeralKeyGen()
if err != nil {
return nil, nil, fmt.Errorf("can't generate ephemeral key")
return nil, nil, errors.New("can't generate ephemeral key")
}
ephpubkey := EncodePubkey(&ephkey.PublicKey)
auth.pubkey = ephpubkey[:]
Expand Down
3 changes: 2 additions & 1 deletion p2p/dnsdisc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package dnsdisc
import (
"bytes"
"context"
"errors"
"fmt"
"math/rand"
"net"
Expand Down Expand Up @@ -194,7 +195,7 @@ func (c *Client) resolveEntry(ctx context.Context, domain, hash string) (entry,
func (c *Client) doResolveEntry(ctx context.Context, domain, hash string) (entry, error) {
wantHash, err := b32format.DecodeString(hash)
if err != nil {
return nil, fmt.Errorf("invalid base32 hash")
return nil, errors.New("invalid base32 hash")
}
name := hash + "." + domain
txts, err := c.cfg.Resolver.LookupTXT(ctx, hash+"."+domain)
Expand Down
4 changes: 2 additions & 2 deletions p2p/enode/idscheme.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package enode

import (
"crypto/ecdsa"
"fmt"
"errors"
"io"

"github.com/ethereum/go-ethereum/common/math"
Expand Down Expand Up @@ -66,7 +66,7 @@ func (V4ID) Verify(r *enr.Record, sig []byte) error {
if err := r.Load(&entry); err != nil {
return err
} else if len(entry) != 33 {
return fmt.Errorf("invalid public key")
return errors.New("invalid public key")
}

h := sha3.NewLegacyKeccak256()
Expand Down
11 changes: 5 additions & 6 deletions p2p/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func MsgPipe() (*MsgPipeRW, *MsgPipeRW) {
var (
c1, c2 = make(chan Msg), make(chan Msg)
closing = make(chan struct{})
closed = new(int32)
closed = new(atomic.Bool)
rw1 = &MsgPipeRW{c1, c2, closing, closed}
rw2 = &MsgPipeRW{c2, c1, closing, closed}
)
Expand All @@ -178,13 +178,13 @@ type MsgPipeRW struct {
w chan<- Msg
r <-chan Msg
closing chan struct{}
closed *int32
closed *atomic.Bool
}

// WriteMsg sends a message on the pipe.
// It blocks until the receiver has consumed the message payload.
func (p *MsgPipeRW) WriteMsg(msg Msg) error {
if atomic.LoadInt32(p.closed) == 0 {
if !p.closed.Load() {
consumed := make(chan struct{}, 1)
msg.Payload = &eofSignal{msg.Payload, msg.Size, consumed}
select {
Expand All @@ -205,7 +205,7 @@ func (p *MsgPipeRW) WriteMsg(msg Msg) error {

// ReadMsg returns a message sent on the other end of the pipe.
func (p *MsgPipeRW) ReadMsg() (Msg, error) {
if atomic.LoadInt32(p.closed) == 0 {
if !p.closed.Load() {
select {
case msg := <-p.r:
return msg, nil
Expand All @@ -219,9 +219,8 @@ func (p *MsgPipeRW) ReadMsg() (Msg, error) {
// of the pipe. They will return ErrPipeClosed. Close also
// interrupts any reads from a message payload.
func (p *MsgPipeRW) Close() error {
if atomic.AddInt32(p.closed, 1) != 1 {
if p.closed.Swap(true) {
// someone else is already closing
atomic.StoreInt32(p.closed, 1) // avoid overflow
return nil
}
close(p.closing)
Expand Down
3 changes: 2 additions & 1 deletion p2p/nat/natpmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package nat

import (
"errors"
"fmt"
"net"
"strings"
Expand Down Expand Up @@ -46,7 +47,7 @@ func (n *pmp) ExternalIP() (net.IP, error) {

func (n *pmp) AddMapping(protocol string, extport, intport int, name string, lifetime time.Duration) error {
if lifetime <= 0 {
return fmt.Errorf("lifetime must not be <= 0")
return errors.New("lifetime must not be <= 0")
}
// Note order of port arguments is switched between our
// AddMapping and the client's AddPortMapping.
Expand Down
5 changes: 2 additions & 3 deletions p2p/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"crypto/ecdsa"
"encoding/hex"
"errors"
"fmt"
"net"
"sort"
"sync"
Expand Down Expand Up @@ -913,13 +912,13 @@ func (srv *Server) checkInboundConn(remoteIP net.IP) error {
}
// Reject connections that do not match NetRestrict.
if srv.NetRestrict != nil && !srv.NetRestrict.Contains(remoteIP) {
return fmt.Errorf("not whitelisted in NetRestrict")
return errors.New("not whitelisted in NetRestrict")
}
// Reject Internet peers that try too often.
now := srv.clock.Now()
srv.inboundHistory.expire(now, nil)
if !netutil.IsLAN(remoteIP) && srv.inboundHistory.contains(remoteIP.String()) {
return fmt.Errorf("too many attempts")
return errors.New("too many attempts")
}
srv.inboundHistory.add(remoteIP.String(), now.Add(inboundThrottleTime))
return nil
Expand Down
2 changes: 1 addition & 1 deletion p2p/simulations/adapters/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ func startExecNodeStack() (*node.Node, error) {
// decode the config
confEnv := os.Getenv(envNodeConfig)
if confEnv == "" {
return nil, fmt.Errorf("missing " + envNodeConfig)
return nil, fmt.Errorf("missing %s", envNodeConfig)
}
var conf execNodeConfig
if err := json.Unmarshal([]byte(confEnv), &conf); err != nil {
Expand Down
3 changes: 2 additions & 1 deletion p2p/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package p2p
import (
"bytes"
"crypto/ecdsa"
"errors"
"fmt"
"io"
"net"
Expand Down Expand Up @@ -152,7 +153,7 @@ func readProtocolHandshake(rw MsgReader) (*protoHandshake, error) {
return nil, err
}
if msg.Size > baseProtocolMaxMsgSize {
return nil, fmt.Errorf("message too big")
return nil, errors.New("message too big")
}
if msg.Code == discMsg {
// Disconnect before protocol handshake is valid according to the
Expand Down

0 comments on commit e883d29

Please sign in to comment.