-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgetrib.py
163 lines (146 loc) · 5.55 KB
/
getrib.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
#! -*- coding:utf-8 -*-
from __future__ import print_function
from __future__ import unicode_literals
from builtins import str
import grpc
from google.protobuf.any_pb2 import Any
import gobgp_pb2
import gobgp_pb2_grpc
import attribute_pb2
import sys
import socket
import ipaddress
import re
from functools import cmp_to_key
import argparse
_AF_NAME = dict()
_AF_NAME[4] = gobgp_pb2.Family(afi=gobgp_pb2.Family.AFI_IP, safi=gobgp_pb2.Family.SAFI_UNICAST)
_AF_NAME[6] = gobgp_pb2.Family(afi=gobgp_pb2.Family.AFI_IP6, safi=gobgp_pb2.Family.SAFI_UNICAST)
_TT = dict()
_TT['global'] = gobgp_pb2.GLOBAL
_TT['in'] = gobgp_pb2.ADJ_IN
_TT['out'] = gobgp_pb2.ADJ_OUT
_ATTR_ORIGIN = dict()
_ATTR_ORIGIN[0] = "igp"
_ATTR_ORIGIN[1] = "egp"
_ATTR_ORIGIN[2] = "incomplete"
def compare_destinations(af):
# sort nlri
afcls = getattr(ipaddress, "IPv"+str(af)+"Network")
def func(a, b):
net_a = afcls(a.destination.prefix)
net_b = afcls(b.destination.prefix)
if not net_a.network_address == net_b.network_address:
return int(net_a.network_address) - int(net_b.network_address)
return int(net_a.netmask) - int(net_b.netmask)
return func
def pb_msg_attrs(m):
# return list of attr names
slice_ind = -1 * len('_FIELD_NUMBER')
attrs = [ attr[:slice_ind].lower() for attr in dir(m) if attr.endswith('_FIELD_NUMBER') ]
if attrs:
return attrs
# temporary workaround for an issue with python3 generated message classes that include no field number constants.
return [ attr for attr in dir(m) if re.match(r'[a-z]', attr) ]
def print_path(path):
# print each Path message, unpack its attributes if appropriate
nlri = attribute_pb2.IPAddressPrefix()
path.nlri.Unpack(nlri)
print("{}/{}".format(nlri.prefix, nlri.prefix_len))
pattrs = []
for attr_name in pb_msg_attrs(path):
if attr_name == "nlri":
continue
if attr_name == "pattrs":
for pattr in path.pattrs:
pattr_name = pattr.type_url.split(".")[-1]
pattr_cls = getattr(attribute_pb2, pattr_name, None)
if pattr_cls:
pattr_obj = pattr_cls()
pattr.Unpack(pattr_obj)
for k in pb_msg_attrs(pattr_obj):
if k == "origin":
# print corresponding value
v = _ATTR_ORIGIN.get(getattr(pattr_obj, k, -1))
elif k == "communities":
# convert to a list of colon-delimited sets
v = [ str("{}:{}".format(int("0xffff",16)&c>>16, int("0xffff",16)&c)) for c in getattr(pattr_obj, k, []) ]
elif k == "nlris":
# supress output for now
continue
else:
# printing as is
v = str(getattr(pattr_obj, k, "")).strip().replace("\n", ", ")
print(" {}: {}".format(k, v))
else:
print(" {}: {}".format(attr_name, str(getattr(path, attr_name, "")).strip().replace("\n", ", ")))
def invalidate(k, v):
print("invalid {}: {}".format(k, v), file=sys.stderr)
sys.exit(-1)
def run(af, gobgpd_addr, timeout, *network, **kw):
# family
try:
family = _AF_NAME[af]
except:
invalidate("address family", af)
# table_type, name
if kw.get("rib_in_neighbor"):
table_type = _TT['in']
name = kw["rib_in_neighbor"]
elif kw.get("rib_out_neighbor"):
table_type = _TT['out']
name = kw["rib_out_neighbor"]
else:
table_type = _TT["global"]
name = None
# prefixes
prefixes = []
for n in network:
try:
getattr(ipaddress, 'IPv'+str(af)+'Network')(str(n))
prefixes.append(gobgp_pb2.TableLookupPrefix(prefix=n))
except:
invalidate("prefix", n)
channel = grpc.insecure_channel(gobgpd_addr + ":50051")
stub = gobgp_pb2_grpc.GobgpApiStub(channel)
res = stub.ListPath(
gobgp_pb2.ListPathRequest(
table_type=table_type,
name=name,
prefixes=prefixes,
family=family,
),
timeout,
)
destinations = [ d for d in res ]
cmp_func = compare_destinations(af)
destinations.sort(key=cmp_to_key(cmp_func))
for p in [ p for d in destinations for p in d.destination.paths ]:
print_path(p)
def main():
parser = argparse.ArgumentParser()
parser_afg = parser.add_mutually_exclusive_group()
parser_afg.add_argument('-4', action='store_const', dest="af", const=4, help="Address-family ipv4-unicast (default)")
parser_afg.add_argument('-6', action='store_const', dest="af", const=6, help="Address-family ipv6-unicast")
parser_tg = parser.add_mutually_exclusive_group()
parser_tg.add_argument('-l', action='store_true', dest="rib_local", help="Show local rib (default: true)")
parser_tg.add_argument('-i', action='store', dest="rib_in_neighbor", help="Routes received from peer")
parser_tg.add_argument('-o', action='store', dest="rib_out_neighbor", help="Routes advertised to peer")
parser.add_argument('-r', action='store', default="localhost", dest="gobgpd_addr", help="GoBGPd address (default: localhost)")
parser.add_argument('-t', action='store', dest="timeout", type=int, default=1, help="Timeout second (default: 1)")
parser.add_argument('network', action='store', nargs='*')
argopts = parser.parse_args()
try:
for a in ['gobgpd_addr', 'rib_in_neighbor', 'rib_out_neighbor', ]:
if getattr(argopts, a):
socket.gethostbyname(getattr(argopts, a))
except socket.gaierror as e:
invalidate("host", getattr(argopts,a))
run(argopts.af or 4,
argopts.gobgpd_addr,
argopts.timeout,
*argopts.network,
rib_in_neighbor=argopts.rib_in_neighbor,
rib_out_neighbor=argopts.rib_out_neighbor)
if __name__ == '__main__':
main()