Skip to content

Commit

Permalink
support image(video) message and laserscan, phase 1 (#4)
Browse files Browse the repository at this point in the history
* Start refactoring sender to support image(video) message and laserscan

* Finish receiver modification

* I don't know why that exists

* start dispatch goroutine in ros_channel

* Add serialization and deserialization of sensor msg and send them
through data channel

* Add consts and update config

* fix test fail

* debug ci

* debug ci, this should fail

* debug ci

* debug ci

* fix: go list fails in shallow clone

* debug ci, this should pass
  • Loading branch information
3DRX authored Dec 7, 2024
1 parent 7d11f06 commit 9656ba5
Show file tree
Hide file tree
Showing 11 changed files with 187 additions and 102 deletions.
6 changes: 0 additions & 6 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,6 @@ jobs:
restore-keys: |
${{ runner.os }}-go-
- name: Source ROS
run: |
- name: Install Go dependencies
run: go mod download

- name: Run tests
shell: bash
run: |
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ webrtc-ros-bridge-client: receiver/peer_connection_channel/libvp8decoder.so rclg
receiver/peer_connection_channel/libvp8decoder.so: receiver/peer_connection_channel/vp8_decoder.c receiver/peer_connection_channel/vp8_decoder.h
cd receiver/peer_connection_channel && gcc -shared -o libvp8decoder.so -fPIC vp8_decoder.c $(pkg-config --cflags --libs vpx) $(CFLAGS) $(LDFLAGS)

receiver/ros_channel/msgs cgo-flags.env:
rclgo_gen cgo-flags.env:
go run github.com/tiiuae/rclgo/cmd/rclgo-gen generate -d rclgo_gen

test: rclgo_gen cgo-flags.env
CGO_CFLAGS=$(CGO_CFLAGS) CGO_LDFLAGS=$(CGO_LDFLAGS) go test `go list ./... | grep -v "/rclgo_gen"`
CGO_CFLAGS=$(CGO_CFLAGS) CGO_LDFLAGS=$(CGO_LDFLAGS) go test `go list -buildvcs=false ./... | grep -v "/rclgo_gen"`

clean:
rm -rf wrb peer_connection_channel/libvp8decoder.so ros_channel/msgs cgo-flags.env rclgo_gen
Expand Down
29 changes: 11 additions & 18 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"os"
"regexp"
"strings"

"github.com/3DRX/webrtc-ros-bridge/consts"
)

type TopicConfig struct {
Expand All @@ -33,7 +35,7 @@ func isTopicNameValid(topic_name *string) bool {
return re.MatchString(*topic_name)
}

func isValidIp(addr *string) bool {
func isValidAddr(addr *string) bool {
// Try to separate hostname and port
host, _, err := net.SplitHostPort(*addr)
if err != nil {
Expand Down Expand Up @@ -103,30 +105,21 @@ func isValidHostname(host string) bool {
}

func checkCfg(c *Config) error {
if c == nil {
return fmt.Errorf("get a null pointer!")
}
res := true
// Mode
res = res && (c.Mode == "sender" || c.Mode == "receiver")
if res == false {
if !(c.Mode == "sender" || c.Mode == "receiver") {
return fmt.Errorf("wrong Mode syntax, expected \"sender\" or \"receiver\", but find \"" + c.Mode + "\"")
}
// ip addr
res = res && isValidIp(&c.Addr)
if res == false {
if !isValidAddr(&c.Addr) {
return fmt.Errorf("invalid ipv4 addr \"" + c.Addr + "\"")
}
// Topic
for _, topic := range c.Topics {
res = res && isTopicNameValid(&(topic.NameIn))
res = res && isTopicNameValid(&(topic.NameOut))
if res == false {
if !isTopicNameValid(&topic.NameIn) || !isTopicNameValid(&topic.NameOut) {
return fmt.Errorf("wrong topic name format: \"" + topic.NameIn + "\" or \"" + topic.NameOut + "\"")
}
res = res && (topic.Type == "sensor_msgs/msg/Image")
if !res {
return fmt.Errorf("wrong topic msg type, expected \"sensor_msgs/msg/Image\", but find \"" + topic.Type + "\"")
switch topic.Type {
case consts.MSG_IMAGE, consts.MSG_LASER_SCAN:
// check passed
default:
return fmt.Errorf("unsupported topic type: \"" + topic.Type + "\"")
}
}
return nil
Expand Down
6 changes: 6 additions & 0 deletions consts/consts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package consts

const (
MSG_IMAGE = "sensor_msgs/msg/Image"
MSG_LASER_SCAN = "sensor_msgs/msg/LaserScan"
)
38 changes: 12 additions & 26 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,22 @@ package main

import (
"github.com/3DRX/webrtc-ros-bridge/config"
sensor_msgs_msg "github.com/3DRX/webrtc-ros-bridge/rclgo_gen/sensor_msgs/msg"
recv_peerconnectionchannel "github.com/3DRX/webrtc-ros-bridge/receiver/peer_connection_channel"
recv_roschannel "github.com/3DRX/webrtc-ros-bridge/receiver/ros_channel"
recv_signalingchannel "github.com/3DRX/webrtc-ros-bridge/receiver/signaling_channel"
send_peerconnectionchannel "github.com/3DRX/webrtc-ros-bridge/sender/peer_connection_channel"
send_roschannel "github.com/3DRX/webrtc-ros-bridge/sender/ros_channel"
send_signalingchannel "github.com/3DRX/webrtc-ros-bridge/sender/signaling_channel"
"github.com/pion/webrtc/v3"
"github.com/tiiuae/rclgo/pkg/rclgo/types"
)

func videoReceiver(cfg *config.Config, topicIdx int) {
func receiver(cfg *config.Config) {
messageChan := make(chan types.Message)
sdpChan := make(chan webrtc.SessionDescription)
sdpReplyChan := make(chan webrtc.SessionDescription)
candidateChan := make(chan webrtc.ICECandidateInit)
imgChan := make(chan *sensor_msgs_msg.Image)
topicIdx := 0 // TODO: refactor similar to sender
sc := recv_signalingchannel.InitSignalingChannel(
cfg,
topicIdx,
Expand All @@ -29,28 +30,27 @@ func videoReceiver(cfg *config.Config, topicIdx int) {
sdpReplyChan,
candidateChan,
sc.SignalCandidate,
imgChan,
messageChan,
)
rc := recv_roschannel.InitROSChannel(
cfg,
topicIdx,
imgChan,
messageChan,
)
go sc.Spin()
go pc.Spin()
go rc.Spin()
select {}
}

func videoSender(cfg *config.Config, topicIdx int) {
imgChan := make(chan *sensor_msgs_msg.Image)
func sender(cfg *config.Config) {
messageChan := make(chan types.Message)
sendSDPChan := make(chan webrtc.SessionDescription)
recvSDPChan := make(chan webrtc.SessionDescription)
sendCandidateChan := make(chan webrtc.ICECandidateInit)
recvCandidateChan := make(chan webrtc.ICECandidateInit)
sc := send_signalingchannel.InitSignalingChannel(
cfg,
topicIdx,
sendSDPChan,
recvSDPChan,
sendCandidateChan,
Expand All @@ -61,12 +61,11 @@ func videoSender(cfg *config.Config, topicIdx int) {
actions := sc.GetActions()
rc := send_roschannel.InitROSChannel(
cfg,
topicIdx,
imgChan,
messageChan,
)
go rc.Spin()
pc := send_peerconnectionchannel.InitPeerConnectionChannel(
imgChan,
messageChan,
sendSDPChan,
recvSDPChan,
sendCandidateChan,
Expand All @@ -80,23 +79,10 @@ func videoSender(cfg *config.Config, topicIdx int) {
func main() {
cfg := config.LoadCfg()
if cfg.Mode == "receiver" {
for i, t := range cfg.Topics {
if t.Type == "sensor_msgs/msg/Image" {
go videoReceiver(cfg, i)
} else {
panic("unsupported type")
}
}
receiver(cfg)
} else if cfg.Mode == "sender" {
for i, t := range cfg.Topics {
if t.Type == "sensor_msgs/msg/Image" {
go videoSender(cfg, i)
} else {
panic("unsupported type")
}
}
sender(cfg)
} else {
panic("unsupported mode")
}
select {}
}
30 changes: 26 additions & 4 deletions receiver/peer_connection_channel/peer_connection_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"github.com/pion/interceptor"
"github.com/pion/rtcp"
"github.com/pion/webrtc/v3"
"github.com/tiiuae/rclgo/pkg/rclgo"
"github.com/tiiuae/rclgo/pkg/rclgo/types"
)

type PeerConnectionChannel struct {
Expand All @@ -17,7 +19,7 @@ type PeerConnectionChannel struct {
candidateChan <-chan webrtc.ICECandidateInit
peerConnection *webrtc.PeerConnection
signalCandidate func(c webrtc.ICECandidateInit) error
imgChan chan<- *sensor_msgs_msg.Image
messageChan chan<- types.Message
}

func registerHeaderExtensionURI(m *webrtc.MediaEngine, uris []string) {
Expand All @@ -40,7 +42,7 @@ func InitPeerConnectionChannel(
sdpReplyChan chan<- webrtc.SessionDescription,
candidateChan <-chan webrtc.ICECandidateInit,
signalCandidate func(c webrtc.ICECandidateInit) error,
imgChan chan<- *sensor_msgs_msg.Image,
messageChan chan<- types.Message,
) *PeerConnectionChannel {
m := &webrtc.MediaEngine{}
// Register VP8
Expand Down Expand Up @@ -84,7 +86,7 @@ func InitPeerConnectionChannel(
candidateChan: candidateChan,
peerConnection: peerConnection,
signalCandidate: signalCandidate,
imgChan: imgChan,
messageChan: messageChan,
}
}

Expand Down Expand Up @@ -117,7 +119,7 @@ func handleSignalingMessage(pc *PeerConnectionChannel) {
}

func (pc *PeerConnectionChannel) Spin() {
webmSaver := newWebmSaver(pc.imgChan)
webmSaver := newWebmSaver(pc.messageChan)
_, err := pc.peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo,
webrtc.RTPTransceiverInit{
Direction: webrtc.RTPTransceiverDirectionRecvonly,
Expand Down Expand Up @@ -156,5 +158,25 @@ func (pc *PeerConnectionChannel) Spin() {
webmSaver.PushVP8(rtp)
}
})
pc.peerConnection.OnDataChannel(func(d *webrtc.DataChannel) {
d.OnMessage(func(msg webrtc.DataChannelMessage) {
serializedMsg := msg.Data
sensorMsg, err := rclgo.Deserialize(serializedMsg, sensor_msgs_msg.LaserScanTypeSupport)
if err != nil {
slog.Error("failed to deserialize sensor message", "error", err)
return
}
laserScanMsg, ok := sensorMsg.(*sensor_msgs_msg.LaserScan)
if !ok {
slog.Error("failed to cast sensor message to LaserScan")
return
}
slog.Info("datachannel message", "frame ID", laserScanMsg.Header.FrameId)
})
d.OnOpen(func() {
slog.Info("datachannel open", "label", d.Label(), "ID", d.ID())
d.SendText("Hello from receiver!")
})
})
go handleSignalingMessage(pc)
}
7 changes: 3 additions & 4 deletions receiver/peer_connection_channel/webm_saver.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/pion/rtp"
"github.com/pion/rtp/codecs"
"github.com/pion/webrtc/v3/pkg/media/samplebuilder"
"github.com/tiiuae/rclgo/pkg/rclgo/types"
)

type WebmSaver struct {
Expand All @@ -26,10 +27,10 @@ type WebmSaver struct {
lastVideoTimestamp uint32
codecCtx C.vpx_codec_ctx_t
codecCreated bool
imgChan chan<- *sensor_msgs_msg.Image
imgChan chan<- types.Message
}

func newWebmSaver(imgChan chan<- *sensor_msgs_msg.Image) *WebmSaver {
func newWebmSaver(imgChan chan<- types.Message) *WebmSaver {
return &WebmSaver{
vp8Builder: samplebuilder.New(200, &codecs.VP8Packet{}, 90000),
h264JitterBuffer: jitterbuffer.New(),
Expand Down Expand Up @@ -87,8 +88,6 @@ func (s *WebmSaver) PushVP8(rtpPacket *rtp.Packet) {
}
}

func vpxToROSImage()

func (s *WebmSaver) InitWriter(width, height int) {
if errCode := C.init_decoder(&s.codecCtx, C.uint(width), C.uint(height)); errCode != 0 {
slog.Error("failed to initialize decoder", "error", errCode)
Expand Down
33 changes: 26 additions & 7 deletions receiver/ros_channel/ros_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,46 @@ import (
"github.com/3DRX/webrtc-ros-bridge/config"
sensor_msgs_msg "github.com/3DRX/webrtc-ros-bridge/rclgo_gen/sensor_msgs/msg"
"github.com/tiiuae/rclgo/pkg/rclgo"
"github.com/tiiuae/rclgo/pkg/rclgo/types"
)

type ROSChannel struct {
imgChan <-chan *sensor_msgs_msg.Image
cfg *config.Config
topicIdx int
chanDispatcher func()
imgChan <-chan *sensor_msgs_msg.Image
sensorChan <-chan types.Message
cfg *config.Config
topicIdx int
}

func InitROSChannel(
cfg *config.Config,
topicIdx int,
imgChan <-chan *sensor_msgs_msg.Image,
messageChan <-chan types.Message,
) *ROSChannel {
imgChan := make(chan *sensor_msgs_msg.Image, 10)
sensorChan := make(chan types.Message, 10)
return &ROSChannel{
cfg: cfg,
topicIdx: topicIdx,
imgChan: imgChan,
cfg: cfg,
topicIdx: topicIdx,
imgChan: imgChan,
sensorChan: sensorChan,
chanDispatcher: func() {
for {
msg := <-messageChan
switch msg.(type) {
case *sensor_msgs_msg.Image:
imgChan <- msg.(*sensor_msgs_msg.Image)
default:
sensorChan <- msg
}
}
},
}
}

func (r *ROSChannel) Spin() {
go r.chanDispatcher()

err := rclgo.Init(nil)
if err != nil {
panic(err)
Expand Down
Loading

0 comments on commit 9656ba5

Please sign in to comment.