Skip to content

Commit

Permalink
Restore IPVS loadbalancers from the database after a restart
Browse files Browse the repository at this point in the history
  • Loading branch information
liorokman committed Aug 4, 2023
1 parent 4de777d commit 43c9809
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 31 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ using this cloud controller provider:
pveum user token add root@pam ccm -privsep=0
```

Set `PROXMOX_API_TOKEN` to the generated token, and set `PROXMOX_USERNAME`
Set `PROXMOX_API_TOKEN` to the generated token, and set `PROXMOX_API_USERNAME`
to the correct username. In this example, the username would be
`root@pam!ccm`

Expand Down
4 changes: 4 additions & 0 deletions cmd/lbmanager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ func main() {
if err != nil {
log.Fatalf("failed starting the loadbalancer manager: %v", err)
}
defer lbServer.Close()
if err := lbServer.Restore(); err != nil {
log.Fatalf("failed restoring the loadbalancer configuration: %v", err)
}
opts := []grpc.ServerOption{
grpc.Creds(creds),
}
Expand Down
108 changes: 78 additions & 30 deletions internal/loadbalancer/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type loadBalancerServer struct {
type Intf interface {
LoadBalancerServer
Close()
Restore() error
}

func (l *loadBalancerServer) Close() {
Expand Down Expand Up @@ -135,6 +136,36 @@ func toIPVSService(l *LoadBalancerInformation, srcPort int32, protocol Protocol)
}
}

// Restore applies all the settings stored in the database to the current state
func (l *loadBalancerServer) Restore() error {
iterOptions := rosedb.DefaultIteratorOptions
iter := l.db.NewIterator(iterOptions)
defer iter.Close()
for ; iter.Valid(); iter.Next() {
val, err := iter.Value()
if err != nil {
return err
}
var currLB LoadBalancerInformation
if err := json.Unmarshal(val, &currLB); err != nil {
return err
}
if err := l.addAddrAlias(currLB.IpAddr); err != nil {
return err
}
for port, targets := range currLB.Targets {
if targets != nil {
for _, target := range targets.Target {
if _, err := l.mapTarget(&currLB, port, target); err != nil {
return err
}
}
}
}
}
return nil
}

// Get all information about all defined Load Balancers
func (l *loadBalancerServer) GetLoadBalancers(_ *emptypb.Empty, stream LoadBalancer_GetLoadBalancersServer) error {
iterOptions := rosedb.DefaultIteratorOptions
Expand Down Expand Up @@ -187,15 +218,27 @@ func (l *loadBalancerServer) Create(ctx context.Context, clb *CreateLoadBalancer
IpAddr: *clb.IpAddr,
Targets: map[int32]*TargetList{},
}
log.Printf("slb is %+v", lbInfo)

externalIP := net.ParseIP(lbInfo.IpAddr)
if !l.ipam.Contains(externalIP) {
return nil, fmt.Errorf("Requested IP %s is not contained in the configured CIDR (%s)", *clb.IpAddr, l.cidr)
// Create a new alias on the interface that matches the required IP
// There could already be such an interface, since there might be another LB with the same IP and different port
if err := l.addAddrAlias(lbInfo.IpAddr); err != nil {
log.Printf("error adding an alias for the loadbalancer %s: %s", lbInfo.Name, err.Error())
return nil, err
}

// 1. Create a new alias on the interface that matches the required IP
// There could already be such an interface, since there might be another LB with the same IP and different port
lbInfoData, err := json.Marshal(lbInfo)
if err != nil {
return nil, err
}
err = l.db.Put([]byte(clb.Name), lbInfoData)
return lbInfo, err
}

func (l *loadBalancerServer) addAddrAlias(addrString string) error {
externalIP := net.ParseIP(addrString)
if !l.ipam.Contains(externalIP) {
return fmt.Errorf("Requested IP %s is not contained in the configured CIDR (%s)", addrString, l.cidr)
}
requiredAddr := &netlink.Addr{
IPNet: &net.IPNet{
IP: externalIP,
Expand All @@ -213,16 +256,10 @@ func (l *loadBalancerServer) Create(ctx context.Context, clb *CreateLoadBalancer
if !addrFound {
err = l.nlHandle.AddrAdd(l.externalLink, requiredAddr)
if err != nil {
return nil, err
return err
}
}

lbInfoData, err := json.Marshal(lbInfo)
if err != nil {
return nil, err
}
err = l.db.Put([]byte(clb.Name), lbInfoData)
return lbInfo, err
return nil
}

func (l *loadBalancerServer) Delete(ctx context.Context, lbName *LoadBalancerName) (*Error, error) {
Expand Down Expand Up @@ -303,42 +340,53 @@ func (l *loadBalancerServer) AddTarget(ctx context.Context, atr *AddTargetReques
}
}

targetIP := net.ParseIP(atr.Target.DstIP)
ret, err := l.mapTarget(lbInfo, atr.SrcPort, atr.Target)
if err != nil || ret != nil {
return ret, err
}

targetList.Target = append(targetList.Target, atr.Target)
lbInfo.Targets[atr.SrcPort] = targetList

slbData, err := json.Marshal(lbInfo)
if err != nil {
return nil, err
}
if err := l.db.Put([]byte(atr.LbName), slbData); err != nil {
return nil, err
}
return &Error{}, err
}

// mapTarget maps a given target to the provided loadbalancer on the provided
// srcPort
func (l *loadBalancerServer) mapTarget(lbInfo *LoadBalancerInformation, srcPort int32, target *Target) (*Error, error) {
targetIP := net.ParseIP(target.DstIP)
var fam uint16 = syscall.AF_INET
if targetIP.To4() == nil {
fam = syscall.AF_INET6
}
srv := toIPVSService(lbInfo, atr.SrcPort, atr.Target.Protocol)
srv := toIPVSService(lbInfo, srcPort, target.Protocol)
if !l.ipvsHandle.IsServicePresent(srv) {
if err = l.ipvsHandle.NewService(srv); err != nil {
if err := l.ipvsHandle.NewService(srv); err != nil {
return nil, err
}
}
newDestination := &ipvs.Destination{
Address: targetIP,
Port: uint16(atr.Target.DstPort),
Port: uint16(target.DstPort),
ConnectionFlags: ipvs.ConnFwdMasq,
AddressFamily: fam,
Weight: 1,
}
if err := l.ipvsHandle.NewDestination(srv, newDestination); err != nil {
log.Printf("error adding a new destination %+v to service %+v: %+v", atr.Target, lbInfo, err)
log.Printf("error adding a new destination %+v to service %+v: %+v", target, lbInfo, err)
return &Error{
Message: err.Error(),
Code: ErrAddDestinationFailed,
}, nil
}
targetList.Target = append(targetList.Target, atr.Target)
lbInfo.Targets[atr.SrcPort] = targetList

slbData, err := json.Marshal(lbInfo)
if err != nil {
return nil, err
}
if err := l.db.Put([]byte(atr.LbName), slbData); err != nil {
return nil, err
}
return &Error{}, err
return nil, nil
}

func (l *loadBalancerServer) DelTarget(ctx context.Context, dtr *DelTargetRequest) (*Error, error) {
Expand Down

0 comments on commit 43c9809

Please sign in to comment.