lspgn committed Aug 7, 2019
1 parent f5dccb5 commit 2c9c0a6
Showing 33 changed files with 2,280 additions and 1,306 deletions.
15 changes: 15 additions & 0 deletions .travis.yml
@@ -0,0 +1,15 @@
language: go
go_import_path: github.com/cloudflare/goflow
go:
- 1.12.x

script:
- GO111MODULE=on make

notifications:
email:
recipients:
- louis@cloudflare.com
on_success: never
on_failure: change

17 changes: 6 additions & 11 deletions Dockerfile
@@ -1,24 +1,19 @@
FROM golang:alpine as builder
ARG VERSION=""

RUN apk --update --no-cache add git build-base gcc

COPY . /build
WORKDIR /build

RUN go build -ldflags "-X main.version=${VERSION}" -o goflow cmd/goflow/goflow.go

FROM alpine:latest

RUN apk update --no-cache && \
    adduser -S -D -H -h / flow
USER flow
COPY --from=builder /build/goflow /

ENTRYPOINT ["./goflow"]
46 changes: 0 additions & 46 deletions Gopkg.toml

This file was deleted.

48 changes: 46 additions & 2 deletions Makefile
@@ -1,4 +1,48 @@
IMAGE ?= cloudflare/goflow
VERSION ?= $(shell git describe --tags --always --dirty)
VERSION_DOCKER ?= $(shell git describe --tags --abbrev=0 --always --dirty)

GOOS ?= linux
ARCH ?= $(shell uname -m)

.PHONY: all
all: test-race vet test

.PHONY: clean
clean:
rm -rf bin

.PHONY: build
build:
@echo compiling code
mkdir bin
GOOS=$(GOOS) go build -ldflags '-X main.version=$(VERSION)' -o bin/goflow-$(GOOS)-$(ARCH) cmd/goflow/goflow.go
GOOS=$(GOOS) go build -ldflags '-X main.version=$(VERSION)' -o bin/goflow-sflow-$(GOOS)-$(ARCH) cmd/csflow/csflow.go
GOOS=$(GOOS) go build -ldflags '-X main.version=$(VERSION)' -o bin/goflow-netflow-$(GOOS)-$(ARCH) cmd/cnetflow/cnetflow.go
GOOS=$(GOOS) go build -ldflags '-X main.version=$(VERSION)' -o bin/goflow-nflegacy-$(GOOS)-$(ARCH) cmd/cnflegacy/cnflegacy.go


.PHONY: container
container:
@echo build docker container
docker build --build-arg VERSION=$(VERSION) -t $(IMAGE):$(VERSION_DOCKER) .

.PHONY: proto
proto:
@echo generating protobuf
protoc --go_out=. --plugin=$(PROTOCPATH)protoc-gen-go pb/*.proto

.PHONY: test
test:
@echo testing code
go test ./...

.PHONY: vet
vet:
@echo checking code is vetted
go vet $(shell go list ./...)

.PHONY: test-race
test-race:
@echo testing code for races
go test -race ./...
138 changes: 77 additions & 61 deletions README.md
@@ -21,16 +21,19 @@ which contains the fields a network engineer is interested in.
The flow packets usually contain multiple samples.
This acts as an abstraction of a sample.

The `transport` package provides different ways of processing the protobuf: either sending it via Kafka or
printing it on the console.

Finally, `utils` provides functions that are used directly by the CLI tools.
GoFlow wraps all of these functions and chains them together to produce bytes into Kafka.
There is also one CLI tool per protocol.

You can build your own collector using this base and replace parts:
* Use different transport (eg: RabbitMQ instead of Kafka)
* Convert to another format (eg: Cap'n Proto, Avro, instead of protobuf)
* Decode different samples (eg: not only IP networks, add MPLS)
* Different metrics system (eg: use [expvar](https://golang.org/pkg/expvar/) instead of Prometheus)

Starting with v2.0.0, the code offers increased flexibility and less interdependence between its components.
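To illustrate the replaceable-parts idea, a custom transport only needs to satisfy a small publishing interface. The `Transport` interface and `FlowMessage` struct below are illustrative assumptions for this sketch, not the exact goflow API (the real flow type is generated from `pb/flow.proto`):

```go
package main

import "fmt"

// FlowMessage is a stand-in for the protobuf-generated flow type
// (hypothetical: the real type lives in the generated pb package).
type FlowMessage struct {
	SrcAddr string
	DstAddr string
	Bytes   uint64
}

// Transport is an illustrative publishing interface: anything able to
// receive a batch of decoded flows (Kafka, RabbitMQ, stdout, ...).
type Transport interface {
	Publish(msgs []*FlowMessage)
}

// Format renders one flow as a log line.
func Format(m *FlowMessage) string {
	return fmt.Sprintf("%s -> %s (%d bytes)", m.SrcAddr, m.DstAddr, m.Bytes)
}

// ConsoleTransport prints flows instead of producing them to Kafka.
type ConsoleTransport struct{}

func (ConsoleTransport) Publish(msgs []*FlowMessage) {
	for _, m := range msgs {
		fmt.Println(Format(m))
	}
}

func main() {
	var t Transport = ConsoleTransport{}
	t.Publish([]*FlowMessage{{SrcAddr: "192.0.2.1", DstAddr: "198.51.100.2", Bytes: 1500}})
}
```

Swapping Kafka for RabbitMQ, or protobuf for Avro, then amounts to providing another implementation of the same interface.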

### Protocol difference

The sampling protocols can be very different:
@@ -50,13 +53,15 @@ protocols (eg: per ASN or per port, rather than per (ASN, router) and (port, router))
## Features

Collection:
* NetFlow v5
* IPFIX/NetFlow v9
* Handles sampling rate provided by the Option Data Set
* sFlow v5: RAW, IPv4, IPv6, Ethernet samples, Gateway data, router data, switch data

Production:
* Convert to protobuf
* Sends to Kafka producer
* Prints to the console

Monitoring:
* Prometheus metrics
@@ -73,24 +78,24 @@ Download the latest release and just run the following command:
./goflow -h
```

Enable or disable a protocol using `-nf=false` or `-sflow=false`.
Define the addresses and ports of the protocols using `-nf.addr` and `-nf.port` for NetFlow and `-sflow.addr` and `-sflow.port` for sFlow.

Set `-loglevel` to `debug` to see what is received.

Set the Kafka brokers, or an SRV record for the brokers, using `-kafka.out.brokers 127.0.0.1:9092,[::1]:9092` or `-kafka.out.srv`.
Disable sending to Kafka with `-kafka=false`.
You can hash the protobuf by key when sending it to Kafka.
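The idea behind hashing by key can be sketched as follows; `partitionForKey` is a hypothetical helper (the real Kafka producer library does this internally) showing how a key such as the sampler address maps deterministically to a partition:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionForKey maps a message key to one of n partitions using an
// FNV-1a hash, so flows sharing a key always land on the same
// partition (preserving their relative order for consumers).
func partitionForKey(key []byte, n uint32) uint32 {
	h := fnv.New32a()
	h.Write(key)
	return h.Sum32() % n
}

func main() {
	// Hypothetical: key by sampler (router) address over 8 partitions.
	for _, k := range []string{"192.0.2.1", "192.0.2.2"} {
		fmt.Printf("%s -> partition %d\n", k, partitionForKey([]byte(k), 8))
	}
}
```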

You can collect NetFlow/IPFIX, NetFlow v5 and sFlow using the same collector,
or use the single-protocol collectors.

You can define the number of workers per protocol using `-workers`.

## Docker

We also provide an all-in-one Docker container. To run it in debug mode without sending into Kafka:

```
$ sudo docker run --net=host -ti cloudflare/goflow:latest -kafka=false -loglevel debug
```

## Environment
@@ -109,17 +114,14 @@ is preserved when adding new fields (thus the fields will be lost if re-serialized).

Once the updated flows are back into Kafka, they are **consumed** by **database inserters** (Clickhouse, Amazon Redshift, Google BigTable...)
to allow for static analysis. Other teams access the network data just like any other log (SQL query).
They are also consumed by a Flink cluster in order to be **aggregated** and give live traffic information.

### Output format

If you want to develop applications, build `pb/flow.proto` into the language you want:

Example in Go:
```
PROTOCPATH=$HOME/go/bin/ make proto
```

Example in Java:
@@ -128,65 +130,79 @@ Example in Java:
export SRC_DIR="path/to/goflow-pb"
export DST_DIR="path/to/java/app/src/main/java"
protoc -I=$SRC_DIR --java_out=$DST_DIR $SRC_DIR/flow.proto
```

The fields are listed in the following table.

You can find information on how they are populated from the original source:
* For [sFlow](https://sflow.org/developers/specifications.php)
* For [NetFlow v5](https://www.cisco.com/c/en/us/td/docs/net_mgmt/netflow_collection_engine/3-6/user/guide/format.html)
* For [NetFlow v9](https://www.cisco.com/en/US/technologies/tk648/tk362/technologies_white_paper09186a00800a3db9.html)
* For [IPFIX](https://www.iana.org/assignments/ipfix/ipfix.xhtml)

| Field | Description | NetFlow v5 | sFlow | NetFlow v9 | IPFIX |
| - | - | - | - | - | - |
|Type|Type of flow message|NETFLOW_V5|SFLOW_5|NETFLOW_V9|IPFIX|
|TimeReceived|Timestamp of when the message was received|Included|Included|Included|Included|
|SequenceNum|Sequence number of the flow packet|Included|Included|Included|Included|
|SamplingRate|Sampling rate of the flow|Included|Included|Included|Included|
|FlowDirection|Direction of the flow| | |DIRECTION (61)|flowDirection (61)|
|SamplerAddress|Address of the device that generated the packet|IP source of packet|Agent IP|IP source of packet|IP source of packet|
|TimeFlowStart|Time the flow started|System uptime and first|=TimeReceived|System uptime and FIRST_SWITCHED (22)|flowStartXXX (150, 152, 154, 156)|
|TimeFlowEnd|Time the flow ended|System uptime and last|=TimeReceived|System uptime and LAST_SWITCHED (23)|flowEndXXX (151, 153, 155, 157)|
|Bytes|Number of bytes in flow|dOctets|Length of sample|IN_BYTES (1) OUT_BYTES (23)|octetDeltaCount (1) postOctetDeltaCount (23)|
|Packets|Number of packets in flow|dPkts|=1|IN_PKTS (2) OUT_PKTS (24)|packetDeltaCount (2) postPacketDeltaCount (24)|
|SrcAddr|Source address (IP)|srcaddr (IPv4 only)|Included|IPV4_SRC_ADDR (8) IPV6_SRC_ADDR (27)|sourceIPv4Address (8) sourceIPv6Address (27)|
|DstAddr|Destination address (IP)|dstaddr (IPv4 only)|Included|IPV4_DST_ADDR (12) IPV6_DST_ADDR (28)|destinationIPv4Address (12) destinationIPv6Address (28)|
|Etype|Ethernet type (0x86dd for IPv6...)|IPv4|Included|Included|Included|
|Proto|Protocol (UDP, TCP, ICMP...)|prot|Included|PROTOCOL (4)|protocolIdentifier (4)|
|SrcPort|Source port (when UDP/TCP/SCTP)|srcport|Included|L4_SRC_PORT (7)|sourceTransportPort (7)|
|DstPort|Destination port (when UDP/TCP/SCTP)|dstport|Included|L4_DST_PORT (11)|destinationTransportPort (11)|
|SrcIf|Source interface|input|Included|INPUT_SNMP (10)|ingressInterface (10)|
|DstIf|Destination interface|output|Included|OUTPUT_SNMP (14)|egressInterface (14)|
|SrcMac|Source mac address| |Included|IN_SRC_MAC (56)|sourceMacAddress (56)|
|DstMac|Destination mac address| |Included|OUT_DST_MAC (57)|postDestinationMacAddress (57)|
|SrcVlan|Source VLAN ID| |From ExtendedSwitch|SRC_VLAN (58)|vlanId (58)|
|DstVlan|Destination VLAN ID| |From ExtendedSwitch|DST_VLAN (59)|postVlanId (59)|
|VlanId|802.1q VLAN ID| |Included|SRC_VLAN (58)|vlanId (58)|
|IngressVrfID|VRF ID| | | |ingressVRFID (234)|
|EgressVrfID|VRF ID| | | |egressVRFID (235)|
|IPTos|IP Type of Service|tos|Included|SRC_TOS (5)|ipClassOfService (5)|
|ForwardingStatus|Forwarding status| | |FORWARDING_STATUS (89)|forwardingStatus (89)|
|IPTTL|IP Time to Live| |Included|IPTTL (52)|minimumTTL (52)|
|TCPFlags|TCP flags|tcp_flags|Included|TCP_FLAGS (6)|tcpControlBits (6)|
|IcmpType|ICMP Type| |Included|ICMP_TYPE (32)|icmpTypeXXX (176, 178) icmpTypeCodeXXX (32, 139)|
|IcmpCode|ICMP Code| |Included|ICMP_TYPE (32)|icmpCodeXXX (177, 179) icmpTypeCodeXXX (32, 139)|
|IPv6FlowLabel|IPv6 Flow Label| |Included|IPV6_FLOW_LABEL (31)|flowLabelIPv6 (31)|
|FragmentId|IP Fragment ID| |Included|IPV4_IDENT (54)|fragmentIdentification (54)|
|FragmentOffset|IP Fragment Offset| |Included|FRAGMENT_OFFSET (88)|fragmentOffset (88)|
|BiFlowDirection|BiFlow Identification| | | |biflowDirection (239)|
|SrcAS|Source AS number|src_as|From ExtendedGateway|SRC_AS (16)|bgpSourceAsNumber (16)|
|DstAS|Destination AS number|dst_as|From ExtendedGateway|DST_AS (17)|bgpDestinationAsNumber (17)|
|NextHop|Nexthop address|nexthop|From ExtendedGateway|IPV4_NEXT_HOP (15) BGP_IPV4_NEXT_HOP (18) IPV6_NEXT_HOP (62) BGP_IPV6_NEXT_HOP (63)|ipNextHopIPv4Address (15) bgpNextHopIPv4Address (18) ipNextHopIPv6Address (62) bgpNextHopIPv6Address (63)|
|NextHopAS|Nexthop AS number| |From ExtendedGateway| | |
|SrcNet|Source address mask|src_mask|From ExtendedRouter|SRC_MASK (9) IPV6_SRC_MASK (29)|sourceIPv4PrefixLength (9) sourceIPv6PrefixLength (29)|
|DstNet|Destination address mask|dst_mask|From ExtendedRouter|DST_MASK (13) IPV6_DST_MASK (30)|destinationIPv4PrefixLength (13) destinationIPv6PrefixLength (30)|
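As the TimeFlowStart/TimeFlowEnd rows note, NetFlow v9 carries FIRST_SWITCHED/LAST_SWITCHED as router-uptime offsets (milliseconds), so a collector must combine them with the export timestamp and uptime from the packet header to recover absolute times. A minimal sketch, with a hypothetical function name and simplified field handling:

```go
package main

import (
	"fmt"
	"time"
)

// flowTimestamp converts a NetFlow v9 FIRST_SWITCHED or LAST_SWITCHED
// value (milliseconds of router uptime) into an absolute time, given
// the export time and system uptime from the packet header.
func flowTimestamp(exportUnixSecs, sysUptimeMs, switchedMs uint32) time.Time {
	// The flow was switched (sysUptimeMs - switchedMs) ms before export.
	ageMs := int64(sysUptimeMs) - int64(switchedMs)
	return time.Unix(int64(exportUnixSecs), 0).Add(-time.Duration(ageMs) * time.Millisecond)
}

func main() {
	// Hypothetical packet: exported at uptime 100000 ms,
	// flow first switched at uptime 40000 ms.
	t := flowTimestamp(1565136000, 100000, 40000)
	fmt.Println(t.UTC())
}
```

sFlow needs none of this bookkeeping: samples carry no flow timestamps, which is why `TimeFlowStart`/`TimeFlowEnd` simply equal `TimeReceived` there.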

If you are implementing flow processors to add more data to the protobuf,
we suggest you use field IDs ≥ 1000.

### Implementation notes

The pipeline at Cloudflare connects collectors with flow processors
that add more information: from the IP address, they add country, ASN, etc.

For aggregation, we use materialized tables in ClickHouse.
Dictionaries help correlate flows with countries and ASNs.
A few collectors can handle hundreds of thousands of samples.

We also successfully experimented with flow aggregation in Flink using a
[Keyed Session Window](https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/windows.html#session-windows):
this sums the `Bytes x SamplingRate` and `Packets x SamplingRate` received during a 5-minute **window**, while allowing 2 more minutes
for delayed flows before closing the **session**.
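The windowed sum described above can be sketched without Flink; the `Flow` struct and the choice of (source AS, destination AS) as the key are illustrative, not the production schema:

```go
package main

import "fmt"

// Flow holds the fields needed for extrapolation (illustrative subset).
type Flow struct {
	SrcAS, DstAS uint32
	Bytes        uint64
	SamplingRate uint64
	TimeFlowEnd  int64 // unix seconds
}

// windowKey groups flows by a fixed time window and an AS pair.
type windowKey struct {
	Window       int64 // start of the window, unix seconds
	SrcAS, DstAS uint32
}

// aggregate sums extrapolated bytes (Bytes x SamplingRate) per
// (window, src AS, dst AS), mimicking the keyed window in Flink.
func aggregate(flows []Flow, windowSecs int64) map[windowKey]uint64 {
	out := make(map[windowKey]uint64)
	for _, f := range flows {
		k := windowKey{
			Window: f.TimeFlowEnd - f.TimeFlowEnd%windowSecs,
			SrcAS:  f.SrcAS,
			DstAS:  f.DstAS,
		}
		out[k] += f.Bytes * f.SamplingRate
	}
	return out
}

func main() {
	flows := []Flow{
		{SrcAS: 13335, DstAS: 15169, Bytes: 1500, SamplingRate: 1000, TimeFlowEnd: 1565136010},
		{SrcAS: 13335, DstAS: 15169, Bytes: 500, SamplingRate: 1000, TimeFlowEnd: 1565136020},
	}
	for k, v := range aggregate(flows, 300) {
		fmt.Printf("window=%d %d->%d bytes=%d\n", k.Window, k.SrcAS, k.DstAS, v)
	}
}
```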

The BGP information provided by routers can be unreliable (if the router does not have a full BGP table, or the route is static).
You can use the MaxMind [prefix-to-ASN database](https://dev.maxmind.com/geoip/geoip2/geolite2/) to solve this issue.
We also gather the next-hop ASN using a custom BGP collector built on the [fgbgp library](https://github.com/cloudflare/fgbgp).

## License
