-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathfluxcap.c
2071 lines (1752 loc) · 57.6 KB
/
fluxcap.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#include "fluxcap.h"
/*
* fluxcap: a network tap replication and aggregation tool
*
*/
/* backing storage for the recvmmsg message headers; cfg.msgv points at it */
struct mmsghdr bss_msgv[BATCH_PKTS];
/*
 * cfg: process-wide options and runtime state.
 *
 * The initializer below starts every descriptor at -1 (not open) and gives
 * the kernel packet ring its default geometry: 4 MB blocks, 64 blocks,
 * 2048-byte frames.
 */
struct {
int verbose;
char *prog;
enum {mode_none, mode_transmit, mode_receive, mode_create, mode_watch} mode;
char *file;
char dev[MAX_NIC]; /* NIC name; resolved to an ifindex in setup_rx/setup_tx */
unsigned long ticks; /* selects the current stats window (ticks % NWIN) in status_rings */
int vlan;
int pass_vlan;
int tail;
int fd; /* AF_PACKET receive socket opened by setup_rx */
int tx_fd; /* transmit socket from setup_tx: raw IP when encapsulating, else AF_PACKET */
int rx_fd; /* raw GRE receive socket opened by setup_rx_encap */
int signal_fd;
int timer_fd;
int epoll_fd; /* epoll instance that new_epoll adds descriptors to */
char pkt[MAX_PKT];
struct shr *ring;
size_t size; /* ring create size (-cr), or snaplen (-rx/-tx) */
struct encap encap;
struct itimerspec timer;
uint16_t ip_id; /* for implementing IP fragmentation when */
int mtu; /* using gre encapsulation */
UT_vector /* of ptr */ *watch_rings;
UT_vector /* of utstring */ *watch_names;
UT_vector /* of struct ww */ *watch_win;
UT_string *tmp; /* scratch string; format_rate formats into it and returns its body */
struct timeval now;
struct bb bb; /* output shr ring batch buffer; accumulates til shr_writev */
struct bb rb; /* input shr ring batch buffer; accepts many via shr_readv */
struct bb pb; /* packet buffer (Special); faux bb wrapping kernel ring */
/* fields below are for packet input from AF_PACKET socket */
struct tpacket_req req; /* linux/if_packet.h */
unsigned ring_block_sz; /* see comments in initialization below */
unsigned ring_block_nr; /* number of blocks of sz above */
unsigned ring_frame_sz; /* snaplen */
unsigned ring_curr_idx; /* slot index in ring buffer */
unsigned ring_frame_nr; /* redundant, total frame count */
int strip_vlan; /* strip VLAN on rx if present (boolean) */
int drop_pct; /* sampling % 0 (keep all)-100(drop all) */
int use_tx_ring; /* 0 = sendto-based tx; 1=packet mmap ring-based tx */
int bypass_qdisc_on_tx; /* bypass kernel qdisc layer, more risk of loss */
struct fluxcap_stats stats; /* used to periodically update rx/rd stats */
int keep; /* in mode_create, keep existing ring if present */
int losing; /* nonzero: update_rx_drops fetches kernel drop counts, then clears it */
struct bb gb; /* used in gre rx for recvmmsg */
struct mmsghdr *msgv; /* used in gre rx for recvmmsg */
} cfg = {
/* descriptors: -1 means "not opened yet" */
.fd = -1,
.tx_fd = -1,
.rx_fd = -1,
.signal_fd = -1,
.timer_fd = -1,
.epoll_fd = -1,
.ring_block_sz = 1 << 22, /*4 mb; want powers of two due to kernel allocator*/
.ring_block_nr = 64,
.ring_frame_sz = 1 << 11, /* 2048 for MTU & header, divisor of ring_block_sz*/
/* first expiry after 1 ns, then one expiry every 1/TIMER_HZ seconds */
.timer = {
.it_value = { .tv_sec = 0, .tv_nsec = 1 },
.it_interval = { .tv_sec = 0, .tv_nsec = 1000000000UL / TIMER_HZ },
},
.msgv = bss_msgv,
};
/* UT_mm descriptor defined in another file; only declared here.
 * NOTE(review): presumably describes struct bb elements - confirm. */
extern UT_mm bb_mm;
/* UT_mm descriptor whose element size is one struct ww
 * (struct ww is the element type noted on cfg.watch_win) */
UT_mm ww_mm = { .sz = sizeof(struct ww), };
/* UT_mm descriptor whose element size is one pointer; exported via utmm_ptr */
UT_mm _utmm_ptr = {.sz = sizeof(void*)};
UT_mm* utmm_ptr = &_utmm_ptr;
/* signals that we'll accept via signalfd in epoll */
int sigs[] = {SIGHUP,SIGTERM,SIGINT,SIGQUIT,SIGALRM};
/*
 * usage
 *
 * print the command line synopsis and the program version to stderr,
 * then terminate the process with exit(-1). does not return.
 */
void usage() {
  fprintf(stderr,
    "usage: %s [-cr|-tx|-rx|-io] [options] <ring>\n"
    "\n"
    " create ring(s): -cr -s <size>[k|m|g|t] <ring> ...\n"
    " transmit: -tx -i <eth> <ring>\n"
    " receive: -rx -i <eth> <ring>\n"
    " i/o view: -io <ring> ...\n"
    "\n"
    "Encapsulation modes:\n"
    " -tx -E gretap:<host> [-K <key>] <ring> (GRETAP send)\n"
    " -rx -E gretap[:<ip>] [-K <key>] [-i <eth>] <ring> (GRETAP recv)\n"
    " -tx -E gre:<host> [-K <key>] <ring> (GRE send)\n"
    " -tx -E vxlan:<host> [-K <VNI>] <ring> (VXLAN send)\n"
    " where:\n"
    " <key> GRE key/dotted quad (optional) [rx/tx]\n"
    " <ip> binds a local IP (optional) [rx]\n"
    " <eth> binds a local NIC (optional) [rx]\n"
    "\n"
    "Other options:\n"
    " -f 'vlan n' (accept packets tagged VLAN n) [tx]\n"
    " -V <vlan> (inject VLAN tag) [rx/tx]\n"
    " -Q (remove VLAN tag) [rx]\n"
    " -d <percent> (downsample to <0-99>%%) [rx/tx]\n"
    " -s <length> (truncate at length) [rx/tx]\n"
    " -D <n> (trim n tail bytes) [rx/tx]\n"
    " -R (tpacket-based tx) [tx]\n"
    " -q (bypass qdisc layer) [tx]\n"
    " -v (verbose)\n"
    "\n"
    " Kernel buffer options (TPACKET_V2) [rx/tx]\n"
    " Defaults apply if left unspecified. To use these options\n"
    " the block size must be a multiple of the system page size,\n"
    " and be small since it consumes physically contiguous pages.\n"
    " The number of blocks can be large. Their product is the buffer\n"
    " capacity. The frame size must evenly divide the block size.\n"
    " The parameters are checked to satisfy these constraints.\n"
    " The frame size is for one packet (with overhead) so it should\n"
    " exceed the MTU for full packet handling without truncation.\n"
    " -Z <frame-size> (max frame size) [2048]\n"
    " -B <num-blocks> (number of blocks) [64]\n"
    " -S <block-size> (block size log2) [22] (4mb)\n"
    "\n",
    cfg.prog);
  fprintf(stderr, "fluxcap version: %s\n", FLUXCAP_VERSION);
  exit(-1);
}
/*
 * hexdump
 *
 * write len bytes of buf to stderr, sixteen bytes per row. each row has
 * the row offset in hex, then the bytes in hex (short rows are padded),
 * then the same bytes as characters with non-printables shown as '.'
 */
void hexdump(char *buf, size_t len) {
  size_t row, col, pos;
  unsigned char byte;

  for (row = 0; row < len; row += 16) {

    /* offset of the first byte in this row */
    fprintf(stderr,"%08x ", (int)row);

    /* hex column */
    for (col = 0; col < 16; col++) {
      pos = row + col;
      if (pos < len) {
        byte = buf[pos];
        fprintf(stderr,"%.2x ", byte);
      } else {
        fprintf(stderr, " ");
      }
    }

    /* character column; positions past the end print as blanks */
    for (col = 0; col < 16; col++) {
      pos = row + col;
      byte = (pos < len) ? buf[pos] : ' ';
      if ((byte < 0x20) || (byte > 0x7e)) byte = '.';
      fprintf(stderr,"%c",byte);
    }

    fprintf(stderr,"\n");
  }
}
/*
 * new_epoll
 *
 * add fd to the epoll instance in cfg.epoll_fd, watching for the
 * given events. returns 0 on success, or -1 (after printing the
 * reason to stderr) if epoll_ctl fails.
 */
int new_epoll(int events, int fd) {
  struct epoll_event registration;

  /* clear the whole struct first; this placates valgrind */
  memset(&registration, 0, sizeof(registration));
  registration.data.fd = fd;
  registration.events = events;

  if (epoll_ctl(cfg.epoll_fd, EPOLL_CTL_ADD, fd, &registration) == -1) {
    fprintf(stderr,"epoll_ctl: %s\n", strerror(errno));
    return -1;
  }

  return 0;
}
/*
 * read_proc
 *
 * read a complete file from the /proc filesystem
 * this is special because its size is not known a priori
 * so a read/realloc loop is needed
 *
 * parameters:
 *  file: path of the file to read
 *  len:  OUTPUT parameter (number of bytes read); only set on success
 *
 * returns:
 *  buffer holding the file content, or NULL on error (after printing
 *  the reason to stderr). the buffer always has a NUL byte at
 *  buffer[*len], which is not counted in *len, so that callers may
 *  apply string functions to it. caller should free the buffer
 *  eventually.
 */
char *read_proc(char *file, size_t *len) {
  char *buf=NULL, *tmp;
  int fd = -1, rc = -1;
  size_t sz, br=0;
  ssize_t nr;

  /* initial guess at a sufficient buffer size */
  sz = 1000;

  fd = open(file, O_RDONLY);
  if (fd < 0) {
    fprintf(stderr,"open: %s\n", strerror(errno));
    goto done;
  }

  while (1) {

    /* first pass, or out of space? (double the size and) allocate.
     * one byte beyond sz is always reserved for the terminator */
    if ((buf == NULL) || (br == sz)) {
      if (buf != NULL) {
        if (sz > ((size_t)-1) / 2 - 1) {
          fprintf(stderr, "out of memory\n");
          goto done;
        }
        sz *= 2;
      }
      tmp = realloc(buf, sz + 1);
      if (tmp == NULL) {
        fprintf(stderr, "out of memory\n");
        goto done; /* buf is still valid here; it is freed below */
      }
      buf = tmp;
    }

    nr = read(fd, buf + br, sz - br);
    if (nr < 0) {
      if (errno == EINTR) continue; /* interrupted before any data; retry */
      fprintf(stderr,"read: %s\n", strerror(errno));
      goto done;
    }
    if (nr == 0) break; /* end of file */
    br += nr;
  }

  buf[br] = '\0';
  *len = br;
  rc = 0;

 done:
  if (fd != -1) close(fd);
  if (rc && buf) { free(buf); buf = NULL; }
  return buf;
}
/*
 * get_col
 *
 * locate column N (counting from one) on the first line of a buffer.
 * columns are runs of characters separated by spaces or tabs. the
 * scan never goes beyond the first newline or the end of the buffer,
 * either of which may terminate the last column.
 *
 * parameters:
 *  col:    column wanted (1-based)
 *  len:    OUTPUT parameter; on success, the length of that column
 *  buf:    text to scan (no terminator needed)
 *  buflen: number of bytes in buf
 *
 * returns:
 *  pointer to the first byte of the column, or NULL if the line
 *  has fewer than col columns
 */
#define ws(x) (((x) == ' ') || ((x) == '\t'))
char *get_col(int col, size_t *len, char *buf, size_t buflen) {
  char *p, *col_start = NULL;
  char *end = buf + buflen;
  int seen = 0;     /* how many columns have begun so far */
  size_t run = 0;   /* length of the column under the cursor */

  for (p = buf; p < end; p++) {
    char ch = *p;

    if (ch == '\n') break;        /* line is over */

    if (ws(ch)) {
      if (seen == col) break;     /* sought column just ended */
      run = 0;                    /* between columns */
      continue;
    }

    if (run == 0) {               /* first byte of a new column */
      seen++;
      col_start = p;
    }
    run++;
  }

  *len = run;
  return (run && (seen == col)) ? col_start : NULL;
}
/*
 * parse a hexadecimal number from a routing table column.
 *
 * the column is a span inside a larger buffer and carries no terminator
 * of its own, so it is copied to a local, terminated buffer before it
 * is handed to strtoul.
 *
 * returns 0 and stores the number in *out, or -1 if the column is
 * empty, too long, or does not start with a hex number
 */
static int route_hex_field(const char *col, size_t len, unsigned *out) {
  char tmp[2 * sizeof(unsigned long) + 3], *end;
  unsigned long v;

  if ((len == 0) || (len >= sizeof(tmp))) return -1;
  memcpy(tmp, col, len);
  tmp[len] = '\0';

  errno = 0;
  v = strtoul(tmp, &end, 16);
  if ((end == tmp) || (errno == ERANGE)) return -1;

  *out = (unsigned)v;
  return 0;
}

/*
 * find route for a given destination IP address
 *
 * reads /proc/net/route, skips its header row, and among the rows
 * whose (destination, mask) match dest_ip keeps the one with the
 * numerically greatest mask; on a tie the later row wins.
 *
 * parameters:
 *  dest_ip: the destination IP address in network order
 *  interface: char[] to receive the output NIC interface name
 *             must be at least IF_NAMESIZE bytes long;
 *             see IF_NAMESIZE in /usr/include/net/if.h
 * returns:
 *   0 success
 *  -1 error parsing routing table
 *  -2 no route found
 *
 */
int find_route(uint32_t dest_ip,
    char *interface) {
  int rc = -1;
  char *buf=NULL, *line, *eol, *end, *iface, *s_dest, *s_mask;
  unsigned mask, dest, best_mask=0, nroutes=0;
  size_t dest_len, mask_len, sz=0, to_eob, iface_len;

  buf = read_proc("/proc/net/route", &sz);
  if (buf == NULL) goto done;
  end = buf + sz;

  /* find initial newline; discard header row */
  eol = memchr(buf, '\n', sz);
  line = eol ? (eol + 1) : end;

  while (line < end) {

    to_eob = (size_t)(end - line);

    /* column 2: destination, column 8: mask, both in hex */
    s_dest = get_col(2, &dest_len, line, to_eob);
    if (s_dest == NULL) goto done;
    if (route_hex_field(s_dest, dest_len, &dest) < 0) goto done;

    s_mask = get_col(8, &mask_len, line, to_eob);
    if (s_mask == NULL) goto done;
    if (route_hex_field(s_mask, mask_len, &mask) < 0) goto done;

    /* column 1: interface name */
    iface = get_col(1, &iface_len, line, to_eob);
    if (iface == NULL) goto done;

    /* advance to next line */
    eol = memchr(line, '\n', to_eob);
    line = eol ? (eol + 1) : end;

    /* does the route apply? */
    if ((dest_ip & mask) != dest) continue;

    /* know a more specific route? */
    if (mask < best_mask) continue;

    /* this is the best route so far */
    best_mask = mask;

    /* copy details of this route */
    if (iface_len + 1 > IF_NAMESIZE) goto done;
    memcpy(interface, iface, iface_len);
    interface[iface_len] = '\0';
    nroutes++;
  }

  rc = nroutes ? 0 : -2;

 done:
  if (buf) free(buf);
  return rc;
}
/*
 * get the MTU for the interface, or -1 on error
 *
 * eth is the interface name. a name that does not fit in ifr_name
 * together with its terminator is rejected rather than truncated,
 * since a truncated name could match a different interface.
 * failures are reported on stderr.
 */
int get_if_mtu(char *eth) {
  int fd = -1, sc, rc = -1;
  struct ifreq ifr;

  if ((eth == NULL) || (strlen(eth) >= sizeof(ifr.ifr_name))) {
    fprintf(stderr, "invalid interface name\n");
    goto done;
  }

  /* any socket will do as a handle for the interface ioctl */
  fd = socket(AF_INET, SOCK_DGRAM, 0);
  if (fd == -1) {
    fprintf(stderr, "socket: %s\n", strerror(errno));
    goto done;
  }

  /* zero the request so the name is terminated and no stack bytes leak */
  memset(&ifr, 0, sizeof(ifr));
  memcpy(ifr.ifr_name, eth, strlen(eth));

  sc = ioctl(fd, SIOCGIFMTU, &ifr);
  if (sc < 0) {
    fprintf(stderr, "ioctl: %s\n", strerror(errno));
    goto done;
  }

  rc = ifr.ifr_mtu;

 done:
  if (fd != -1) close(fd);
  return rc;
}
/*
 * check_ring_parameters
 *
 * validate the kernel packet ring geometry held in cfg:
 *
 *  - ring_frame_sz (-Z) must exceed TPACKET2_HDRLEN and be a multiple
 *    of TPACKET_ALIGNMENT
 *  - ring_block_sz (-S) must be a multiple of both ring_frame_sz and
 *    the system page size
 *
 * the frame size is validated first because it is the divisor in the
 * block size test; a zero frame size must be rejected before that.
 *
 * on success cfg.ring_frame_nr is set to the total number of frames
 * in the ring.
 *
 * returns 0 on success, or -1 (with a message on stderr) otherwise
 */
int check_ring_parameters(void) {
  int rc=-1;
  long page_sz;

  if (cfg.ring_frame_sz <= TPACKET2_HDRLEN) {
    fprintf(stderr,"-Z frame_sz must exceed %lu\n",
      (unsigned long)TPACKET2_HDRLEN);
    goto done;
  }

  if (cfg.ring_frame_sz % TPACKET_ALIGNMENT) {
    fprintf(stderr,"-Z frame_sz must be a multiple of %u\n",
      (unsigned)TPACKET_ALIGNMENT);
    goto done;
  }

  /* safe to divide: frame_sz is known to be non-zero here */
  if (cfg.ring_block_sz % cfg.ring_frame_sz) {
    fprintf(stderr,"-S block_sz must be multiple of -Z frame_sz\n");
    goto done;
  }

  page_sz = sysconf(_SC_PAGESIZE);
  if (page_sz <= 0) {
    fprintf(stderr,"sysconf: %s\n", strerror(errno));
    goto done;
  }

  if (cfg.ring_block_sz % (unsigned)page_sz) {
    fprintf(stderr,"-S block_sz must be multiple of page_sz %u\n",
      (unsigned)page_sz);
    goto done;
  }

  cfg.ring_frame_nr = (cfg.ring_block_sz / cfg.ring_frame_sz) * cfg.ring_block_nr;

  rc = 0;

 done:
  return rc;
}
/*
 * describe_ring
 *
 * report the capacity of the kernel packet ring on stderr, in megabytes
 * and in packets, prefixed with the given label. in verbose mode the
 * geometry is itemized as well, along with the time a 10 gigabit/sec
 * stream would need to fill the ring.
 *
 * background on the geometry:
 *
 * user space sees the ring as one flat buffer made of equally sized,
 * contiguous slots.
 *
 * the kernel builds it from blocks. a block is a run of physically
 * contiguous pages; those are scarce, so blocks are kept small and the
 * kernel stitches them into a virtually contiguous buffer for us.
 *
 * that is why the ring is given as a block count (cfg.ring_block_nr)
 * and a block size (cfg.ring_block_sz). the remaining parameter
 * (cfg.ring_frame_sz) is the largest packet structure a slot can hold
 * (struct tpacket_hdr, struct sockaddr_ll, the packet and padding), so
 * for full packets it must be the MTU plus that overhead.
 *
 * block size is required to be a multiple of frame size; then the
 * user space view has no gaps and is a plain array of slots.
 */
void describe_ring(char *label) {
  const double mb_per_block = cfg.ring_block_sz / (1024.0 * 1024);
  const double ring_mb = cfg.ring_block_nr * mb_per_block;
  double line_rate_bps, line_rate_mb, fill_sec;

  fprintf(stderr, "%s: %.1f megabytes (max %u packets)\n",
    label, ring_mb, cfg.ring_frame_nr);

  if (!cfg.verbose) return;

  line_rate_bps = 10000000000.0; /* 10 gigabit/sec network */
  line_rate_mb = line_rate_bps / ( 8 * 1024 * 1024);
  fill_sec = ring_mb / line_rate_mb;

  fprintf(stderr,
    " RING: (%u blocks * %u bytes per block) = %.1f megabytes\n"
    " PACKETS: @(%u bytes/packet) = %u packets\n"
    " TIME TO QUENCH @ 10Gigabit/s: %.1f seconds\n",
    cfg.ring_block_nr, cfg.ring_block_sz, ring_mb,
    cfg.ring_frame_sz, cfg.ring_frame_nr, fill_sec);
}
/*
 * set up as a GRE receiver
 *
 * opens a raw IP socket for the GRE protocol into cfg.rx_fd, binds it
 * to the local address in cfg.encap.dst (INADDR_ANY when left unset),
 * optionally binds it to the NIC named in cfg.dev, and points each
 * recvmmsg message header in cfg.msgv at its own MAX_PKT-sized slice
 * of the cfg.gb batch buffer.
 *
 * cfg.gb and its iov vector must already be allocated for BATCH_PKTS
 * packets; this is asserted.
 *
 * returns 0 on success, or -1 (with a message on stderr) on failure
 */
int setup_rx_encap(void) {
  struct sockaddr *sa;
  int i, sc, rc = -1;
  struct iovec *iov;
  socklen_t sz;

  cfg.rx_fd = socket(AF_INET, SOCK_RAW, IPPROTO_GRE);
  if (cfg.rx_fd == -1) {
    fprintf(stderr,"socket: %s\n", strerror(errno));
    goto done;
  }

  /* bind local IP; defaults to INADDR_ANY */
  struct sockaddr_in in;
  memset(&in, 0, sizeof(in));
  in.sin_family = AF_INET;
  in.sin_addr = cfg.encap.dst;
  sa = (struct sockaddr*)&in;
  sz = sizeof(in);
  sc = bind(cfg.rx_fd, sa, sz);
  if (sc < 0) {
    fprintf(stderr, "bind: %s\n", strerror(errno));
    goto done;
  }

  /* bind specific RX NIC if requested */
  sz = strlen(cfg.dev);
  sc = sz ? setsockopt(cfg.rx_fd, SOL_SOCKET, SO_BINDTODEVICE, cfg.dev, sz) : 0;
  if (sc < 0) {
    fprintf(stderr, "setsockopt: %s\n", strerror(errno));
    goto done;
  }

  /* set up recvmmsg buffers */
  assert(BATCH_SIZE == BATCH_PKTS * MAX_PKT);
  assert(cfg.gb.n == BATCH_PKTS * MAX_PKT);
  assert(cfg.gb.iov && (cfg.gb.iov->n == BATCH_PKTS));
  cfg.gb.iov->i = cfg.gb.iov->n; /* mark slots used */
  iov = (struct iovec*)utvector_head(cfg.gb.iov);
  for(i=0; i < BATCH_PKTS; i++) {
    /* one message per packet, each with a single buffer */
    iov[i].iov_base = cfg.gb.d + i * MAX_PKT;
    iov[i].iov_len = MAX_PKT;
    cfg.msgv[i].msg_hdr.msg_iov = &iov[i];
    cfg.msgv[i].msg_hdr.msg_iovlen = 1;
  }

  rc = 0;

 done:
  return rc;
}
/*
 * Prepare to read packets using a AF_PACKET socket with PACKET_RX_RING
 *
 * see packet(7)
 *
 * also see
 *  sudo apt-get install linux-doc
 *  zless /usr/share/doc/linux-doc/networking/packet_mmap.txt.gz
 *
 * With PACKET_RX_RING (in TPACKET_V2)
 * the ring buffer consists of an array of packet slots.
 *
 * Each packet is preceded by a metadata structure in the slot.
 * The application and kernel communicate the head and tail of
 * the ring through tp_status field (TP_STATUS_[USER|KERNEL]).
 *
 * Steps: validate the ring geometry, open the socket (cfg.fd), look up
 * the index of the interface named in cfg.dev, select TPACKET_V2,
 * request the ring, map it into cfg.pb, bind to the interface and
 * enable promiscuous mode.
 *
 * returns 0 on success, or -1 (with a message on stderr) on failure
 */
int setup_rx(void) {
  int rc=-1, ec;

  if (check_ring_parameters() < 0) goto done;

  /* any link layer protocol packets (linux/if_ether.h) */
  int protocol = htons(ETH_P_ALL);

  /* create the packet socket */
  cfg.fd = socket(AF_PACKET, SOCK_RAW, protocol);
  if (cfg.fd == -1) {
    fprintf(stderr,"socket: %s\n", strerror(errno));
    goto done;
  }

  /* convert interface name to index (in ifr.ifr_ifindex).
   * a name too long for ifr_name is refused, not truncated */
  struct ifreq ifr;
  if (strlen(cfg.dev) >= sizeof(ifr.ifr_name)) {
    fprintf(stderr,"failed to find interface %s\n", cfg.dev);
    goto done;
  }
  memset(&ifr, 0, sizeof(ifr));
  memcpy(ifr.ifr_name, cfg.dev, strlen(cfg.dev));
  ec = ioctl(cfg.fd, SIOCGIFINDEX, &ifr);
  if (ec < 0) {
    fprintf(stderr,"failed to find interface %s\n", cfg.dev);
    goto done;
  }

  /* PACKET_RX_RING comes in multiple versions. TPACKET_V2 is used here */
  int v = TPACKET_V2;
  ec = setsockopt(cfg.fd, SOL_PACKET, PACKET_VERSION, &v, sizeof(v));
  if (ec < 0) {
    fprintf(stderr,"setsockopt PACKET_VERSION: %s\n", strerror(errno));
    goto done;
  }

  /* fill out the struct tpacket_req describing the ring buffer */
  memset(&cfg.req, 0, sizeof(cfg.req));
  cfg.req.tp_block_size = cfg.ring_block_sz; /* Min sz of contig block */
  cfg.req.tp_frame_size = cfg.ring_frame_sz; /* Size of frame/snaplen */
  cfg.req.tp_block_nr = cfg.ring_block_nr;   /* Number of blocks */
  cfg.req.tp_frame_nr = cfg.ring_frame_nr;   /* Total number of frames */
  describe_ring("PACKET_RX_RING");
  ec = setsockopt(cfg.fd, SOL_PACKET, PACKET_RX_RING, &cfg.req, sizeof(cfg.req));
  if (ec < 0) {
    fprintf(stderr,"setsockopt PACKET_RX_RING: %s\n", strerror(errno));
    goto done;
  }

  /* now map the ring buffer we described above. lock in unswappable memory */
  cfg.pb.n = cfg.req.tp_block_size * cfg.req.tp_block_nr;
  cfg.pb.d = mmap(NULL, cfg.pb.n, PROT_READ|PROT_WRITE,
                  MAP_SHARED|MAP_LOCKED, cfg.fd, 0);
  if (cfg.pb.d == MAP_FAILED) {
    fprintf(stderr,"mmap: %s\n", strerror(errno));
    goto done;
  }

  /* bind to receive the packets from just one interface */
  struct sockaddr_ll sl;
  memset(&sl, 0, sizeof(sl));
  sl.sll_family = AF_PACKET;
  sl.sll_protocol = protocol;
  sl.sll_ifindex = ifr.ifr_ifindex;
  ec = bind(cfg.fd, (struct sockaddr*)&sl, sizeof(sl));
  if (ec < 0) {
    fprintf(stderr,"bind: %s\n", strerror(errno));
    goto done;
  }

  /* set promiscuous mode to get all packets. */
  struct packet_mreq m;
  memset(&m, 0, sizeof(m));
  m.mr_ifindex = ifr.ifr_ifindex;
  m.mr_type = PACKET_MR_PROMISC;
  ec = setsockopt(cfg.fd, SOL_PACKET, PACKET_ADD_MEMBERSHIP, &m, sizeof(m));
  if (ec < 0) {
    fprintf(stderr,"setsockopt PACKET_ADD_MEMBERSHIP: %s\n", strerror(errno));
    goto done;
  }

  rc = 0;

 done:
  return rc;
}
/*
 * create the transmit socket
 *
 * There are two fundamentally different types of sockets here, only one
 * of which is created, based on whether we are doing *encapsulated* transmit
 * (of the packet into a GRE tunnel that then rides over regular IP); or
 * "regular" packet transmission where we inject the packet to the NIC.
 *
 *  MODE         SOCKET TYPE        SEE ALSO
 *  --------     ----------------   ---------------
 *  ENCAPSULATE  RAW IP             ip(7) and raw(7)
 *  REGULAR      RAW PACKET         packet(7)
 *
 * Within REGULAR mode we further distinguish between sendto()-based
 * transmit, versus packet tx ring mode. The latter uses the kernel ring
 * buffer mechanism described in packet_mmap.txt.
 *
 * In ENCAPSULATE mode the egress interface for cfg.encap.dst is looked
 * up in the routing table and its MTU is stored in cfg.mtu. In REGULAR
 * mode the socket is bound to the interface named in cfg.dev, and in
 * ring mode the tx ring is mapped into cfg.pb.
 *
 * returns 0 on success, or -1 (with a message on stderr) on failure
 */
int setup_tx(void) {
  char interface[IF_NAMESIZE], *ip;
  int rc=-1, ec, one = 1;

  if (cfg.encap.enable) {

    /* in encapsulation mode, use raw IP socket. */
    cfg.tx_fd = socket(AF_INET, SOCK_RAW, IPPROTO_GRE);
    if (cfg.tx_fd == -1) {
      fprintf(stderr,"socket: %s\n", strerror(errno));
      goto done;
    }

    /* IP_HDRINCL means WE form the IP headers.. with some help; see raw(7) */
    ec = setsockopt(cfg.tx_fd, IPPROTO_IP, IP_HDRINCL, &one, sizeof(one));
    if (ec < 0) {
      fprintf(stderr,"setsockopt IP_HDRINCL: %s\n", strerror(errno));
      goto done;
    }

    /* we need the mtu of the egress NIC to implement IP fragmentation,
     * if needed, since raw sockets do not do that for us. to get the
     * interface mtu, we need the egress interface, based on routing */
    ec = find_route( cfg.encap.dst.s_addr, interface);
    if (ec < 0) {
      ip = inet_ntoa(cfg.encap.dst);
      fprintf(stderr, "can't determine route to %s\n", ip);
      goto done;
    }
    cfg.mtu = get_if_mtu(interface);
    if (cfg.mtu < 0) {
      fprintf(stderr, "mtu lookup failed: %s\n", interface);
      goto done;
    }
    if (cfg.verbose) {
      ip = inet_ntoa(cfg.encap.dst);
      fprintf(stderr, "encapsulating to %s on interface %s mtu %d\n",
        ip, interface, cfg.mtu);
    }

    rc = 0;
    goto done;
  }

  /*
   * standard tx mode
   */

  /* use a raw PACKET (link-level) socket */
  cfg.tx_fd = socket(AF_PACKET, SOCK_RAW, 0 /* tx only */);
  if (cfg.tx_fd == -1) {
    fprintf(stderr,"socket: %s\n", strerror(errno));
    goto done;
  }

  /* convert interface name to index (in ifr.ifr_ifindex).
   * a name too long for ifr_name is refused, not truncated */
  struct ifreq ifr;
  if (strlen(cfg.dev) >= sizeof(ifr.ifr_name)) {
    fprintf(stderr,"failed to find interface %s\n", cfg.dev);
    goto done;
  }
  memset(&ifr, 0, sizeof(ifr));
  memcpy(ifr.ifr_name, cfg.dev, strlen(cfg.dev));
  ec = ioctl(cfg.tx_fd, SIOCGIFINDEX, &ifr);
  if (ec < 0) {
    fprintf(stderr,"failed to find interface %s\n", cfg.dev);
    goto done;
  }

  /* bind interface for tx */
  struct sockaddr_ll sl;
  memset(&sl, 0, sizeof(sl));
  sl.sll_family = AF_PACKET;
  sl.sll_ifindex = ifr.ifr_ifindex;
  ec = bind(cfg.tx_fd, (struct sockaddr*)&sl, sizeof(sl));
  if (ec < 0) {
    fprintf(stderr,"bind: %s\n", strerror(errno));
    goto done;
  }

  /* when qdisc bypass is enabled, to quote packet_mmap.txt, "packets sent
   * through PF_PACKET will bypass the kernel's qdisc layer and are ...
   * pushed to the driver directly. Meaning, packet are not buffered, tc
   * disciplines are ignored, increased loss can occur and such packets are
   * not visible to other PF_PACKET sockets anymore."
   */
#ifdef PACKET_QDISC_BYPASS
  ec = cfg.bypass_qdisc_on_tx ?
    setsockopt(cfg.tx_fd, SOL_PACKET, PACKET_QDISC_BYPASS, &one, sizeof(one)) : 0;
  if (ec < 0) {
    fprintf(stderr,"setsockopt PACKET_QDISC_BYPASS: %s\n", strerror(errno));
    goto done;
  }
#else
  if (cfg.bypass_qdisc_on_tx) {
    fprintf(stderr,"setsockopt PACKET_QDISC_BYPASS: unsupported\n");
    goto done;
  }
#endif

  /* if we are using standard, sendto-based transmit, we are done */
  if (cfg.use_tx_ring == 0) {
    rc = 0;
    goto done;
  }

  /*************************************************************
   * packet tx ring setup
   ************************************************************/
  if (check_ring_parameters() < 0) goto done;

  int v = TPACKET_V2;
  ec = setsockopt(cfg.tx_fd, SOL_PACKET, PACKET_VERSION, &v, sizeof(v));
  if (ec < 0) {
    fprintf(stderr,"setsockopt PACKET_VERSION: %s\n", strerror(errno));
    goto done;
  }

  /* fill out the struct tpacket_req describing the ring buffer */
  memset(&cfg.req, 0, sizeof(cfg.req));
  cfg.req.tp_block_size = cfg.ring_block_sz; /* Min sz of contig block */
  cfg.req.tp_frame_size = cfg.ring_frame_sz; /* Size of frame/snaplen */
  cfg.req.tp_block_nr = cfg.ring_block_nr;   /* Number of blocks */
  cfg.req.tp_frame_nr = cfg.ring_frame_nr;   /* Total number of frames */
  describe_ring("PACKET_TX_RING");
  ec = setsockopt(cfg.tx_fd, SOL_PACKET, PACKET_TX_RING, &cfg.req, sizeof(cfg.req));
  if (ec < 0) {
    fprintf(stderr,"setsockopt PACKET_TX_RING: %s\n", strerror(errno));
    goto done;
  }

  /* map the tx ring buffer into unswappable memory */
  cfg.pb.n = cfg.req.tp_block_size * cfg.req.tp_block_nr;
  cfg.pb.d = mmap(NULL, cfg.pb.n, PROT_READ|PROT_WRITE,
                  MAP_SHARED|MAP_LOCKED, cfg.tx_fd, 0);
  if (cfg.pb.d == MAP_FAILED) {
    fprintf(stderr,"mmap: %s\n", strerror(errno));
    goto done;
  }

  rc = 0;

 done:
  return rc;
}
/*
 * bb_flush
 *
 * hand every message queued in batch buffer b to ring s using one
 * shr_writev call, then mark the batch buffer empty. nothing is
 * written when the batch buffer holds no messages.
 *
 * returns 0 on success, or -1 (with a message on stderr) if the
 * ring write fails; in that case the batch buffer is left as it was
 */
int bb_flush(struct shr *s, struct bb *b) {
  size_t pending;
  ssize_t written;

  pending = utvector_len(b->iov);
  if (pending == 0) return 0;

  written = shr_writev(s, (struct iovec*)utvector_head(b->iov), pending);
  if (written < 0) {
    fprintf(stderr,"shr_write: error code %ld\n", (long)written);
    return -1;
  }

  /* everything went out; reclaim the iov slots and the data area */
  utvector_clear(b->iov);
  b->u = 0;
  return 0;
}
/*
 * bb_write
 *
 * append one message (buf, len) to batch buffer b. if the unused part
 * of the batch buffer is too small for it, the batch is flushed to
 * ring s first. the message is copied into the data area and an iovec
 * describing the copy is pushed onto b->iov.
 *
 * a message must fit in an empty batch buffer; this is asserted.
 *
 * returns len on success, or -1 if the flush failed
 */
ssize_t bb_write(struct shr *s, struct bb *b, char *buf, size_t len) {
  struct iovec slot;

  /* make room if needed */
  if ((b->n - b->u < len) && (bb_flush(s,b) < 0)) return (ssize_t)-1;
  assert((b->n - b->u) >= len);

  /* copy the message to the first unused byte and record where it is */
  slot.iov_len = len;
  slot.iov_base = &b->d[b->u];
  memcpy(slot.iov_base, buf, len);
  utvector_push(b->iov, &slot);
  b->u += len;

  return len;
}
/* add rx drops to the counter in the ring app data
*
* see /usr/include/linux/if_packet.h
* see packet(7)
* "Receiving statistics resets the internal counters."
*
*/
int update_rx_drops(void) {
struct tpacket_stats stats;
struct fluxcap_stats st;
size_t st_sz;
void *stp;
int sc, rc = -1;
assert(cfg.mode == mode_receive);
if (cfg.losing == 0) return 0;
/* packet(7): "Receiving statistics resets the internal counters." */
socklen_t len = sizeof(stats);
sc = getsockopt(cfg.fd, SOL_PACKET, PACKET_STATISTICS, &stats, &len);
if (sc < 0) {
fprintf(stderr,"getsockopt: %s\n", strerror(errno));
return -1;
}
if (cfg.verbose) {
fprintf(stderr, "Received packets: %u\n", stats.tp_packets);
fprintf(stderr, "Dropped packets: %u\n", stats.tp_drops);
}
stp = &st;
st_sz = sizeof(st);
sc = shr_appdata(cfg.ring, &stp, NULL, &st_sz); /* "get" */
if (sc < 0) {
fprintf(stderr, "shr_appdata: error %d\n", sc);
goto done;
}
st.rx_drops += stats.tp_drops;
sc = shr_appdata(cfg.ring, NULL, stp, &st_sz); /* "set" */
if (sc < 0) {
fprintf(stderr, "shr_appdata: error %d\n", sc);
goto done;
}
cfg.losing = 0;
rc = 0;
done:
return rc;
}
/* add ring read drops to the counter in the ring app data */
int update_rd_drops(void) {
struct fluxcap_stats st;
size_t st_sz;
void *stp;
int sc, rc = -1;
stp = &st;
st_sz = sizeof(st);
sc = shr_appdata(cfg.ring, &stp, NULL, &st_sz); /* "get" */
if (sc < 0) {
fprintf(stderr, "shr_appdata: error %d\n", sc);
goto done;
}
st.rd_drops += shr_farm_stat(cfg.ring, 1);
sc = shr_appdata(cfg.ring, NULL, stp, &st_sz); /* "set" */
if (sc < 0) {
fprintf(stderr, "shr_appdata: error %d\n", sc);
goto done;
}
rc = 0;
done:
return rc;
}
/*
 * format_rate
 *
 * render a per-second rate as a short string such as "20 Mbit/s".
 * bps is the rate and unit is the word to print after the prefix
 * ("bit", "pkt", etc). the rate is divided by 1024 for as long as it
 * exceeds 1024, up to four times, picking the prefix K, M, G or T.
 *
 * the result lives in cfg.tmp and is overwritten by the next call;
 * use it immediately or copy it.
 */
char *format_rate(unsigned long bps, char *unit) {
  static const char *const prefix[] = { "K", "M", "G", "T" };
  const char *scale = "";
  double value = bps;
  size_t i;

  for (i = 0; (i < sizeof(prefix)/sizeof(prefix[0])) && (value > 1024); i++) {
    value /= 1024;
    scale = prefix[i];
  }

  utstring_clear(cfg.tmp);
  utstring_printf(cfg.tmp, "%.0f %s%s/s", value, scale, unit);
  return utstring_body(cfg.tmp);
}
/*
* status_rings
*
* update i/o metrics for each ring
*
*/
int status_rings(void) {
unsigned long start_tick, st, ct;
struct shr_stat *ss;
double elapsed_sec, lg10_b;
size_t sz;
int rc = -1, sc, i;
char *name, *c;
struct shr **r;
struct ww *w;
UT_string *s;
ssize_t nr;
void *fs;
/* unicode 1/8 width box progression */
char *blocks[] = { "", "▏", "▎", "▍", "▌", "▋", "▊", "▉", "█"};
printf("\033[1;1H"); /* position at line 0, col 0 */
printf("\033[1m"); /* bold */
printf(" %-20s | %-12s | %-12s | %-12s \n\n",
"name", "rx-rate", "rx-drop", "tx-drop");
printf("\033[m"); /* reset attributes */
/* go through the rings to obtain their in/out counters */
s = NULL;
r = NULL;
w = NULL;
while ( (r = (struct shr**)utvector_next(cfg.watch_rings, r))) {
s = (UT_string*)utvector_next(cfg.watch_names, s);
w = (struct ww*)utvector_next(cfg.watch_win, w);
assert(s);
assert(w);
name = utstring_body(s);
ss = &w->win[ cfg.ticks % NWIN ].ss;
sc = shr_stat(*r, ss, NULL);
if (sc < 0) goto done;
fs = &w->win[ cfg.ticks % NWIN ].fs;
sz = sizeof(struct fluxcap_stats);
sc = shr_appdata(*r, &fs, NULL, &sz);
if (sc < 0) {
fprintf(stderr, "shr_appdata: error %d\n", sc);
goto done;
}
/* for this ring, compute intake & drops over the windows */
start_tick = (cfg.ticks < NWIN) ? 0 : (cfg.ticks - (NWIN - 1));
st = start_tick % NWIN;
ct = cfg.ticks % NWIN;
w->bw = w->win[ ct ].ss.bw -
w->win[ st ].ss.bw;
w->mw = w->win[ ct ].ss.mw -
w->win[ st ].ss.mw;
w->rx = w->win[ ct ].fs.rx_drops -
w->win[ st ].fs.rx_drops;
w->rd = w->win[ ct ].fs.rd_drops -
w->win[ st ].fs.rd_drops;
/* compute per second rates, log and strings */
elapsed_sec = (cfg.ticks - start_tick) * 1.0 / TIMER_HZ;
memset( &w->ps, 0, sizeof(w->ps) );
if (elapsed_sec > 0) {
w->ps.B = w->bw / elapsed_sec;
w->ps.b = w->ps.B * 8;
lg10_b = w->ps.b ? log10(w->ps.b) : 0;
w->ps.lg10_b = (unsigned)floor(lg10_b); /* integer part */
w->ps.lg10_bf = (lg10_b - w->ps.lg10_b) * 8; /* fraction n/8 */
w->ps.rx = w->rx / elapsed_sec;
w->ps.rd = w->rd / elapsed_sec;
}
/* render strings */
strncpy(w->name, name, NAME_MAX);
w->name[NAME_MAX - 1] = '\0';
snprintf(w->ps.str.b, RATE_MAX, "%lu", w->ps.b);
snprintf(w->ps.str.rx, RATE_MAX, "%lu", w->ps.rx);