Skip to content

Commit

Permalink
addd pubsub
Browse files Browse the repository at this point in the history
  • Loading branch information
ehsan6sha committed May 1, 2024
1 parent f374c21 commit 374f5c7
Showing 6 changed files with 159 additions and 10 deletions.
2 changes: 1 addition & 1 deletion exchange/fx_exchange.go
Original file line number Diff line number Diff line change
@@ -116,7 +116,7 @@ func NewFxExchange(h host.Host, ls ipld.LinkSystem, o ...Option) (*FxExchange, e
return nil, err
}
tr := &http.Transport{
DisableKeepAlives: true, // Ensure connections are not reused
DisableKeepAlives: false, // Ensure connections are not reused
MaxIdleConns: 500,
MaxConnsPerHost: 2000,
IdleConnTimeout: 20 * time.Second,
5 changes: 5 additions & 0 deletions exchange/noop_exchange.go
Original file line number Diff line number Diff line change
@@ -41,6 +41,11 @@ func (n NoopExchange) Shutdown(context.Context) error {
return nil
}

func (n NoopExchange) ShutdownIpfs(context.Context) error {
log.Debug("Shut down ipfs noop exchange.")
return nil
}

func (n NoopExchange) IpniNotifyLink(l ipld.Link) {
log.Debugw("IpniNotifyLink noop exchange.", "link", l)
}
119 changes: 119 additions & 0 deletions mobile/blockchain.go
Original file line number Diff line number Diff line change
@@ -7,6 +7,7 @@ import (
"math/big"
"strconv"
"strings"
"time"

"github.com/functionland/go-fula/blockchain"
wifi "github.com/functionland/go-fula/wap/pkg/wifi"
@@ -181,6 +182,124 @@ func (c *Client) ReplicateInPool(cidsBytes []byte, account string, poolID int) [
/////////////////////HARDWARE/////////////////////
//////////////////////////////////////////////////

// PreparePubSub initializes or reinitializes the pubsub system
func (c *Client) PreparePubSub(ctx context.Context) error {
topicName := "fula-global-channel"
// Check if already subscribed and topic is active
if c.sub == nil || c.topic == nil {
var err error
// Join the topic if not already joined or if topic was closed
if c.topic == nil {
c.topic, err = c.ps.Join(topicName)
if err != nil {
return fmt.Errorf("failed to join topic: %v", err)
}
}

// Subscribe to the topic if not already subscribed
c.sub, err = c.topic.Subscribe()
if err != nil {
return fmt.Errorf("failed to subscribe to topic: %v", err)
}
}
return nil
}

// ShutdownPubSub cleanly closes the pubsub topic and subscription
func (c *Client) ShutdownPubSub() error {
if c.sub != nil {
c.sub.Cancel()
c.sub = nil
}
if c.topic != nil {
if err := c.topic.Close(); err != nil {
return fmt.Errorf("failed to close topic: %v", err)
}
c.topic = nil
}
return nil
}

// This function should be implemented to listen to the responses and match the correct response to the request
func (c *Client) waitForResponse(ctx context.Context, responseType string) ([]byte, error) {
// This channel might be part of the client struct or managed globally depending on your architecture
responseChan := make(chan []byte, 1)
go func() {
for {
select {
case <-ctx.Done():
return
default:
msg, err := c.sub.Next(ctx)
if err != nil {
continue // Handle error or break as needed
}
if msg.ReceivedFrom != c.bloxPid {
continue
}
var response struct {
Type string `json:"type"`
Data []byte `json:"data"`
PeerID string `json:"peerID"`
}
if err := json.Unmarshal(msg.Data, &response); err != nil {
continue // Log or handle the error
}
if response.Type == responseType && response.PeerID == c.h.ID().String() {
responseChan <- response.Data
return
}
}
}
}()

select {
case data := <-responseChan:
return data, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}

// BloxFreeSpace requests the blox avail/used free space information.
func (c *Client) BloxFreeSpaceIpfs() ([]byte, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// Ensure that the pubsub system is ready
if err := c.PreparePubSub(ctx); err != nil {
return nil, fmt.Errorf("failed to prepare pubsub: %w", err)
}

// Create a message to request free space, targeted at the blox peer
request := struct {
TargetPeerID string `json:"targetPeerID"`
Command string `json:"command"`
}{
TargetPeerID: c.bloxPid.String(), // Assuming c.bloxPid is the peer ID of the blox
Command: "requestFreeSpace",
}

requestData, err := json.Marshal(request)
if err != nil {
return nil, fmt.Errorf("failed to marshal request data: %w", err)
}

// Publish the request
if err := c.topic.Publish(ctx, requestData); err != nil {
return nil, fmt.Errorf("failed to publish request: %w", err)
}

// Listen for a response
// This part assumes your ListenForMessages handles responses and can return them
response, err := c.waitForResponse(ctx, "freeSpaceResponse")
if err != nil {
return nil, err
}

return response, nil
}

// BloxFreeSpace requests the blox avail/used free space information.
func (c *Client) BloxFreeSpace() ([]byte, error) {
ctx := context.TODO()
18 changes: 15 additions & 3 deletions mobile/client.go
Original file line number Diff line number Diff line change
@@ -38,6 +38,7 @@ import (
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
"github.com/libp2p/go-libp2p"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
@@ -70,6 +71,9 @@ type Client struct {
relays []string
ipfsAPI iface.CoreAPI
ipfsNode *core.IpfsNode
ps *pubsub.PubSub
topic *pubsub.Topic
sub *pubsub.Subscription
}

type DatastoreConfigSpec struct {
@@ -92,9 +96,9 @@ type Child struct {
Compression string `json:"compression,omitempty"`
}

func CustomHostOption(opts []libp2p.Option) kubolibp2p.HostOption {
func CustomHostOption(h host.Host) kubolibp2p.HostOption {
return func(id peer.ID, ps peerstore.Peerstore, options ...libp2p.Option) (host.Host, error) {
return libp2p.New(opts...)
return h, nil
}
}

@@ -242,9 +246,11 @@ func CreateCustomRepo(ctx context.Context, cfg *Config, basePath string, h host.
"/dnsaddr/bootstrap.libp2p.io/p2p/QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt",
"/ip4/104.131.131.82/tcp/4001/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ",
"/ip4/104.131.131.82/udp/4001/quic-v1/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ",
"/dns4/1.pools.functionyard.fula.network/tcp/9096/p2p/12D3KooWS79EhkPU7ESUwgG4vyHHzW9FDNZLoWVth9b5N5NSrvaj",
}
conf.Swarm.RelayService.Enabled = 1
conf.Discovery.MDNS.Enabled = true
conf.Pubsub.Enabled = 1

// Initialize the repo with the configuration

@@ -301,11 +307,14 @@ func (c *Client) ConnectToBlox() error {
}

func (c *Client) ConnectToBloxIpfs() error {
ctx := context.TODO()
if _, ok := c.ex.(exchange.NoopExchange); ok {
return nil
}

return c.ipfsAPI.Swarm().Connect(context.TODO(), c.h.Peerstore().PeerInfo(c.bloxPid))
err := c.ipfsAPI.Swarm().Connect(ctx, c.h.Peerstore().PeerInfo(c.bloxPid))
return err

}

// ID returns the libp2p peer ID of the client.
@@ -636,13 +645,16 @@ func (c *Client) ShutdownIpfs() error {
hErr := c.h.Close()
fErr := c.Flush()
dsErr := c.ds.Close()
psErr := c.ShutdownPubSub()
switch {
case hErr != nil:
return hErr
case fErr != nil:
return fErr
case dsErr != nil:
return dsErr
case psErr != nil:
return psErr
default:
return xErr
}
13 changes: 12 additions & 1 deletion mobile/config.go
Original file line number Diff line number Diff line change
@@ -28,6 +28,8 @@ import (
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/libp2p/go-libp2p"
dht "github.com/libp2p/go-libp2p-kad-dht"

pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
@@ -361,7 +363,7 @@ func (cfg *Config) initIpfs(ctx context.Context, mc *Client) error {
ipfsConfig := &core.BuildCfg{
Online: true,
Permanent: false,
Host: CustomHostOption(hopts),
Host: CustomHostOption(mc.h),
Routing: kubolibp2p.DHTOption,
Repo: repo,
}
@@ -373,10 +375,12 @@ func (cfg *Config) initIpfs(ctx context.Context, mc *Client) error {
log.Print("mc ipfsNode created")
// ipfsHostId := ipfsNode.PeerHost.ID()
// ipfsId := ipfsNode.Identity.String()

ipfsAPI, err := coreapi.NewCoreAPI(ipfsNode)
if err != nil {
panic(fmt.Errorf("failed to create IPFS API: %w", err))
}

mc.ipfsNode = ipfsNode
mc.ipfsAPI = ipfsAPI
log.Print("mc ipfsAPI created")
@@ -439,6 +443,13 @@ func (cfg *Config) initIpfs(ctx context.Context, mc *Client) error {
}
}
}

ps, err := pubsub.NewGossipSub(context.Background(), mc.h)
if err != nil {
return fmt.Errorf("failed to create pubsub: %w", err)
}
mc.ps = ps

ctx3, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel() // Ensure the context cancel function is called to free resources

12 changes: 7 additions & 5 deletions mobile/example_test.go
Original file line number Diff line number Diff line change
@@ -304,9 +304,9 @@ func Example_poolExchangeDagBetweenClientBlox() {
mcfg.StorePath = "C:/Users/ehsan/.tmp/datastore"
mcfg.ConfigPath = "C:/Users/ehsan/.tmp"
mcfg.AllowTransientConnection = true
bloxAddrString := "/ip4/70.34.208.109/udp/4001/quic-v1/p2p/12D3KooWQto3ReEkHtMNByVsSnMUWxRZyxk1Ni4yfMuizmYNmcJ9"
bloxAddrString := "/ip4/192.168.1.4/udp/4001/quic-v1/webtransport/certhash/uEiAkTJtNLmP1bkTg8_yWc7mHAM1i_zZ8RcVfxakkhkKjRg/certhash/uEiAyxCIPgYELZLjtX4JsWS7SlYV8XK78N9QqXhLBcAg0QQ"
bloxId := "12D3KooWDaT8gS2zGMLGBKmW1mKhQSHxYeEX3Fr3VSjuPzmjyfZC"
mcfg.BloxAddr = bloxAddrString + "/p2p-circuit/p2p/" + bloxId
mcfg.BloxAddr = bloxAddrString + "/p2p/" + bloxId
mcfg.PoolName = "1"
mcfg.Exchange = bloxAddrString
mcfg.BlockchainEndpoint = "127.0.0.1:4004"
@@ -342,14 +342,16 @@ func Example_poolExchangeDagBetweenClientBlox() {
panic(err)
}
fmt.Println("connected to blox")
/*_, err = cIpfs1.BloxFreeSpace()

_, err = cIpfs1.BloxFreeSpaceIpfs()
if err != nil {
log.Error("An Error occurred with panic")
panic(err)
}*/
}

// Output:
//Private Key Bytes: 08011240eefe2dafe94055ecd466687390f8ba331cce1f3b65432bd4a6fc2b8a304f0deae19005ae027ef4fdcd1eb327f8b1c79a365a90cbf6f07e7022ba5d3564eee1ec
// first client created with ID: 12D3KooWQzsGtYKX62PFvTeh67H7jdU7QqcMJgwiEzKFJCbqrKw112D3KooWQzsGtYKX62PFvTeh67H7jdU7QqcMJgwiEzKFJCbqrKw1
// first client created with ID: 12D3KooWQzsGtYKX62PFvTeh67H7jdU7QqcMJgwiEzKFJCbqrKw1
// connected to blox
// Instantiated node in pool 1 with ID: 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM
// Original Val is: some raw data

0 comments on commit 374f5c7

Please sign in to comment.