This repository was archived by the owner on Feb 19, 2025. It is now read-only.

The high-scalability sFlow/NetFlow/IPFIX collector used internally at Cloudflare.

cloudflare/goflow

Warning

This software is no longer maintained. We advise replacing your production use of this software with the fork goflow2.

This application is a NetFlow/IPFIX/sFlow collector in Go.

It gathers network information (IP, interfaces, routers) from different flow protocols, serializes it in a protobuf format and sends the messages to Kafka using the Sarama library.

Why

The diversity of devices and the amount of network samples at Cloudflare required its own pipeline. We focused on building tools that could be easily monitored and maintained. The main goal is to have full visibility of a network while allowing other teams to develop on it.

Modularity

In order to enable load-balancing and optimizations, the GoFlow library has a decoder which converts the payload of a flow packet into a Go structure.

The producer functions (one per protocol) then convert those structures into a protobuf (pb/flow.pb) which contains the fields a network engineer is interested in. Flow packets usually contain multiple samples; the protobuf acts as an abstraction of a single sample.

The transport provides different ways of processing the protobuf: either sending it via Kafka or printing it on the console.

Finally, utils provides functions that are directly used by the CLI utilities. GoFlow is a wrapper around all these functions and chains them together to produce bytes into Kafka. There is also one CLI tool per protocol.
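The following minimal sketch shows the decoder, producer and transport chain, assuming the packet has already been decoded. `DecodedPacket`, `Sample`, `FlowMessage`, `Transport`, `ConsoleTransport`, `MemoryTransport` and `produce` are hypothetical names, not GoFlow's API, and `FlowMessage` keeps only a few of the protobuf fields. The sketch illustrates two points: the producer emits one message per sample, and the output stage can be swapped because it sits behind an interface.

```go
package main

import (
	"fmt"
	"net/netip"
)

// Sample is one flow sample; a decoded flow packet usually bundles several.
type Sample struct {
	SrcAddr, DstAddr netip.Addr
	Bytes, Packets   uint64
}

// DecodedPacket is a hypothetical stand-in for the decoder's Go structure.
type DecodedPacket struct {
	Sampler      netip.Addr
	SamplingRate uint64
	Samples      []Sample
}

// FlowMessage is a hypothetical subset of the message defined in pb/flow.proto.
type FlowMessage struct {
	SamplerAddress   netip.Addr
	SrcAddr, DstAddr netip.Addr
	Bytes, Packets   uint64
	SamplingRate     uint64
}

// Transport is the swappable output stage (Kafka, console, RabbitMQ, ...).
type Transport interface {
	Publish(msgs []FlowMessage) error
}

// ConsoleTransport prints messages instead of sending them to Kafka.
type ConsoleTransport struct{}

func (ConsoleTransport) Publish(msgs []FlowMessage) error {
	for _, m := range msgs {
		fmt.Printf("%+v\n", m)
	}
	return nil
}

// MemoryTransport keeps messages in memory, which is handy for tests.
type MemoryTransport struct{ Msgs []FlowMessage }

func (t *MemoryTransport) Publish(msgs []FlowMessage) error {
	t.Msgs = append(t.Msgs, msgs...)
	return nil
}

// produce turns one decoded packet into one message per bundled sample,
// copying the packet-level sampler address and sampling rate into each.
func produce(p DecodedPacket) []FlowMessage {
	msgs := make([]FlowMessage, 0, len(p.Samples))
	for _, s := range p.Samples {
		msgs = append(msgs, FlowMessage{
			SamplerAddress: p.Sampler,
			SrcAddr:        s.SrcAddr,
			DstAddr:        s.DstAddr,
			Bytes:          s.Bytes,
			Packets:        s.Packets,
			SamplingRate:   p.SamplingRate,
		})
	}
	return msgs
}

func main() {
	pkt := DecodedPacket{
		Sampler:      netip.MustParseAddr("192.0.2.1"),
		SamplingRate: 1024,
		Samples: []Sample{
			{SrcAddr: netip.MustParseAddr("198.51.100.7"), DstAddr: netip.MustParseAddr("203.0.113.9"), Bytes: 1500, Packets: 1},
			{SrcAddr: netip.MustParseAddr("198.51.100.8"), DstAddr: netip.MustParseAddr("203.0.113.9"), Bytes: 64, Packets: 1},
		},
	}
	// Swap ConsoleTransport for another Transport without touching produce.
	var out Transport = ConsoleTransport{}
	if err := out.Publish(produce(pkt)); err != nil {
		fmt.Println("publish failed:", err)
	}
}
```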

You can build your own collector using this base and replace parts:

  • Use a different transport (e.g., RabbitMQ instead of Kafka)
  • Convert to another format (e.g., Cap'n Proto or Avro instead of protobuf)
  • Decode different samples (e.g., not only IP networks; add MPLS)
  • Use a different metrics system (e.g., expvar instead of Prometheus)

Protocol difference

The sampling protocols can be very different:

sFlow is a stateless protocol which sends the full header of a packet with router information (interfaces, destination AS), while NetFlow/IPFIX rely on templates that contain fields (e.g., source IPv6).

The sampling rate in NetFlow/IPFIX is provided by Option Data Sets. This is why it can take a few minutes before packets are decoded: all the templates (Option Template and Data Template) must be received first.

Both of these protocols bundle multiple samples (Data Set in NetFlow/IPFIX and Flow Sample in sFlow) in one packet.

The advantage of using an abstract network flow format, such as protobuf, is that it enables summing across protocols (e.g., per ASN or per port, rather than per (ASN, router) and (port, router)).
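The following hypothetical `bytesPerASN` helper illustrates summing across protocols. It keys only on the destination AS, so samples from sFlow, NetFlow v5 and IPFIX, and from different routers, land in the same bucket. Scaling by `SamplingRate` to estimate real traffic is an assumption of the sketch.

```go
package main

import "fmt"

// Flow is a hypothetical, reduced flow message.
type Flow struct {
	Type           string // e.g. "NETFLOW_V5", "SFLOW_5", "IPFIX"
	SamplerAddress string
	DstAS          uint32
	Bytes          uint64
	SamplingRate   uint64
}

// bytesPerASN sums estimated bytes per destination AS, ignoring which
// protocol or router reported each sample.
func bytesPerASN(flows []Flow) map[uint32]uint64 {
	totals := make(map[uint32]uint64)
	for _, f := range flows {
		totals[f.DstAS] += f.Bytes * f.SamplingRate
	}
	return totals
}

func main() {
	flows := []Flow{
		{Type: "SFLOW_5", SamplerAddress: "192.0.2.1", DstAS: 13335, Bytes: 1000, SamplingRate: 1024},
		{Type: "IPFIX", SamplerAddress: "192.0.2.2", DstAS: 13335, Bytes: 500000, SamplingRate: 1},
		{Type: "NETFLOW_V5", SamplerAddress: "192.0.2.1", DstAS: 64500, Bytes: 2000, SamplingRate: 10},
	}
	totals := bytesPerASN(flows)
	fmt.Println(totals[13335]) // 1524000
	fmt.Println(totals[64500]) // 20000
}
```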

Features

Collection:

  • NetFlow v5
  • IPFIX/NetFlow v9
    • Handles sampling rate provided by the Option Data Set
  • sFlow v5: RAW, IPv4, IPv6, Ethernet samples, Gateway data, router data, switch data

Production:

  • Convert to protobuf
  • Sends to Kafka producer
  • Prints to the console

Monitoring:

  • Prometheus metrics
  • Time to decode
  • Sample rates
  • Payload information
  • NetFlow Templates

Run

Download the latest release and just run the following command:

./goflow -h

Enable or disable a protocol using -nf=false or -sflow=false. Define the addresses and ports of the protocols using -nf.addr and -nf.port for NetFlow, and -sflow.addr and -sflow.port for sFlow.

Set the Kafka brokers, or the SRV record that lists them, using -kafka.brokers 127.0.0.1:9092,[::1]:9092 or -kafka.srv. Disable sending to Kafka with -kafka=false. You can hash the protobuf by key when you send it to Kafka.

You can collect NetFlow/IPFIX, NetFlow v5 and sFlow using the same collector, or use the single-protocol collectors.

You can define the number of workers per protocol using -workers.

Docker

We also provide an all-in-one Docker container. To run it in debug mode without sending to Kafka:

$ sudo docker run --net=host -ti cloudflare/goflow:latest -kafka=false

Environment

For an example pipeline, check out flow-pipeline.

How is it used at Cloudflare

The samples flowing into Kafka are processed, and special fields are inserted using other databases:

  • User plan
  • Country
  • ASN and BGP information

The extended protobuf has the same base as the one in this repo. Compatibility with other software is preserved when adding new fields (though the new fields will be lost if the message is re-serialized).

Once the updated flows are back in Kafka, they are consumed by database inserters (ClickHouse, Amazon Redshift, Google Bigtable...) to allow for static analysis. Other teams access the network data just like any other log (SQL query).

Output format

If you want to develop applications, build pb/flow.proto into the language you want:

Example in Go:

PROTOCPATH=$HOME/go/bin/ make proto

Example in Java:

export SRC_DIR="path/to/goflow-pb"
export DST_DIR="path/to/java/app/src/main/java"
protoc -I=$SRC_DIR --java_out=$DST_DIR $SRC_DIR/flow.proto

The fields are listed in the following table.

You can find information on how they are populated from the original source:

| Field | Description | NetFlow v5 | sFlow | NetFlow v9 | IPFIX |
|---|---|---|---|---|---|
| Type | Type of flow message | NETFLOW_V5 | SFLOW_5 | NETFLOW_V9 | IPFIX |
| TimeReceived | Timestamp of when the message was received | Included | Included | Included | Included |
| SequenceNum | Sequence number of the flow packet | Included | Included | Included | Included |
| SamplingRate | Sampling rate of the flow | Included | Included | Included | Included |
| FlowDirection | Direction of the flow | | | DIRECTION (61) | flowDirection (61) |
| SamplerAddress | Address of the device that generated the packet | IP source of packet | Agent IP | IP source of packet | IP source of packet |
| TimeFlowStart | Time the flow started | System uptime and first | =TimeReceived | System uptime and FIRST_SWITCHED (22) | flowStartXXX (150, 152, 154, 156) |
| TimeFlowEnd | Time the flow ended | System uptime and last | =TimeReceived | System uptime and LAST_SWITCHED (23) | flowEndXXX (151, 153, 155, 157) |
| Bytes | Number of bytes in flow | dOctets | Length of sample | IN_BYTES (1) OUT_BYTES (23) | octetDeltaCount (1) postOctetDeltaCount (23) |
| Packets | Number of packets in flow | dPkts | =1 | IN_PKTS (2) OUT_PKTS (24) | packetDeltaCount (1) postPacketDeltaCount (24) |
| SrcAddr | Source address (IP) | srcaddr (IPv4 only) | Included | Included | IPV4_SRC_ADDR (8) IPV6_SRC_ADDR (27) |
| DstAddr | Destination address (IP) | dstaddr (IPv4 only) | Included | Included | IPV4_DST_ADDR (12) IPV6_DST_ADDR (28) |
| Etype | Ethernet type (0x86dd for IPv6...) | IPv4 | Included | Included | Included |
| Proto | Protocol (UDP, TCP, ICMP...) | prot | Included | PROTOCOL (4) | protocolIdentifier (4) |
| SrcPort | Source port (when UDP/TCP/SCTP) | srcport | Included | L4_SRC_PORT (7) | sourceTransportPort (7) |
| DstPort | Destination port (when UDP/TCP/SCTP) | dstport | Included | L4_DST_PORT (11) | destinationTransportPort (11) |
| InIf | Input interface | input | Included | INPUT_SNMP (10) | ingressInterface (10) |
| OutIf | Output interface | output | Included | OUTPUT_SNMP (14) | egressInterface (14) |
| SrcMac | Source MAC address | | Included | IN_SRC_MAC (56) | sourceMacAddress (56) |
| DstMac | Destination MAC address | | Included | OUT_DST_MAC (57) | postDestinationMacAddress (57) |
| SrcVlan | Source VLAN ID | | From ExtendedSwitch | SRC_VLAN (58) | vlanId (58) |
| DstVlan | Destination VLAN ID | | From ExtendedSwitch | DST_VLAN (59) | postVlanId (59) |
| VlanId | 802.1Q VLAN ID | | Included | SRC_VLAN (58) | postVlanId (59) |
| IngressVrfID | Ingress VRF ID | | | | ingressVRFID (234) |
| EgressVrfID | Egress VRF ID | | | | egressVRFID (235) |
| IPTos | IP Type of Service | tos | Included | SRC_TOS (5) | ipClassOfService (5) |
| ForwardingStatus | Forwarding status | | | FORWARDING_STATUS (89) | forwardingStatus (89) |
| IPTTL | IP Time to Live | | Included | IPTTL (52) | minimumTTL (52) |
| TCPFlags | TCP flags | tcp_flags | Included | TCP_FLAGS (6) | tcpControlBits (6) |
| IcmpType | ICMP Type | | Included | ICMP_TYPE (32) | icmpTypeXXX (176, 178) icmpTypeCodeXXX (32, 139) |
| IcmpCode | ICMP Code | | Included | ICMP_TYPE (32) | icmpCodeXXX (177, 179) icmpTypeCodeXXX (32, 139) |
| IPv6FlowLabel | IPv6 Flow Label | | Included | IPV6_FLOW_LABEL (31) | flowLabelIPv6 (31) |
| FragmentId | IP Fragment ID | | Included | IPV4_IDENT (54) | fragmentIdentification (54) |
| FragmentOffset | IP Fragment Offset | | Included | FRAGMENT_OFFSET (88) | fragmentOffset (88) and fragmentFlags (197) |
| BiFlowDirection | BiFlow Identification | | | | biflowDirection (239) |
| SrcAS | Source AS number | src_as | From ExtendedGateway | SRC_AS (16) | bgpSourceAsNumber (16) |
| DstAS | Destination AS number | dst_as | From ExtendedGateway | DST_AS (17) | bgpDestinationAsNumber (17) |
| NextHop | Nexthop address | nexthop | From ExtendedGateway | IPV4_NEXT_HOP (15) BGP_IPV4_NEXT_HOP (18) IPV6_NEXT_HOP (62) BGP_IPV6_NEXT_HOP (63) | ipNextHopIPv4Address (15) bgpNextHopIPv4Address (18) ipNextHopIPv6Address (62) bgpNextHopIPv6Address (63) |
| NextHopAS | Nexthop AS number | | From ExtendedGateway | | |
| SrcNet | Source address mask | src_mask | From ExtendedRouter | SRC_MASK (9) IPV6_SRC_MASK (29) | sourceIPv4PrefixLength (9) sourceIPv6PrefixLength (29) |
| DstNet | Destination address mask | dst_mask | From ExtendedRouter | DST_MASK (13) IPV6_DST_MASK (30) | destinationIPv4PrefixLength (13) destinationIPv6PrefixLength (30) |
| HasEncap | Indicates GRE encapsulation | | Included | | |
| xxxEncap fields | Same as field but inside GRE | | Included | | |
| HasMPLS | Indicates the presence of an MPLS header | | Included | | |
| MPLSCount | Count of MPLS layers | | Included | | |
| MPLSxTTL | TTL of the MPLS label | | Included | | |
| MPLSxLabel | MPLS label | | Included | | |

If you are implementing flow processors to add more data to the protobuf, we suggest you use field IDs ≥ 1000.
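This sketch shows why adding high field IDs is safe. It works directly on the protobuf wire format with `encoding/binary`, since protobuf varints use the same base-128 encoding as `binary.AppendUvarint`. A processor appends a hypothetical field 1000 to an already-serialized message, which is valid because protobuf parsers merge concatenated fields. `fieldNumbers` then walks the result the way a parser skips fields it does not know. Field 2 is a stand-in, not a real field number from pb/flow.proto. Only the varint and length-delimited wire types are handled.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

const (
	wireVarint = 0 // protobuf wire type for integers and enums
	wireBytes  = 2 // protobuf wire type for strings, bytes and sub-messages
)

// appendTag writes a field key: (field number << 3) | wire type.
func appendTag(b []byte, field, wire uint64) []byte {
	return binary.AppendUvarint(b, field<<3|wire)
}

func appendVarintField(b []byte, field, v uint64) []byte {
	return binary.AppendUvarint(appendTag(b, field, wireVarint), v)
}

func appendStringField(b []byte, field uint64, s string) []byte {
	b = appendTag(b, field, wireBytes)
	b = binary.AppendUvarint(b, uint64(len(s)))
	return append(b, s...)
}

// fieldNumbers lists the field numbers in a serialized message, skipping
// each value without needing to know the schema.
func fieldNumbers(msg []byte) ([]uint64, error) {
	var nums []uint64
	for len(msg) > 0 {
		key, n := binary.Uvarint(msg)
		if n <= 0 {
			return nil, errors.New("malformed field key")
		}
		msg = msg[n:]
		nums = append(nums, key>>3)
		switch key & 7 {
		case wireVarint:
			if _, n = binary.Uvarint(msg); n <= 0 {
				return nil, errors.New("malformed varint value")
			}
			msg = msg[n:]
		case wireBytes:
			size, m := binary.Uvarint(msg)
			if m <= 0 || size > uint64(len(msg)-m) {
				return nil, errors.New("malformed length-delimited value")
			}
			msg = msg[m+int(size):]
		default:
			return nil, fmt.Errorf("wire type %d not handled in this sketch", key&7)
		}
	}
	return nums, nil
}

func main() {
	msg := appendVarintField(nil, 2, 1500)     // hypothetical base field
	msg = appendStringField(msg, 1000, "FR")   // enrichment added downstream
	nums, err := fieldNumbers(msg)
	fmt.Println(nums, err) // [2 1000] <nil>
}
```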

Implementation notes

The pipeline at Cloudflare connects collectors with flow processors that add more information: from the IP address, add the country, ASN, etc.

For aggregation, we use materialized views in ClickHouse. Dictionaries help correlate flows with countries and ASNs. A few collectors can handle hundreds of thousands of samples.

We also successfully experimented with flow aggregation in Flink using a Keyed Session Window: this sums Bytes x SamplingRate and Packets x SamplingRate received during a 5-minute window, while allowing 2 more minutes for delayed flows before closing the session.

The BGP information provided by routers can be unreliable (if the router does not have a full BGP table, or if the route is static). You can use MaxMind prefix-to-ASN data to solve this issue.

License

Licensed under the BSD 3-Clause License.
