diff --git a/test/e2e/framework/sessionconfig.go b/test/e2e/framework/sessionconfig.go index ddde061..915fe76 100644 --- a/test/e2e/framework/sessionconfig.go +++ b/test/e2e/framework/sessionconfig.go @@ -106,6 +106,7 @@ func newVendorSpecificU8IE(itype uint16, eid uint16, val uint8) *ie.IE { } func (cfg SessionConfig) ipfixTemplateIEs() []*ie.IE { + // IEs should be created for uplink FARs only if cfg.IPFIXTemplate == "" { return nil } @@ -184,7 +185,7 @@ func (cfg SessionConfig) reverseFAR(farID uint32, flag uint8) *ie.IE { ie.NewFARID(farID), ie.NewApplyAction(flag), ie.NewForwardingParameters(fwParams...), - }, cfg.ipfixTemplateIEs()...) + }) return ie.NewCreateFAR(ies...) } diff --git a/test/e2e/ipfix_e2e.go b/test/e2e/ipfix_e2e.go index 23ae1f0..045f7be 100644 --- a/test/e2e/ipfix_e2e.go +++ b/test/e2e/ipfix_e2e.go @@ -49,33 +49,33 @@ func describeIPFIX(title string, mode framework.UPGMode, ipMode framework.UPGIPM }) }) - ginkgo.Context("default template", func() { + ginkgo.Context("NatEvent template", func() { v.withIPFIXHandler() ginkgo.It("sends IPFIX reports as requested [TCP]", func() { v.verifyIPFIX(ipfixVerifierCfg{ - farTemplate: "default", + farTemplate: "NatEvent", trafficCfg: smallVolumeHTTPConfig(nil), protocol: layers.IPProtocolTCP, expectedTrafficPort: 80, }) - v.verifyIPFIXDefaultRecords() + v.verifyIPFIXNatEventRecords() }) ginkgo.It("sends IPFIX reports as requested [TCP] [proxy]", func() { v.verifyIPFIX(ipfixVerifierCfg{ - farTemplate: "default", + farTemplate: "NatEvent", trafficCfg: smallVolumeHTTPConfig(nil), protocol: layers.IPProtocolTCP, expectedTrafficPort: 80, adf: true, }) - v.verifyIPFIXDefaultRecords() + v.verifyIPFIXNatEventRecords() }) ginkgo.It("sends IPFIX reports as requested [UDP]", func() { v.verifyIPFIX(ipfixVerifierCfg{ - farTemplate: "default", + farTemplate: "NatEvent", trafficCfg: &traffic.UDPPingConfig{ // have it span at several IPFIX reports PacketCount: 55, @@ -84,50 +84,51 @@ func describeIPFIX(title string, mode framework.UPGMode, ipMode framework.UPGIPM protocol: layers.IPProtocolUDP, expectedTrafficPort: 12345, }) - v.verifyIPFIXDefaultRecords() + v.verifyIPFIXNatEventRecords() }) ginkgo.It("doesn't recreate templates with different IDs unnecessarily", func() { v.verifyIPFIX(ipfixVerifierCfg{ - farTemplate: "default", + farTemplate: "NatEvent", trafficCfg: smallVolumeHTTPConfig(nil), protocol: layers.IPProtocolTCP, expectedTrafficPort: 80, }) - v.verifyIPFIXDefaultRecords() + v.verifyIPFIXNatEventRecords() ids := v.ipfixHandler.getTemplateIDs() v.runSession(ipfixVerifierCfg{ - farTemplate: "default", + farTemplate: "NatEvent", trafficCfg: smallVolumeHTTPConfig(nil), protocol: layers.IPProtocolTCP, expectedTrafficPort: 80, }) + time.Sleep(2 * time.Second) // just in case wait for flushing and interval framework.ExpectEqual(v.ipfixHandler.getTemplateIDs(), ids, "registered template IDs") }) }) - ginkgo.Context("dest template", func() { + ginkgo.Context("FlowUsage template", func() { v.withIPFIXHandler() ginkgo.It("sends IPFIX reports as requested [TCP]", func() { v.verifyIPFIX(ipfixVerifierCfg{ - farTemplate: "dest", + farTemplate: "FlowUsage", trafficCfg: smallVolumeHTTPConfig(nil), protocol: layers.IPProtocolTCP, expectedTrafficPort: 80, }) - v.verifyIPFIXDestRecords() + v.verifyIPFIXFlowUsageRecords() }) ginkgo.It("sends IPFIX reports as requested [UDP]", func() { v.verifyIPFIX(ipfixVerifierCfg{ - farTemplate: "dest", + farTemplate: "FlowUsage", trafficCfg: &traffic.UDPPingConfig{}, protocol: layers.IPProtocolUDP, expectedTrafficPort: 12345, }) - v.verifyIPFIXDestRecords() + v.verifyIPFIXFlowUsageRecords() }) }) @@ -149,7 +150,7 @@ func describeIPFIX(title string, mode framework.UPGMode, ipMode framework.UPGIPM ginkgo.Context("FAR override", func() { f := framework.NewDefaultFramework(mode, ipMode) v := &ipfixVerifier{f: f} - v.withNWIIPFIXPolicy("default") + v.withNWIIPFIXPolicy("NatEvent") // Templates 256 and 257 are expected early because IPFIX policy // is specified per NWI v.withIPFIXHandler() @@ -165,41 +166,41 @@ func describeIPFIX(title string, mode framework.UPGMode, ipMode framework.UPGIPM }) }) - ginkgo.Context("default template", func() { + ginkgo.Context("NatEvent template", func() { f := framework.NewDefaultFramework(mode, ipMode) v := &ipfixVerifier{f: f} - v.withNWIIPFIXPolicy("default") + v.withNWIIPFIXPolicy("NatEvent") // Templates 256 and 257 are expected early because IPFIX policy // is specified per NWI v.withIPFIXHandler() ginkgo.It("sends IPFIX reports as requested [TCP]", func() { v.verifyIPFIX(tcpCfg) - v.verifyIPFIXDefaultRecords() + v.verifyIPFIXNatEventRecords() }) ginkgo.It("sends IPFIX reports as requested [UDP]", func() { v.verifyIPFIX(udpCfg) - v.verifyIPFIXDefaultRecords() + v.verifyIPFIXNatEventRecords() }) }) - ginkgo.Context("dest template", func() { + ginkgo.Context("FlowUsage template", func() { f := framework.NewDefaultFramework(mode, ipMode) v := &ipfixVerifier{f: f} - v.withNWIIPFIXPolicy("dest") + v.withNWIIPFIXPolicy("FlowUsage") // Templates 256 and 257 are expected early because IPFIX policy // is specified per NWI v.withIPFIXHandler() ginkgo.It("sends IPFIX reports as requested [TCP]", func() { v.verifyIPFIX(tcpCfg) - v.verifyIPFIXDestRecords() + v.verifyIPFIXFlowUsageRecords() }) ginkgo.It("sends IPFIX reports as requested [UDP]", func() { v.verifyIPFIX(udpCfg) - v.verifyIPFIXDestRecords() + v.verifyIPFIXFlowUsageRecords() }) }) @@ -209,7 +210,7 @@ func describeIPFIX(title string, mode framework.UPGMode, ipMode framework.UPGIPM ginkgo.Context("[NAT fields]", func() { f := framework.NewDefaultFramework(mode, ipMode) v := &ipfixVerifier{f: f} - v.withNWIIPFIXPolicy("default") + v.withNWIIPFIXPolicy("NatEvent") v.withReportingInterval(5) v.withIPFIXHandler() @@ -232,7 +233,7 @@ func describeIPFIX(title string, mode framework.UPGMode, ipMode framework.UPGIPM postNATSourceIPv4Address: framework.MustParseIP("144.0.0.20").To4(), postNAPTSourceTransportPort: 10128, }) - v.verifyIPFIXDefaultRecords() + v.verifyIPFIXNatEventRecords() v.verifyNAT() }) }) @@ -254,7 +255,7 @@ func describeIPFIX(title string, mode framework.UPGMode, ipMode framework.UPGIPM protocol: layers.IPProtocolTCP, expectedTrafficPort: 80, }) - v.verifyIPFIXDefaultRecords() + v.verifyIPFIXNatEventRecords() }) }) @@ -280,28 +281,27 @@ func describeIPFIX(title string, mode framework.UPGMode, ipMode framework.UPGIPM f := framework.NewDefaultFramework(mode, ipMode) v := &ipfixVerifier{f: f} v.withForwardingPolicy("altIP") - v.withNWIIPFIXPolicy("dest") + v.withNWIIPFIXPolicy("FlowUsage") v.withIPFIXHandler() ginkgo.It("records forwarding policy name in VRFname", func() { v.verifyIPFIX(ipfixVerifierCfg{ - trafficCfg: smallVolumeHTTPConfig(nil), - protocol: layers.IPProtocolTCP, - expectedTrafficPort: 80, - forwardingPolicyID: "altIP", - expectedOriginVRFName: "altIP", - expectedReverseVRFName: "altIP", + trafficCfg: smallVolumeHTTPConfig(nil), + protocol: layers.IPProtocolTCP, + expectedTrafficPort: 80, + forwardingPolicyID: "altIP", + expectedUplinkVRFName: "altIP", }) - v.verifyIPFIXDestRecords() + v.verifyIPFIXFlowUsageRecords() }) }) ginkgo.Context("reporting interval", func() { f := framework.NewDefaultFramework(mode, ipMode) v := &ipfixVerifier{f: f} - v.withNWIIPFIXPolicy("dest") + v.withNWIIPFIXPolicy("FlowUsage") v.withIPFIXHandler() - const INTERVAL = 7 // 7 seconds so we receive ipfix flow description - const MIN_REPORTS_FOR_CHECK = 4 + const INTERVAL = 3 + const MIN_REPORTS_FOR_CHECK = 5 v.withReportingInterval(INTERVAL) ginkgo.It("can be set via NWI", func() { trafficCfg := smallVolumeHTTPConfig(nil) @@ -313,7 +313,7 @@ func describeIPFIX(title string, mode framework.UPGMode, ipMode framework.UPGIPM protocol: layers.IPProtocolTCP, expectedTrafficPort: 80, }) - v.verifyIPFIXDestRecords() + v.verifyIPFIXFlowUsageRecords() v.verifyReportingInterval(INTERVAL) }) }) @@ -330,8 +330,7 @@ type ipfixVerifierCfg struct { postNAPTSourceTransportPort uint16 adf bool forwardingPolicyID string - expectedOriginVRFName string - expectedReverseVRFName string + expectedUplinkVRFName string noTemplates bool } @@ -339,10 +338,8 @@ type ipfixVerifier struct { f *framework.Framework ipfixHandler *ipfixHandler beginTS time.Time - ulStartTS time.Time - ulEndTS time.Time - dlStartTS time.Time - dlEndTS time.Time + startTS time.Time + endTS time.Time seid pfcp.SEID ms *pfcp.PFCPMeasurement collectorIP net.IP @@ -401,10 +398,8 @@ func (v *ipfixVerifier) withIPFIXHandler() { ) v.ipfixHandler = setupIPFIX(v.f, v.collectorIP) v.beginTS = time.Now() - v.ulStartTS = time.Time{} - v.ulEndTS = v.beginTS - v.dlStartTS = time.Time{} - v.dlEndTS = v.beginTS + v.startTS = time.Time{} + v.endTS = v.beginTS }) ginkgo.AfterEach(func() { @@ -446,7 +441,7 @@ func (v *ipfixVerifier) withExtraExporter() { func (v *ipfixVerifier) withAltCollector() { v.modifySGi(func(nwiCfg *vpp.NWIConfig) { - nwiCfg.IPFIXPolicy = "default" + nwiCfg.IPFIXPolicy = "NatEvent" nwiCfg.GetIPFIXCollectorIP = v.getCollectorIP }) } @@ -470,12 +465,22 @@ func (v *ipfixVerifier) runSession(cfg ipfixVerifierCfg) { if cfg.forwardingPolicyID != "" { cfg.trafficCfg.AddServerIP(v.altServerIP.IP) } - runTrafficGen(v.f, cfg.trafficCfg, &traffic.PreciseTrafficRec{}) - if !v.cfg.noTemplates { - gomega.Eventually(v.ipfixHandler.getTemplateIDs, 10*time.Second, time.Second). + + if !cfg.noTemplates { + // VPP ipfix plugin has loop which executes every 5 seconds, + // only after at least once this loop iterates - new template interval will be used + const VPP_IPFIX_TEMPLATE_REACTION_TIME = 20 * time.Second + + // After session creation vpp detects that far has template id + // and should start broadcast corresponding template + gomega.Eventually(v.ipfixHandler.getTemplateIDs, VPP_IPFIX_TEMPLATE_REACTION_TIME, time.Second). ShouldNot(gomega.BeEmpty()) } + + runTrafficGen(v.f, cfg.trafficCfg, &traffic.PreciseTrafficRec{}) + v.ms = deleteSession(v.f, v.seid, true) + // Wait a bit for all the reports to arrive // FIXME: actually, we should check IPFIX report results // via Eventually(), but that's a bit too much trouble for now, @@ -499,12 +504,6 @@ func (v *ipfixVerifier) verifyNoRecs() { gomega.Expect(v.recs).To(gomega.BeEmpty()) } -func (v *ipfixVerifier) verifyIPFIXStart() { - // make sure the first report is not sent out immediately - t := v.ipfixHandler.getFirstReportTS().Sub(v.beginTS) - gomega.Expect(t.Seconds()).To(gomega.BeNumerically(">=", 2)) -} - func (v *ipfixVerifier) verifyNAT() { gomega.Expect(len(v.recs)).To(gomega.BeNumerically(">", 2)) // 1st IPFIX record for the flow, NAT44 session create @@ -518,116 +517,60 @@ func (v *ipfixVerifier) verifyNAT() { } } -func (v *ipfixVerifier) verifyIPFIXDefaultRecords() { - v.verifyIPFIXStart() - // total counts not used for now, but kept here in case if they're needed later +func (v *ipfixVerifier) verifyIPFIXSharedRecords() { var ulPacketCount, dlPacketCount, ulOctets, dlOctets uint64 - // var initiatorPackets, responderPackets uint64 - // var initiatorOctets, responderOctets uint64 - var clientPort uint16 + + srcAddressKey := "sourceIPv4Address" + dstAddressKey := "destinationIPv4Address" + if v.f.IPMode == framework.UPGIPModeV6 { + srcAddressKey = "sourceIPv6Address" + dstAddressKey = "destinationIPv6Address" + } + + serverIP := v.f.ServerIP() + if v.altServerIP != nil { + serverIP = v.altServerIP.IP + } + for _, r := range v.recs { - // The record looks like: - // mobileIMSI: 313460000000001 - // packetTotalCount: 80 - // flowStartMilliseconds: 2022-02-22 02:30:32.097 +0000 UTC - // flowEndMilliseconds: 2022-02-22 02:30:47.152 +0000 UTC - // sourceIPv4Address: 10.1.0.3 - // destinationIPv4Address: 10.0.1.3 - // protocolIdentifier: 6 - // octetTotalCount: 4262 - // sourceTransportPort: 36960 - // destinationTransportPort: 80 - gomega.Expect(r).To(gomega.HaveKeyWithValue("mobileIMSI", "313460000000001")) - // gomega.Expect(r).To(gomega.HaveKey("packetTotalCount")) + gomega.Expect(r).To(gomega.HaveKeyWithValue("observationDomainId", uint32(42))) + gomega.Expect(r).To(gomega.HaveKey("flowStartMilliseconds")) gomega.Expect(r).To(gomega.HaveKey("flowEndMilliseconds")) gomega.Expect(r["flowEndMilliseconds"]). To(gomega.BeTemporally(">=", r["flowStartMilliseconds"].(time.Time)), "flowEndMilliseconds >= flowStartMilliseconds") + gomega.Expect(r).To(gomega.HaveKeyWithValue("protocolIdentifier", uint8(v.cfg.protocol))) - srcAddressKey := "sourceIPv4Address" - dstAddressKey := "destinationIPv4Address" - if v.f.IPMode == framework.UPGIPModeV6 { - srcAddressKey = "sourceIPv6Address" - dstAddressKey = "destinationIPv6Address" - } gomega.Expect(r).To(gomega.HaveKey(srcAddressKey)) gomega.Expect(r).To(gomega.HaveKey(dstAddressKey)) - // For now, we're using octetDeltaCount / packetDeltaCount - // values instead of initator.../responder... fields. - // Unlike initiator/responder, these depend on the - // direction of the flow - // gomega.Expect(r).To(gomega.HaveKey("initiatorPackets")) - // gomega.Expect(r).To(gomega.HaveKey("responderPackets")) - // gomega.Expect(r).To(gomega.HaveKey("initiatorOctets")) - // gomega.Expect(r).To(gomega.HaveKey("responderOctets")) - // initiatorPackets += r["initiatorPackets"].(uint64) - // responderPackets += r["responderPackets"].(uint64) - // initiatorOctets += r["initiatorOctets"].(uint64) - // responderOctets += r["responderOctets"].(uint64) - gomega.Expect(r).To(gomega.HaveKey("packetDeltaCount")) - gomega.Expect(r).To(gomega.HaveKey("octetDeltaCount")) - - gomega.Expect(r).To(gomega.HaveKeyWithValue("observationDomainId", uint32(42))) + gomega.Expect(r).To(gomega.HaveKey("initiatorPackets")) + gomega.Expect(r).To(gomega.HaveKey("responderPackets")) + gomega.Expect(r).To(gomega.HaveKey("initiatorOctets")) + gomega.Expect(r).To(gomega.HaveKey("responderOctets")) - if r[srcAddressKey].(net.IP).Equal(v.f.UEIP()) { - // upload - if v.ulStartTS.IsZero() { - v.ulStartTS = r["flowStartMilliseconds"].(time.Time) - // FIXME: should be working (wrong time on the VPP side?) - // gomega.Expect(ulStartTS).To(gomega.BeTemporally(">=", beginTS)) - } else { - gomega.Expect(r["flowStartMilliseconds"]).To(gomega.Equal(v.ulStartTS)) - } - gomega.Expect(r["flowEndMilliseconds"]).To(gomega.BeTemporally(">=", v.ulEndTS)) - v.ulEndTS = r["flowEndMilliseconds"].(time.Time) - gomega.Expect(r[dstAddressKey].(net.IP).Equal(v.f.ServerIP())).To(gomega.BeTrue()) - // gomega.Expect(r["packetTotalCount"]).To(gomega.BeNumerically(">=", ulPacketCount)) - ulPacketCount += r["packetDeltaCount"].(uint64) - // gomega.Expect(r["octetTotalCount"]).To(gomega.BeNumerically(">=", ulOctets)) - ulOctets += r["octetDeltaCount"].(uint64) - gomega.Expect(r["destinationTransportPort"]).To(gomega.Equal(v.cfg.expectedTrafficPort)) - if clientPort == 0 { - clientPort = r["sourceTransportPort"].(uint16) - } else { - gomega.Expect(r["sourceTransportPort"]).To(gomega.Equal(clientPort)) - } - // gomega.Expect(r["flowDirection"]).To(gomega.Equal(uint8(1))) // egress flow - if v.cfg.postNATSourceIPv4Address != nil { - gomega.Expect(r["postNATSourceIPv4Address"]). - To(gomega.Equal(v.cfg.postNATSourceIPv4Address)) - } - if v.cfg.postNAPTSourceTransportPort != 0 { - gomega.Expect(r["postNAPTSourceTransportPort"]). - To(gomega.Equal(v.cfg.postNAPTSourceTransportPort)) - } + if v.startTS.IsZero() { + v.startTS = r["flowStartMilliseconds"].(time.Time) + // FIXME: should be working (wrong time on the VPP side?) + // gomega.Expect(ulStartTS).To(gomega.BeTemporally(">=", beginTS)) } else { - // download - if v.dlStartTS.IsZero() { - v.dlStartTS = r["flowStartMilliseconds"].(time.Time) - // FIXME: should be working (wrong time on the VPP side?) - // gomega.Expect(dlStartTS).To(gomega.BeTemporally(">=", beginTS)) - } else { - gomega.Expect(r["flowStartMilliseconds"]).To(gomega.Equal(v.dlStartTS)) - } - gomega.Expect(r["flowEndMilliseconds"]).To(gomega.BeTemporally(">=", v.dlEndTS)) - v.dlEndTS = r["flowEndMilliseconds"].(time.Time) - gomega.Expect(r[srcAddressKey].(net.IP).Equal(v.f.ServerIP())).To(gomega.BeTrue()) - gomega.Expect(r[dstAddressKey].(net.IP).Equal(v.f.UEIP())).To(gomega.BeTrue()) - // gomega.Expect(r["packetTotalCount"]).To(gomega.BeNumerically(">=", dlPacketCount)) - dlPacketCount += r["packetDeltaCount"].(uint64) - // gomega.Expect(r["octetTotalCount"]).To(gomega.BeNumerically(">=", dlOctets)) - dlOctets += r["octetDeltaCount"].(uint64) - gomega.Expect(r["sourceTransportPort"]).To(gomega.Equal(v.cfg.expectedTrafficPort)) - if clientPort == 0 { - clientPort = r["destinationTransportPort"].(uint16) - } else { - gomega.Expect(r["destinationTransportPort"]).To(gomega.Equal(clientPort)) - } - // gomega.Expect(r["flowDirection"]).To(gomega.Equal(uint8(0))) // ingress flow + gomega.Expect(r["flowStartMilliseconds"]).To(gomega.Equal(v.startTS)) } + + gomega.Expect(r["flowEndMilliseconds"]).To(gomega.BeTemporally(">=", v.endTS)) + v.endTS = r["flowEndMilliseconds"].(time.Time) + + // verify ips + gomega.Expect(r[srcAddressKey].(net.IP).Equal(v.f.UEIP())).To(gomega.BeTrue()) + gomega.Expect(r[dstAddressKey].(net.IP).Equal(serverIP)).To(gomega.BeTrue()) + + // collect stats + ulPacketCount += r["initiatorPackets"].(uint64) + ulOctets += r["initiatorOctets"].(uint64) + dlPacketCount += r["responderPackets"].(uint64) + dlOctets += r["responderOctets"].(uint64) } gomega.Expect(ulPacketCount).To(gomega.Equal(*v.ms.Reports[1][0].UplinkPacketCount), "uplink packet count") @@ -636,124 +579,69 @@ func (v *ipfixVerifier) verifyIPFIXDefaultRecords() { // gomega.Expect(responderPackets).To(gomega.Equal(*v.ms.Reports[1][0].DownlinkPacketCount), "responderPackets") gomega.Expect(ulOctets).To(gomega.Equal(*v.ms.Reports[1][0].UplinkVolume), "uplink volume") gomega.Expect(dlOctets).To(gomega.Equal(*v.ms.Reports[1][0].DownlinkVolume), "downlink volume") - - // l4UL, l4DL := getL4TrafficCountsFromCapture(v.f, v.cfg.protocol, nil) - // gomega.Expect(initiatorOctets).To(gomega.Equal(l4UL), "initiatorOctets") - // gomega.Expect(responderOctets).To(gomega.Equal(l4DL), "responderOctets") } -func (v *ipfixVerifier) verifyIPFIXDestRecords() { - v.verifyIPFIXStart() - // total counts not used for now, but kept here in case if they're needed later - var ulOctets, dlOctets uint64 - // var initiatorOctets, responderOctets uint64 +func (v *ipfixVerifier) verifyIPFIXNatEventRecords() { + var clientPort uint16 + for _, r := range v.recs { - gomega.Expect(r).To(gomega.HaveKey("flowEndMilliseconds")) + gomega.Expect(r).To(gomega.HaveKeyWithValue("mobileIMSI", "313460000000001")) - srcAddressKey := "sourceIPv4Address" - dstAddressKey := "destinationIPv4Address" - if v.f.IPMode == framework.UPGIPModeV6 { - srcAddressKey = "sourceIPv6Address" - dstAddressKey = "destinationIPv6Address" - } - gomega.Expect(r).To(gomega.HaveKey(srcAddressKey)) - gomega.Expect(r).To(gomega.HaveKey(dstAddressKey)) - gomega.Expect(r).To(gomega.HaveKey("flowDirection")) - // For now, we're using octetDeltaCount - // values instead of initator.../responder... fields. - // Unlike initiator/responder, these depend on the - // direction of the flow - // gomega.Expect(r).To(gomega.HaveKey("initiatorOctets")) - // gomega.Expect(r).To(gomega.HaveKey("responderOctets")) - // initiatorOctets += r["initiatorOctets"].(uint64) - // responderOctets += r["responderOctets"].(uint64) - gomega.Expect(r).To(gomega.HaveKey("octetDeltaCount")) - gomega.Expect(r).To(gomega.HaveKey("ingressVRFID")) - gomega.Expect(r).To(gomega.HaveKey("egressVRFID")) - originVRFName := "ipv4-VRF:200" - reverseVRFName := "ipv4-VRF:100" - if v.f.IPMode == framework.UPGIPModeV6 { - originVRFName = "ipv6-VRF:200" - reverseVRFName = "ipv6-VRF:100" + gomega.Expect(r).To(gomega.HaveKey("initiatorPackets")) + gomega.Expect(r).To(gomega.HaveKey("responderPackets")) + gomega.Expect(r).To(gomega.HaveKey("initiatorOctets")) + gomega.Expect(r).To(gomega.HaveKey("responderOctets")) + + // verify ports + gomega.Expect(r["destinationTransportPort"]).To(gomega.Equal(v.cfg.expectedTrafficPort)) + if clientPort == 0 { + clientPort = r["sourceTransportPort"].(uint16) + } else { + gomega.Expect(r["sourceTransportPort"]).To(gomega.Equal(clientPort)) } - if v.cfg.expectedOriginVRFName != "" { - originVRFName = v.cfg.expectedOriginVRFName + + // verify nat + if v.cfg.postNATSourceIPv4Address != nil { + gomega.Expect(r["postNATSourceIPv4Address"]). + To(gomega.Equal(v.cfg.postNATSourceIPv4Address)) } - if v.cfg.expectedReverseVRFName != "" { - reverseVRFName = v.cfg.expectedReverseVRFName + if v.cfg.postNAPTSourceTransportPort != 0 { + gomega.Expect(r["postNAPTSourceTransportPort"]). + To(gomega.Equal(v.cfg.postNAPTSourceTransportPort)) } + } + + v.verifyIPFIXSharedRecords() +} +func (v *ipfixVerifier) verifyIPFIXFlowUsageRecords() { + uplinkVRFName := "ipv4-VRF:200" + if v.f.IPMode == framework.UPGIPModeV6 { + uplinkVRFName = "ipv6-VRF:200" + } + if v.cfg.expectedUplinkVRFName != "" { + uplinkVRFName = v.cfg.expectedUplinkVRFName + } + + for _, r := range v.recs { gomega.Expect(r).To(gomega.HaveKey("interfaceName")) - gomega.Expect(r).To(gomega.HaveKeyWithValue("observationDomainId", uint32(42))) gomega.Expect(r).To(gomega.HaveKeyWithValue("observationDomainName", "test-domain")) gomega.Expect(r).To(gomega.HaveKeyWithValue("observationPointId", uint64(4242))) - if r[srcAddressKey].(net.IP).Equal(v.f.UEIP()) { - // upload - gomega.Expect(r["flowEndMilliseconds"]).To(gomega.BeTemporally(">=", v.ulEndTS)) - v.ulEndTS = r["flowEndMilliseconds"].(time.Time) - expectedEgressVRFID := uint32(200) - serverIP := v.f.ServerIP() - if v.altServerIP != nil { - serverIP = v.altServerIP.IP - expectedEgressVRFID = 201 - if v.f.IPMode == framework.UPGIPModeV6 { - expectedEgressVRFID = 301 - } - } - gomega.Expect(r[dstAddressKey].(net.IP).Equal(serverIP)).To(gomega.BeTrue()) - gomega.Expect(r["flowDirection"]).To(gomega.Equal(uint8(1))) // egress flow - gomega.Expect(r["ingressVRFID"]).To(gomega.Equal(uint32(100))) - gomega.Expect(r["egressVRFID"]).To(gomega.Equal(expectedEgressVRFID)) - gomega.Expect(r["VRFname"]).To(gomega.Equal(originVRFName)) - gomega.Expect(r["interfaceName"]).To(gomega.Equal("sgi0")) - ulOctets += r["octetDeltaCount"].(uint64) - } else { - // download - gomega.Expect(r["flowEndMilliseconds"]).To(gomega.BeTemporally(">=", v.dlEndTS)) - v.dlEndTS = r["flowEndMilliseconds"].(time.Time) - serverIP := v.f.ServerIP() - if v.altServerIP != nil { - serverIP = v.altServerIP.IP - } - gomega.Expect(r[srcAddressKey].(net.IP).Equal(serverIP)).To(gomega.BeTrue()) - gomega.Expect(r[dstAddressKey].(net.IP).Equal(v.f.UEIP())).To(gomega.BeTrue()) - gomega.Expect(r["flowDirection"]).To(gomega.Equal(uint8(0))) // ingress flow - gomega.Expect(r["ingressVRFID"]).To(gomega.Equal(uint32(200))) - gomega.Expect(r["egressVRFID"]).To(gomega.Equal(uint32(100))) - gomega.Expect(r["VRFname"]).To(gomega.Equal(reverseVRFName)) - expectedIfName := "access0" - if v.f.Mode == framework.UPGModePGW { - expectedIfName = "grx0" - } - gomega.Expect(r["interfaceName"]).To(gomega.Equal(expectedIfName)) - dlOctets += r["octetDeltaCount"].(uint64) - } + gomega.Expect(r["VRFname"]).To(gomega.Equal(uplinkVRFName)) + gomega.Expect(r["interfaceName"]).To(gomega.Equal("sgi0")) } - // l4UL, l4DL := getL4TrafficCountsFromCapture(v.f, v.cfg.protocol, nil) - // gomega.Expect(initiatorOctets).To(gomega.Equal(l4UL), "initiatorOctets") - // gomega.Expect(responderOctets).To(gomega.Equal(l4DL), "responderOctets") - - gomega.Expect(ulOctets).To(gomega.Equal(*v.ms.Reports[1][0].UplinkVolume), "uplink volume") - gomega.Expect(dlOctets).To(gomega.Equal(*v.ms.Reports[1][0].DownlinkVolume), "downlink volume") + v.verifyIPFIXSharedRecords() } func (v *ipfixVerifier) verifyReportingInterval(expectedSeconds int) { - var ingressTimes, egressTimes []time.Time + var times []time.Time for _, r := range v.recs { - dir := r["flowDirection"].(uint8) - ts := r["ts"].(time.Time) - if dir == 0 { - ingressTimes = append(ingressTimes, ts) - } else { - framework.ExpectEqual(dir, uint8(1)) - egressTimes = append(egressTimes, ts) - } + times = append(times, r["ts"].(time.Time)) } atLeastMs := uint64(expectedSeconds) * 1000 - verifyIntervals(ingressTimes, atLeastMs) - verifyIntervals(egressTimes, atLeastMs) + verifyIntervals(times, atLeastMs) } func verifyIntervals(times []time.Time, atLeastMs uint64) { diff --git a/test/e2e/upg_e2e.go b/test/e2e/upg_e2e.go index 98ccfdc..3cec221 100644 --- a/test/e2e/upg_e2e.go +++ b/test/e2e/upg_e2e.go @@ -1011,7 +1011,7 @@ var _ = ginkgo.Describe("UPG Binary API", func() { Nwi: util.EncodeFQDN("testing"), IP4TableID: 42000, IP6TableID: 42001, - IpfixPolicy: []byte("default"), + IpfixPolicy: []byte("NatEvent"), IpfixCollectorIP: ip, IpfixReportInterval: uint32(7), ObservationDomainID: uint32(42), @@ -1038,7 +1038,7 @@ var _ = ginkgo.Describe("UPG Binary API", func() { gomega.Expect(msg.IP4TableID).To(gomega.Equal(uint32(42000))) gomega.Expect(msg.IP6TableID).To(gomega.Equal(uint32(42001))) ipfixPolicy := string(bytes.Trim(msg.IpfixPolicy, "\x00")) - gomega.Expect(ipfixPolicy).To(gomega.Equal("default")) + gomega.Expect(ipfixPolicy).To(gomega.Equal("NatEvent")) gomega.Expect(msg.IpfixCollectorIP.String()).To(gomega.Equal("192.168.42.1")) gomega.Expect(msg.IpfixReportInterval).To(gomega.Equal(uint32(7))) gomega.Expect(msg.ObservationDomainID).To(gomega.Equal(uint32(42))) @@ -1053,7 +1053,7 @@ var _ = ginkgo.Describe("UPG Binary API", func() { gomega.Expect(err).NotTo(gomega.HaveOccurred()) gomega.Expect(out).To(gomega.ContainSubstring( "testing, ip4-table-id 42000, ip6-table-id 42001, " + - "ipfix-policy default, ipfix-collector-ip 192.168.42.1")) + "ipfix-policy NatEvent, ipfix-collector-ip 192.168.42.1")) req.Add = 0 reply = &upf.UpfNwiAddDelReply{} diff --git a/upf/flowtable.c b/upf/flowtable.c index 343d463..34a5042 100644 --- a/upf/flowtable.c +++ b/upf/flowtable.c @@ -50,7 +50,7 @@ flow_entry_free (flowtable_main_t *fm, flowtable_main_per_cpu_t *fmt, pool_put (fm->flows, f); } -int upf_ipfix_flow_remove_handler (flow_entry_t *f, u32 now); +void upf_ipfix_flow_remove_handler (flow_entry_t *f, u32 now); int upf_proxy_flow_remove_handler (flow_entry_t *flow); int flow_remove_counter_handler (flowtable_main_t *fm, flow_entry_t *flow); int session_flow_unlink_handler (flowtable_main_t *fm, flow_entry_t *flow); @@ -228,8 +228,6 @@ flowtable_entry_init_side (flow_side_t *side, u32 now) // valid teid, but not 0 side->teid = ~0; side->next = FT_NEXT_CLASSIFY; - side->ipfix.last_exported = now; - side->ipfix.info_index = ~0; side->tcp.conn_index = ~0; side->tcp.thread_index = ~0; } @@ -267,12 +265,15 @@ flowtable_entry_lookup_create (flowtable_main_t *fm, f->lifetime = flowtable_lifetime_calculate (fm, &f->key); f->active = now; f->flow_start_time = timestamp_ns; - f->flow_end_time = timestamp_ns; + f->flow_last_time = timestamp_ns; f->application_id = ~0; f->cpu_index = os_get_thread_index (); f->generation = generation; f->ps_index = ~0; f->timer_slot = ~0; + f->uplink_direction = FLOW_ENTRY_UPLINK_DIRECTION_UNDEFINED; + f->ipfix.next_export_at = 1; // do export immediately + f->ipfix.context_index = ~0; flowtable_entry_init_side (flow_side (f, FT_ORIGIN), now); flowtable_entry_init_side (flow_side (f, FT_REVERSE), now); diff --git a/upf/flowtable.h b/upf/flowtable.h index cd6873e..123f32f 100644 --- a/upf/flowtable.h +++ b/upf/flowtable.h @@ -119,8 +119,6 @@ typedef struct u32 pkts_unreported; u64 bytes; u64 bytes_unreported; - u64 l4_bytes; - u64 l4_bytes_unreported; } flow_side_stats_t; typedef enum @@ -142,16 +140,20 @@ typedef struct flow_side_tcp_t_ u32 tsval_offs; } flow_side_tcp_t; -typedef struct flow_side_ipfix_t_ +typedef struct flow_ipfix_t_ { - u32 last_exported; - u32 info_index; -} flow_side_ipfix_t; + u32 next_export_at; // in seconds, zero means no intermediate reporting + u16 context_index; + u16 forwarding_policy_index; + // up_dst means "upload destination" + u16 up_dst_nwi_index; + u16 up_dst_sw_if_index; + u32 up_dst_fib_index; +} flow_ipfix_t; typedef struct flow_side_t_ { flow_side_stats_t stats; - flow_side_ipfix_t ipfix; flow_side_tcp_t tcp; u32 pdr_id; @@ -171,6 +173,7 @@ typedef struct flow_entry // key elements indexes are flow_direction_t key_directioned_t key; u32 session_index; + u8 key_direction : 1; // flow_direction_op_t of bihash key u8 is_redirect : 1; u8 is_l3_proxy : 1; @@ -178,8 +181,18 @@ typedef struct flow_entry u8 spliced_dirty : 1; u8 dont_splice : 1; u8 app_detection_done : 1; - u8 exported : 1; - u8 tcp_state; + u8 ipfix_exported : 1; // exported at least once + + u8 tcp_state : 4; // TODO: needs only 3 bits? + // should be updated in classify and based on PDR during flow creation + u8 uplink_direction : 2; + // do not perform ipfix operations for this flow anymore + u8 ipfix_disabled : 1; + + // use macro since unsigned will not expand to ~0 + // direction impossible to detect or not yet detected +#define FLOW_ENTRY_UPLINK_DIRECTION_UNDEFINED (0b11) + u32 ps_index; /* timers */ @@ -190,13 +203,14 @@ typedef struct flow_entry // elements indexes are flow_direction_t flow_side_t side[FT_DIRECTION_MAX]; + flow_ipfix_t ipfix; u32 application_id; /* L7 app index */ u8 *app_uri; u64 flow_start_time; /* unix nanoseconds */ - u64 flow_end_time; /* unix nanoseconds */ + u64 flow_last_time; /* unix nanoseconds */ session_flows_list_anchor_t session_list_anchor; @@ -206,6 +220,9 @@ typedef struct flow_entry u16 nat_sport; } flow_entry_t; +// statically track entry size to prevent increase +STATIC_ASSERT_SIZEOF (flow_entry_t, 5 * 64); + UPF_LLIST_TEMPLATE_DEFINITIONS (session_flows_list, flow_entry_t, session_list_anchor); UPF_LLIST_TEMPLATE_DEFINITIONS (flow_timeout_list, flow_entry_t, timer_anchor); @@ -487,12 +504,9 @@ flow_update (vlib_main_t *vm, flow_entry_t *f, u8 *iph, u8 is_ip4, u16 len, } } -int upf_ipfix_flow_stats_update_handler (flow_entry_t *f, - flow_direction_t direction, u32 now); - __clib_unused always_inline void flow_update_stats (vlib_main_t *vm, vlib_buffer_t *b, flow_entry_t *f, - u8 is_ip4, u64 timestamp_ns, u32 now) + u8 is_ip4, u64 timestamp_ns) { /* * Performance note: @@ -502,34 +516,15 @@ flow_update_stats (vlib_main_t *vm, vlib_buffer_t *b, flow_entry_t *f, flow_direction_t direction = upf_buffer_opaque (b)->gtpu.direction; - u8 *iph = vlib_buffer_get_current (b); - u8 *l4h = (is_ip4 ? ip4_next_header ((ip4_header_t *) iph) : - ip6_next_header ((ip6_header_t *) iph)); - u16 l4_len = len - (l4h - iph); - u16 diff = 0; - - switch (f->key.proto) - { - case IP_PROTOCOL_TCP: - diff = tcp_header_bytes ((tcp_header_t *) l4h); - break; - case IP_PROTOCOL_UDP: - diff = sizeof (udp_header_t); - break; - } - l4_len = l4_len > diff ? l4_len - diff : 0; - flow_side_stats_t *stats = &flow_side (f, direction)->stats; stats->pkts++; stats->pkts_unreported++; stats->bytes += len; stats->bytes_unreported += len; - stats->l4_bytes += l4_len; - stats->l4_bytes_unreported += l4_len; - - f->flow_end_time = timestamp_ns; - upf_ipfix_flow_stats_update_handler (f, direction, now); + f->flow_last_time = timestamp_ns; } +void upf_ipfix_flow_stats_update_handler (flow_entry_t *f, u32 now); + #endif /* __flowtable_h__ */ diff --git a/upf/upf.c b/upf/upf.c index cc0f6d7..3004323 100644 --- a/upf/upf.c +++ b/upf/upf.c @@ -845,7 +845,7 @@ vnet_upf_policy_fn (fib_route_path_t *rpaths, u8 *policy_id, u8 action) pool_get (gtm->upf_forwarding_policies, fp_entry); fib_node_init (&fp_entry->fib_node, upf_policy_fib_node_type); fp_entry->policy_id = vec_dup (policy_id); - fp_entry->rpaths = clib_mem_alloc (sizeof (*fp_entry->rpaths)); + fp_entry->rpaths = 0; fib_path_list_create_and_child_add (fp_entry, rpaths); hash_set_mem (gtm->forwarding_policy_by_id, fp_entry->policy_id, diff --git a/upf/upf.h b/upf/upf.h index 1d38d9e..8f655ee 100644 --- a/upf/upf.h +++ b/upf/upf.h @@ -406,6 +406,16 @@ typedef struct u32 *qer_ids; } upf_pdr_t; +typedef enum +{ + UPF_IPFIX_POLICY_NONE, + UPF_IPFIX_POLICY_NAT_EVENT, + UPF_IPFIX_POLICY_FLOW_USAGE, + UPF_IPFIX_N_POLICIES, + // used only in FAR to indicate "do not override" + UPF_IPFIX_POLICY_UNSPECIFIED = UPF_IPFIX_N_POLICIES +} __clib_packed upf_ipfix_policy_t; + /* Forward Action Rules - Forwarding Parameters */ typedef struct { @@ -459,22 +469,8 @@ typedef struct * We maintain this path as we need to remove old path while policy update */ fib_route_path_t *rpaths; - /** - * Next node index (ip4-rewrite here) - * - */ - u32 forward_index; } upf_forwarding_policy_t; -typedef enum -{ - UPF_IPFIX_POLICY_NONE, - UPF_IPFIX_POLICY_DEFAULT, - UPF_IPFIX_POLICY_DEST, - UPF_IPFIX_N_POLICIES, - UPF_IPFIX_POLICY_UNSPECIFIED = UPF_IPFIX_N_POLICIES -} __clib_packed upf_ipfix_policy_t; - /* Forward Action Rules */ typedef struct { @@ -485,6 +481,7 @@ typedef struct #define FAR_BUFFER 0x0004 #define FAR_NOTIFY_CP 0x0008 #define FAR_DUPLICATE 0x0010 +#define FAR_NAT 0x8000 union { @@ -520,7 +517,9 @@ typedef enum UPF_FLOWS_NOT_STITCHED_TCP_OPS_SACK_PERMIT = 6, UPF_FLOWS_STITCHED_DIRTY_FIFOS = 7, UPF_TIMERS_MISSED = 8, - UPF_N_COUNTERS = 9, + UPF_IPFIX_RECORDS_SENT = 9, + UPF_IPFIX_MESSAGES_SENT = 10, + UPF_N_COUNTERS = 11, } upf_counters_type_t; #define foreach_upf_counter_name \ @@ -532,7 +531,9 @@ typedef enum _ (FLOWS_NOT_STITCHED_TCP_OPS_TIMESTAMP, tcp_ops_tstamp, upf) \ _ (FLOWS_NOT_STITCHED_TCP_OPS_SACK_PERMIT, tcp_ops_sack_permit, upf) \ _ (FLOWS_STITCHED_DIRTY_FIFOS, stitched_dirty_fifos, upf) \ - _ (TIMERS_MISSED, timers_missed, upf) + _ (TIMERS_MISSED, timers_missed, upf) \ + _ (IPFIX_RECORDS_SENT, ipfix_records_sent, upf) \ + _ (IPFIX_MESSAGES_SENT, ipfix_messages_sent, upf) /* TODO: measure if more optimize cache line aware layout * of the counters and quotas has any performance impcat */ @@ -808,6 +809,21 @@ typedef struct u32 mask; } upf_upip_res_t; +typedef struct +{ + /* TODO: this contexts can be used in far instead of + own bihash lookup */ + u32 contexts[FIB_PROTOCOL_IP_MAX][UPF_IPFIX_N_POLICIES]; + + upf_ipfix_policy_t default_policy; + ip_address_t collector_ip; + u32 report_interval; // zero means no intermediate reports + + u32 observation_domain_id; + u64 observation_point_id; + u8 *observation_domain_name; +} upf_nwi_ipfix_t; + typedef struct { u8 *name; @@ -817,15 +833,7 @@ typedef struct u32 sw_if_index; u32 hw_if_index; - upf_ipfix_policy_t ipfix_policy; - ip_address_t ipfix_collector_ip; - u32 ipfix_report_interval; - - u32 observation_domain_id; - u64 observation_point_id; - u8 *observation_domain_name; - - u32 *ipfix_context_indices; + upf_nwi_ipfix_t ipfix; } upf_nwi_t; typedef struct @@ -930,7 +938,7 @@ typedef struct { mhash_t pfcp_endpoint_index; - /* vector of network instances */ + /* pool of network instances */ upf_nwi_t *nwis; uword *nwi_index_by_name; diff --git a/upf/upf_api.c b/upf/upf_api.c index 55b9cc0..f4a0057 100644 --- a/upf/upf_api.c +++ b/upf/upf_api.c @@ -576,7 +576,7 @@ send_upf_nwi_details (vl_api_registration_t *reg, upf_nwi_t *nwi, u32 context) upf_main_t *sm = &upf_main; u32 name_len, ipfix_policy_len, observation_domain_name_len; u8 *ipfix_policy = - format (0, "%U", format_upf_ipfix_policy, nwi->ipfix_policy); + format (0, "%U", format_upf_ipfix_policy, nwi->ipfix.default_policy); name_len = vec_len (nwi->name); mp = vl_msg_api_alloc (sizeof (*mp) + name_len * sizeof (u8)); @@ -595,21 +595,22 @@ send_upf_nwi_details (vl_api_registration_t *reg, upf_nwi_t *nwi, u32 context) mp->ipfix_policy[ipfix_policy_len] = 0; mp->ipfix_report_interval = - clib_host_to_net_u32 (nwi->ipfix_report_interval); + clib_host_to_net_u32 (nwi->ipfix.report_interval); mp->observation_domain_id = - clib_host_to_net_u32 (nwi->observation_domain_id); + clib_host_to_net_u32 (nwi->ipfix.observation_domain_id); observation_domain_name_len = clib_min (sizeof (mp->observation_domain_name) - 1, - vec_len (nwi->observation_domain_name)); - memcpy (mp->observation_domain_name, nwi->observation_domain_name, + vec_len (nwi->ipfix.observation_domain_name)); + memcpy (mp->observation_domain_name, nwi->ipfix.observation_domain_name, observation_domain_name_len); mp->observation_domain_name[observation_domain_name_len] = 0; - mp->observation_point_id = clib_host_to_net_u64 (nwi->observation_point_id); + mp->observation_point_id = + clib_host_to_net_u64 (nwi->ipfix.observation_point_id); memcpy (mp->nwi, nwi->name, name_len); mp->nwi_len = name_len; - ip_address_encode (&ip_addr_46 (&nwi->ipfix_collector_ip), IP46_TYPE_ANY, + ip_address_encode (&ip_addr_46 (&nwi->ipfix.collector_ip), IP46_TYPE_ANY, &mp->ipfix_collector_ip); vl_api_send_msg (reg, (u8 *) mp); diff --git a/upf/upf_classify.c b/upf/upf_classify.c index b2e10a6..f7246a8 100644 --- a/upf/upf_classify.c +++ b/upf/upf_classify.c @@ -428,6 +428,66 @@ upf_acl_classify_return (vlib_main_t *vm, u32 teid, flow_entry_t *flow, return next; } +// returns uplink traffic direction, or FLOW_ENTRY_UPLINK_DIRECTION_UNDEFINED +// if impossible to detect +always_inline flow_direction_t +upf_classify_detect_flow_direction (vlib_buffer_t *b, struct rules *r, + flow_direction_t direction) +{ + // Here we rely on the fact that interface of type ACCESS is one which + // directed to UE + // TODO: it should be possible to calculate and save this value per PDR + // during session creation/modification + + u32 pdr_idx = upf_buffer_opaque (b)->gtpu.pdr_idx; + if (pdr_idx == ~0) + { + upf_debug ("failed to detect flow direction since pdr is missed"); + return FLOW_ENTRY_UPLINK_DIRECTION_UNDEFINED; + } + + upf_pdr_t *pdr = vec_elt_at_index (r->pdr, pdr_idx); + if (!pdr) + { + upf_debug ("failed to detect flow direction since pdr is not found"); + return FLOW_ENTRY_UPLINK_DIRECTION_UNDEFINED; + } + + if (pdr->pdi.src_intf == PFCP_SRC_INTF_ACCESS) + { + upf_debug ("detected uplink direction from pdr: %d matched on %v", + FTD_OP_SAME ^ direction, + pool_elt_at_index (upf_main.nwis, pdr->pdi.nwi_index)->name); + return FTD_OP_SAME ^ direction; + } + + if (pdr->far_id == (u16) ~0) + { + upf_debug ("failed to detect flow direction since far is missed"); + return FLOW_ENTRY_UPLINK_DIRECTION_UNDEFINED; + } + + upf_far_t *far = pfcp_get_far_by_id (r, pdr->far_id); + if (!far) + { + upf_debug ("failed to detect flow direction since far is not found"); + return FLOW_ENTRY_UPLINK_DIRECTION_UNDEFINED; + } + + if (far->forward.dst_intf == PFCP_SRC_INTF_ACCESS) + { + upf_debug ( + "detected uplink direction from far: %d forwarded to %v", + FTD_OP_FLIP ^ direction, + pool_elt_at_index (upf_main.nwis, far->forward.nwi_index)->name); + return FTD_OP_FLIP ^ direction; + } + + upf_debug ( + "failed to detect flow direction since access interface is not used"); + return FLOW_ENTRY_UPLINK_DIRECTION_UNDEFINED; +} + always_inline uword upf_classify_fn (vlib_main_t *vm, vlib_node_runtime_t *node, vlib_frame_t *from_frame, int is_ip4) @@ -553,6 +613,14 @@ upf_classify_fn (vlib_main_t *vm, vlib_node_runtime_t *node, flow_pdr_idx (flow, direction, active); } + if (flow->uplink_direction == FLOW_ENTRY_UPLINK_DIRECTION_UNDEFINED) + { + flow_direction_t uplink_direction = + upf_classify_detect_flow_direction (b, active, direction); + if (uplink_direction != FLOW_ENTRY_UPLINK_DIRECTION_UNDEFINED) + flow->uplink_direction = uplink_direction; + } + upf_debug ("Next: %u", next); ASSERT (flow_side (flow, FT_ORIGIN)->next != FT_NEXT_PROXY || flow_side (flow, FT_ORIGIN)->pdr_id != ~0); diff --git a/upf/upf_cli.c b/upf/upf_cli.c index 6ad3bbe..9d3734b 100644 --- a/upf/upf_cli.c +++ b/upf/upf_cli.c @@ -551,8 +551,8 @@ upf_show_nwi_command_fn (vlib_main_t *vm, unformat_input_t *main_input, "%U, ipfix-collector-ip %U\n", format_pfcp_dns_labels, nwi->name, fib4->hash.table_id, fib6->table_id, format_upf_ipfix_policy, - nwi->ipfix_policy, format_ip_address, - &nwi->ipfix_collector_ip); + nwi->ipfix.default_policy, format_ip_address, + &nwi->ipfix.collector_ip); } done: diff --git a/upf/upf_forward.c b/upf/upf_forward.c index ba82e2a..3a70f2b 100644 --- a/upf/upf_forward.c +++ b/upf/upf_forward.c @@ -321,8 +321,9 @@ upf_forward (vlib_main_t *vm, vlib_node_runtime_t *node, const char *node_name, { flow = pool_elt_at_index ( fm->flows, upf_buffer_opaque (b)->gtpu.flow_id); - flow_update_stats (vm, b, flow, is_ip4, timestamp_ns, - current_time); + flow_update_stats (vm, b, flow, is_ip4, timestamp_ns); + + upf_ipfix_flow_stats_update_handler (flow, current_time); } } diff --git a/upf/upf_ipfix.c b/upf/upf_ipfix.c index 42305ba..1bb2551 100644 --- a/upf/upf_ipfix.c +++ b/upf/upf_ipfix.c @@ -53,24 +53,21 @@ upf_ipfix_main_t upf_ipfix_main; uword upf_ipfix_walker_process (vlib_main_t *vm, vlib_node_runtime_t *rt, vlib_frame_t *f); -static u32 upf_ipfix_ensure_flow_ipfix_info (flow_entry_t *f, - flow_direction_t direction); - static inline ipfix_exporter_t * -upf_ipfix_get_exporter (upf_ipfix_protocol_context_t *context) +upf_ipfix_get_exporter (upf_ipfix_context_t *context) { flow_report_main_t *frm = &flow_report_main; ipfix_exporter_t *exp; - bool use_default = ip46_address_is_zero (&context->key.collector_ip); + + bool use_default = ip_address_is_zero (&context->key.collector_ip); if (context->exporter_index != (u32) ~0 && !pool_is_free_index (frm->exporters, context->exporter_index)) { /* Check if the exporter got replaced */ exp = pool_elt_at_index (frm->exporters, context->exporter_index); - if (use_default || - ip46_address_cmp (&context->key.collector_ip, - &ip_addr_46 (&exp->ipfix_collector)) == 0) + if (use_default || ip_address_cmp (&context->key.collector_ip, + &exp->ipfix_collector) == 0) return exp; } @@ -80,13 +77,7 @@ upf_ipfix_get_exporter (upf_ipfix_protocol_context_t *context) return &frm->exporters[0]; } - ip_address_t addr; - ip_address_from_46 (&context->key.collector_ip, - ip46_address_is_ip4 (&context->key.collector_ip) ? - FIB_PROTOCOL_IP4 : - FIB_PROTOCOL_IP6, - &addr); - exp = vnet_ipfix_exporter_lookup (&addr); + exp = vnet_ipfix_exporter_lookup (&context->key.collector_ip); context->exporter_index = exp ? exp - frm->exporters : (u32) ~0; return exp; @@ -114,27 +105,29 @@ upf_ipfix_template_rewrite (ipfix_exporter_t *exp, flow_report_t *fr, ipfix_template_header_t *t; ipfix_field_specifier_t *f; ipfix_field_specifier_t *first_field; - u8 *rewrite = 0; ip4_ipfix_template_packet_t *tp; - u32 field_count = 0; - flow_report_stream_t *stream; - upf_ipfix_protocol_context_t *context = - pool_elt_at_index (fm->contexts, fr->opaque.as_uword); - upf_ipfix_template_t *template = upf_ipfix_templates + context->key.policy; + u8 *rewrite = 0; + + flow_report_stream_t *stream = &exp->streams[fr->stream_index]; + upf_ipfix_context_t *context = + pool_elt_at_index (fm->contexts, fr->opaque.as_uword); ASSERT (context); - field_count = context->key.is_ip4 ? template->field_count_ipv4 : - template->field_count_ipv6; - ASSERT (field_count); - stream = &exp->streams[fr->stream_index]; + fib_protocol_t fproto = + context->key.is_ip4 ? FIB_PROTOCOL_IP4 : FIB_PROTOCOL_IP6; + upf_ipfix_template_t *template = &upf_ipfix_templates[context->key.policy]; + upf_ipfix_template_proto_t *template_proto = &template->per_ip[fproto]; + + ASSERT (template_proto->field_count); /* allocate rewrite space */ - vec_validate_aligned (rewrite, - sizeof (ip4_ipfix_template_packet_t) + - field_count * sizeof (ipfix_field_specifier_t) - 1, - CLIB_CACHE_LINE_BYTES); + vec_validate_aligned ( + rewrite, + sizeof (ip4_ipfix_template_packet_t) + + template_proto->field_count * sizeof (ipfix_field_specifier_t) - 1, + CLIB_CACHE_LINE_BYTES); tp = (ip4_ipfix_template_packet_t *) rewrite; ip = (ip4_header_t *) &tp->ip4; @@ -157,8 +150,7 @@ upf_ipfix_template_rewrite (ipfix_exporter_t *exp, flow_report_t *fr, h->domain_id = clib_host_to_net_u32 (stream->domain_id); /* Add TLVs to the template */ - f = context->key.is_ip4 ? template->add_ip4_fields (f) : - template->add_ip6_fields (f); + f = template_proto->add_fields (f); /* Back to the template packet... */ ip = (ip4_header_t *) &tp->ip4; @@ -181,7 +173,7 @@ upf_ipfix_template_rewrite (ipfix_exporter_t *exp, flow_report_t *fr, vec_len (rewrite)); upf_debug ("n of fields: %u, hdr size %u, part hdr size %u, " "single field spec len %u", - field_count, sizeof (ip4_ipfix_template_packet_t), + template_proto->field_count, sizeof (ip4_ipfix_template_packet_t), sizeof (ipfix_template_packet_t), sizeof (ipfix_field_specifier_t)); ASSERT ((u8 *) f - (u8 *) ip == vec_len (rewrite)); @@ -194,12 +186,11 @@ upf_ipfix_template_rewrite (ipfix_exporter_t *exp, flow_report_t *fr, return rewrite; } -static vlib_buffer_t * -upf_ipfix_get_buffer (vlib_main_t *vm, upf_ipfix_protocol_context_t *context); +static vlib_buffer_t *upf_ipfix_get_buffer (vlib_main_t *vm, + upf_ipfix_context_t *context); static void upf_ipfix_export_send (vlib_main_t *vm, vlib_buffer_t *b0, - upf_ipfix_protocol_context_t *context, - u32 now); + upf_ipfix_context_t *context, u32 now); /** * @brief Flush accumulated data @@ -219,7 +210,7 @@ upf_ipfix_data_callback (flow_report_main_t *frm, ipfix_exporter_t *exp, upf_ipfix_main_t *fm = &upf_ipfix_main; vlib_main_t *vm = vlib_get_main (); u32 now = (u32) vlib_time_now (vm); - upf_ipfix_protocol_context_t *context = + upf_ipfix_context_t *context = pool_elt_at_index (fm->contexts, fr->opaque.as_uword); vlib_buffer_t *b = upf_ipfix_get_buffer (vm, context); if (b) @@ -233,7 +224,8 @@ upf_ipfix_report_add_del (upf_ipfix_main_t *fm, u32 domain_id, u32 context_index, u16 *template_id, bool is_ip4, bool is_add) { - upf_ipfix_protocol_context_t *context = fm->contexts + context_index; + upf_ipfix_context_t *context = + pool_elt_at_index (fm->contexts, context_index); ipfix_exporter_t *exp = upf_ipfix_get_exporter (context); if (!exp) return VNET_API_ERROR_INVALID_VALUE; @@ -248,8 +240,7 @@ upf_ipfix_report_add_del (upf_ipfix_main_t *fm, u32 domain_id, return vnet_flow_report_add_del (exp, &a, template_id); } -static void upf_ipfix_export_entry (vlib_main_t *vm, flow_entry_t *f, - flow_direction_t direction, u32 now, +static void upf_ipfix_export_entry (vlib_main_t *vm, flow_entry_t *f, u32 now, bool last); /* TBD: add trace */ @@ -263,10 +254,11 @@ upf_ipfix_get_headersize (void) static void upf_ipfix_export_send (vlib_main_t *vm, vlib_buffer_t *b0, - upf_ipfix_protocol_context_t *context, u32 now) + upf_ipfix_context_t *context, u32 now) { flow_report_main_t *frm = &flow_report_main; upf_ipfix_main_t *fm = &upf_ipfix_main; + upf_main_t *gtm = &upf_main; ipfix_exporter_t *exp = upf_ipfix_get_exporter (context); vlib_frame_t *f; ip4_ipfix_template_packet_t *tp; @@ -284,8 +276,9 @@ upf_ipfix_export_send (vlib_main_t *vm, vlib_buffer_t *b0, upf_ipfix_get_headersize ()) return; - upf_debug ("export send, context %u", context - fm->contexts); + upf_debug ("ipfix export send, context %u", context - fm->contexts); + /* TODO: WHAT WE DO HERE? */ u32 i, index = vec_len (exp->streams); for (i = 0; i < index; i++) if (exp->streams[i].domain_id == context->key.observation_domain_id) @@ -370,10 +363,14 @@ upf_ipfix_export_send (vlib_main_t *vm, vlib_buffer_t *b0, context->buffers_per_worker[my_cpu_number] = 0; context->next_record_offset_per_worker[my_cpu_number] = upf_ipfix_get_headersize (); + + vlib_increment_simple_counter ( + >m->upf_simple_counters[UPF_IPFIX_MESSAGES_SENT], + vlib_get_thread_index (), 0, 1); } static vlib_buffer_t * -upf_ipfix_get_buffer (vlib_main_t *vm, upf_ipfix_protocol_context_t *context) +upf_ipfix_get_buffer (vlib_main_t *vm, upf_ipfix_context_t *context) { ipfix_exporter_t *exp = upf_ipfix_get_exporter (context); vlib_buffer_t *b0; @@ -406,32 +403,151 @@ upf_ipfix_get_buffer (vlib_main_t *vm, upf_ipfix_protocol_context_t *context) return b0; } +// return bool if initialized +static bool +upf_ipfix_flow_init (flow_entry_t *f) +{ + upf_ipfix_main_t *fm = &upf_ipfix_main; + upf_main_t *gtm = &upf_main; + upf_session_t *sx; + struct rules *active; + upf_pdr_t *up_pdr; + upf_far_t *up_far; + upf_nwi_t *up_dst_nwi; + u32 up_forwarding_policy_index; + + if (f->uplink_direction == FLOW_ENTRY_UPLINK_DIRECTION_UNDEFINED) + return false; + + sx = pool_elt_at_index (gtm->sessions, f->session_index); + active = pfcp_get_rules (sx, PFCP_ACTIVE); + + /* Get uplink PDR,FAR and output NWI */ + + up_pdr = flow_pdr (f, FTK_EL_SRC ^ f->uplink_direction, active); + if (!up_pdr) + return false; + + up_far = pfcp_get_far_by_id (active, up_pdr->far_id); + if (!up_far) + return false; + + if ((up_far->apply_action & FAR_NAT) && f->nat_sport == 0) + return false; + + if (pool_is_free_index (gtm->nwis, up_far->forward.nwi_index)) + return false; + + up_dst_nwi = pool_elt_at_index (gtm->nwis, up_far->forward.nwi_index); + + /* Detect IPFIX policy for this flow */ + + // FAR has priority for policy + upf_ipfix_policy_t ipfix_policy = up_far->ipfix_policy; + if (ipfix_policy == UPF_IPFIX_POLICY_UNSPECIFIED) + ipfix_policy = up_dst_nwi->ipfix.default_policy; + + if (ipfix_policy == UPF_IPFIX_POLICY_NONE) + { + f->ipfix_disabled = 1; + return false; + } + + bool is_ip4 = f->key.is_ip4; + + /* Get IPFIX context for this flow */ + + /* TODO: possible to use cached contexts from nwi to avoid bihash lookup, but + * such approach has cache invalidation issues on reconfiguration */ + upf_ipfix_context_key_t context_key = { 0 }; + ip_address_copy (&context_key.collector_ip, &up_dst_nwi->ipfix.collector_ip); + context_key.observation_domain_id = up_dst_nwi->ipfix.observation_domain_id; + context_key.policy = ipfix_policy; + context_key.is_ip4 = is_ip4; + u32 ipfix_context_index = upf_ipfix_ensure_context (&context_key); + if (ipfix_context_index == ~0) + return false; + + fib_protocol_t fproto = is_ip4 ? FIB_PROTOCOL_IP4 : FIB_PROTOCOL_IP6; + + /* Determine forwarding policy index */ + + if (up_far->forward.flags & FAR_F_FORWARDING_POLICY) + up_forwarding_policy_index = up_far->forward.fp_pool_index; + else + up_forwarding_policy_index = ~0; + + /* Determine output interface */ + + u32 up_sw_if_index; + u32 up_fib_index = up_dst_nwi->fib_index[fproto]; + if ((up_far->forward.flags & FAR_F_OUTER_HEADER_CREATION)) + { + up_sw_if_index = up_far->forward.dst_sw_if_index; + } + else + { + if (up_forwarding_policy_index != ~0) + { + fib_route_path_t *rpath; + upf_forwarding_policy_t *fp_entry = pool_elt_at_index ( + gtm->upf_forwarding_policies, up_forwarding_policy_index); + vec_foreach (rpath, fp_entry->rpaths) + { + if (rpath->frp_proto == (is_ip4 ? DPO_PROTO_IP4 : DPO_PROTO_IP6)) + { + up_fib_index = rpath->frp_fib_index; + break; + } + } + } + + up_sw_if_index = upf_ip46_get_resolving_interface ( + up_fib_index, &f->key.ip[FTK_EL_DST ^ f->uplink_direction], is_ip4); + } + + /* Cache detected ipfix values in flow */ + + /* TODO: to avoid caching in future we may use + * flow[uplink]->pdi->far->forwarding_policy instead, but now pdi/far access + * is slow, so do not do this. Same for nwi. */ + f->ipfix.forwarding_policy_index = up_forwarding_policy_index; + f->ipfix.up_dst_nwi_index = up_dst_nwi - gtm->nwis; + + f->ipfix.context_index = ipfix_context_index; + f->ipfix.up_dst_sw_if_index = up_sw_if_index; + f->ipfix.up_dst_fib_index = up_fib_index; + + upf_ipfix_context_t *context = + pool_elt_at_index (fm->contexts, ipfix_context_index); + ASSERT (context->key.is_ip4 == is_ip4); + ASSERT (context->key.policy == ipfix_policy); + + return true; +} + static void -upf_ipfix_export_entry (vlib_main_t *vm, flow_entry_t *f, - flow_direction_t direction, u32 now, bool last) +upf_ipfix_export_entry (vlib_main_t *vm, flow_entry_t *f, u32 now, bool last) { u32 my_cpu_number = vm->thread_index; upf_ipfix_main_t *fm = &upf_ipfix_main; vlib_buffer_t *b0; upf_main_t *gtm = &upf_main; - upf_ipfix_info_t *info; - upf_ipfix_protocol_context_t *context; + upf_ipfix_context_t *context; u16 offset; upf_ipfix_template_t *template; - upf_session_t *sx; - u32 iidx = flow_side (f, direction)->ipfix.info_index; - - if (iidx == (u32) ~0) - return; - - info = pool_elt_at_index (fm->infos, iidx); - context = pool_elt_at_index (fm->contexts, info->context_index); + vnet_main_t *vnm = vnet_get_main (); - ASSERT (f->key.is_ip4 == context->key.is_ip4); + if (f->ipfix.context_index == (u16) ~0) + if (!upf_ipfix_flow_init (f)) + { + // more info needed, or ipfix is not needed for this flow + return; + } + context = pool_elt_at_index (fm->contexts, f->ipfix.context_index); offset = context->next_record_offset_per_worker[my_cpu_number]; - template = upf_ipfix_templates + context->key.policy; - sx = pool_elt_at_index (gtm->sessions, f->session_index); + template = &upf_ipfix_templates[context->key.policy]; upf_debug ("export entry [%s], policy %u", context->key.is_ip4 ? "ip4" : "ip6", context->key.policy); @@ -446,86 +562,105 @@ upf_ipfix_export_entry (vlib_main_t *vm, flow_entry_t *f, return; } - if (context->key.is_ip4) - offset += - template->add_ip4_values (b0, f, direction, offset, sx, info, last); + bool is_ip4 = f->key.is_ip4; + ASSERT (context->key.is_ip4 == is_ip4); + + fib_protocol_t fproto = is_ip4 ? FIB_PROTOCOL_IP4 : FIB_PROTOCOL_IP6; + + upf_session_t *sx = pool_elt_at_index (gtm->sessions, f->session_index); + upf_nwi_t *nwi = + pool_elt_at_index (upf_main.nwis, f->ipfix.up_dst_nwi_index); + fib_table_t *up_table = fib_table_get (f->ipfix.up_dst_fib_index, fproto); + + upf_ipfix_report_info_t info; + if (f->ipfix.forwarding_policy_index != (u16) ~0) + { + upf_forwarding_policy_t *fp_entry = pool_elt_at_index ( + gtm->upf_forwarding_policies, f->ipfix.forwarding_policy_index); + info.vrf_name = fp_entry->policy_id; + } else - offset += - template->add_ip6_values (b0, f, direction, offset, sx, info, last); + info.vrf_name = up_table->ft_desc; + + info.sw_if_name = NULL; + { + vnet_sw_interface_t *si = + vnet_get_sw_interface_or_null (vnm, f->ipfix.up_dst_sw_if_index); + if (si) + { + vnet_sw_interface_t *si_sup = + vnet_get_sup_sw_interface (vnm, si->sw_if_index); + vnet_hw_interface_t *hi_sup = + vnet_get_hw_interface (vnm, si_sup->hw_if_index); + info.sw_if_name = hi_sup->name; + } + } + + offset += template->per_ip[fproto].add_values ( + b0, offset, sx, f, f->uplink_direction, nwi, &info, last); /* Reset per flow-export counters */ - flow_side (f, direction)->ipfix.last_exported = now; - f->exported = 1; + if (nwi->ipfix.report_interval) + f->ipfix.next_export_at = now + nwi->ipfix.report_interval; + else + f->ipfix.next_export_at = 0; + f->ipfix_exported = 1; b0->current_length = offset; - context->next_record_offset_per_worker[my_cpu_number] = offset; + + vlib_increment_simple_counter ( + >m->upf_simple_counters[UPF_IPFIX_RECORDS_SENT], + vlib_get_thread_index (), 0, 1); + + ipfix_exporter_t *exp = upf_ipfix_get_exporter (context); + + if (!exp) + return; + /* Time to flush the buffer? */ - /* TODO uncomment! also: force upon removal */ - /* if (offset + context->rec_size > exp->path_mtu) */ - upf_ipfix_export_send (vm, b0, context, now); + if (offset + context->rec_size > exp->path_mtu) + upf_ipfix_export_send (vm, b0, context, now); } -int -upf_ipfix_flow_stats_update_handler (flow_entry_t *f, - flow_direction_t direction, u32 now) +void +upf_ipfix_flow_stats_update_handler (flow_entry_t *f, u32 now) { upf_ipfix_main_t *fm = &upf_ipfix_main; vlib_main_t *vm = fm->vlib_main; - u32 iidx; - upf_ipfix_info_t *info; - if (fm->disabled) - return 0; + if (f->ipfix_disabled) + return; - if ((iidx = upf_ipfix_ensure_flow_ipfix_info (f, direction)) == ~0) - return 0; + if (f->ipfix.next_export_at == 0) + return; - info = pool_elt_at_index (fm->infos, iidx); - if (info->report_interval) - if (PREDICT_FALSE (now > flow_side (f, direction)->ipfix.last_exported + - info->report_interval)) - upf_ipfix_export_entry (vm, f, direction, now, false); + if (PREDICT_FALSE (now >= f->ipfix.next_export_at)) + upf_ipfix_export_entry (vm, f, now, false); - return 0; + return; } -int +void upf_ipfix_flow_remove_handler (flow_entry_t *f, u32 now) { upf_ipfix_main_t *fm = &upf_ipfix_main; vlib_main_t *vm = fm->vlib_main; - if (fm->disabled) - return 0; - - u32 origin_iidx = flow_side (f, FT_ORIGIN)->ipfix.info_index; - u32 reverse_iidx = flow_side (f, FT_REVERSE)->ipfix.info_index; - - if (origin_iidx != ~0) - { - bool last = reverse_iidx == ~0; - upf_ipfix_export_entry (vm, f, FT_ORIGIN, now, last); - upf_unref_ipfix_info (origin_iidx); - } - - if (reverse_iidx != ~0) - { - upf_ipfix_export_entry (vm, f, FT_REVERSE, now, true); - upf_unref_ipfix_info (reverse_iidx); - } + if (f->ipfix_disabled) + return; - return 0; + upf_ipfix_export_entry (vm, f, now, true); } u32 -upf_ref_ipfix_context (upf_ipfix_context_key_t *key) +upf_ipfix_ensure_context (const upf_ipfix_context_key_t *key) { int rv; vlib_thread_main_t *tm = &vlib_thread_main; upf_ipfix_main_t *fm = &upf_ipfix_main; clib_bihash_kv_24_8_t kv, value; - upf_ipfix_protocol_context_t *context; + upf_ipfix_context_t *context; /* Decide how many worker threads we have */ u32 num_threads = 1 /* main thread */ + tm->n_threads; u32 idx = ~0; @@ -536,7 +671,6 @@ upf_ref_ipfix_context (upf_ipfix_context_key_t *key) !clib_bihash_search_24_8 (&fm->context_by_key, &kv, &value))) { context = pool_elt_at_index (fm->contexts, value.value); - clib_atomic_add_fetch (&context->refcnt, 1); return value.value; } @@ -550,7 +684,6 @@ upf_ref_ipfix_context (upf_ipfix_context_key_t *key) /* lookup the exporter a bit later */ context->exporter_index = (u32) ~0; - context->refcnt = 1; idx = context - fm->contexts; rv = upf_ipfix_report_add_del (fm, key->observation_domain_id, idx, @@ -569,187 +702,6 @@ upf_ref_ipfix_context (upf_ipfix_context_key_t *key) return idx; } -void -upf_ref_ipfix_context_by_index (u32 cidx) -{ - upf_ipfix_main_t *fm = &upf_ipfix_main; - upf_ipfix_protocol_context_t *context; - - context = pool_elt_at_index (fm->contexts, cidx); - clib_atomic_add_fetch (&context->refcnt, 1); -} - -void -upf_unref_ipfix_context_by_index (u32 cidx) -{ - int rv; - upf_ipfix_main_t *fm = &upf_ipfix_main; - clib_bihash_kv_24_8_t kv; - upf_ipfix_protocol_context_t *context; - - context = pool_elt_at_index (fm->contexts, cidx); - if (clib_atomic_sub_fetch (&context->refcnt, 1)) - return; - - clib_memcpy_fast (&kv.key, &context->key, sizeof (kv.key)); - clib_bihash_add_del_24_8 (&fm->context_by_key, &kv, 0 /* is_add */); - - rv = upf_ipfix_report_add_del (fm, context->key.observation_domain_id, cidx, - &context->template_id, context->key.is_ip4, - false); - if (rv) - clib_warning ("couldn't remove IPFIX report, perhaps " - "the exporter has been deleted?"); - - vec_free (context->buffers_per_worker); - vec_free (context->frames_per_worker); - vec_free (context->next_record_offset_per_worker); - pool_put (fm->contexts, context); -} - -u32 -upf_ensure_ref_ipfix_info (upf_ipfix_info_key_t *key) -{ - upf_main_t *gtm = &upf_main; - upf_ipfix_main_t *fm = &upf_ipfix_main; - vnet_main_t *vnm = vnet_get_main (); - clib_bihash_kv_24_8_t kv, value; - upf_ipfix_info_t *info; - u32 idx = ~0; - fib_protocol_t fproto; - fib_table_t *ingress_table, *egress_table; - upf_ipfix_context_key_t context_key; - upf_nwi_t *nwi = 0; - - clib_memcpy_fast (&kv.key, key, sizeof (kv.key)); - -#ifdef UPF_FLOW_SESSION_SPINLOCK - clib_spinlock_lock (&fm->lock); -#endif - - if (PREDICT_TRUE (!clib_bihash_search_24_8 (&fm->info_by_key, &kv, &value))) - { - info = pool_elt_at_index (fm->infos, value.value); - clib_atomic_add_fetch (&info->refcnt, 1); - idx = value.value; - goto done; - } - - pool_get_zero (fm->infos, info); - clib_memcpy_fast (&info->key, key, sizeof (info->key)); - info->refcnt = 1; - fproto = key->is_ip4 ? FIB_PROTOCOL_IP4 : FIB_PROTOCOL_IP6; - ingress_table = fib_table_get (key->ingress_fib_index, fproto); - egress_table = fib_table_get (key->egress_fib_index, fproto); - info->ingress_vrf_id = ingress_table->ft_table_id; - info->egress_vrf_id = egress_table->ft_table_id; - - if (key->forwarding_policy_index == ~0) - info->vrf_name = vec_dup (egress_table->ft_desc); - else - { - upf_forwarding_policy_t *fp_entry = pool_elt_at_index ( - gtm->upf_forwarding_policies, key->forwarding_policy_index); - info->vrf_name = vec_dup (fp_entry->policy_id); - } - - clib_memset (&context_key, 0, sizeof (context_key)); - /* FIXME: introduce refcounting for NWIs */ - if (!pool_is_free_index (gtm->nwis, key->info_nwi_index)) - { - nwi = pool_elt_at_index (gtm->nwis, key->info_nwi_index); - info->report_interval = nwi->ipfix_report_interval; - context_key.observation_domain_id = nwi->observation_domain_id; - info->observation_point_id = nwi->observation_point_id; - info->observation_domain_name = vec_dup (nwi->observation_domain_name); - ip_address_to_46 (&nwi->ipfix_collector_ip, &context_key.collector_ip); - } - else - clib_warning ("non-existent egress NWI at index %u", key->info_nwi_index); - - if (info->report_interval == ~0) - info->report_interval = 0; - - context_key.policy = key->policy; - context_key.is_ip4 = key->is_ip4; - info->context_index = upf_ref_ipfix_context (&context_key); - - if (info->context_index == ~0) - { - clib_warning ("failed to allocate IPFIX context"); - pool_put (fm->infos, info); - goto done; /* will return ~0 */ - } - - if (nwi) - { - u32 *cur_index; - vec_foreach (cur_index, nwi->ipfix_context_indices) - { - if (*cur_index == info->context_index) - break; - } - - if (cur_index == vec_end (nwi->ipfix_context_indices)) - { - /* - * Reference the context from NWI to prevent it from being - * deleted till the NWI is deleted. This way, we avoid - * ever-increasing template IDs during intermittent traffic. - */ - upf_ref_ipfix_context_by_index (info->context_index); - vec_add1 (nwi->ipfix_context_indices, info->context_index); - } - } - - if (key->sw_if_index != ~0) - info->interface_name = - format (0, "%U", format_vnet_sw_if_index_name, vnm, key->sw_if_index); - - idx = info - fm->infos; - kv.value = idx; - clib_bihash_add_del_24_8 (&fm->info_by_key, &kv, 1); - -done: -#ifdef UPF_FLOW_SESSION_SPINLOCK - clib_spinlock_unlock (&fm->lock); -#endif - return idx; -} - -void -upf_unref_ipfix_info (u32 iidx) -{ - upf_ipfix_main_t *fm = &upf_ipfix_main; - clib_bihash_kv_24_8_t kv; - upf_ipfix_info_t *info; - -#ifdef UPF_FLOW_SESSION_SPINLOCK - clib_spinlock_lock (&fm->lock); -#endif - info = pool_elt_at_index (fm->infos, iidx); - if (clib_atomic_sub_fetch (&info->refcnt, 1)) -#ifdef UPF_FLOW_SESSION_SPINLOCK - goto done; -#else - return; -#endif - - clib_memcpy_fast (&kv.key, &info->key, sizeof (kv.key)); - clib_bihash_add_del_24_8 (&fm->info_by_key, &kv, 0 /* is_add */); - - /* TODO: unref forwarding policy object */ - upf_unref_ipfix_context_by_index (info->context_index); - vec_free (info->vrf_name); - vec_free (info->observation_domain_name); - vec_free (info->interface_name); - -#ifdef UPF_FLOW_SESSION_SPINLOCK -done: - clib_spinlock_unlock (&fm->lock); -#endif -} - /** * @brief Set up the API message handling tables * @param vm vlib_main_t * vlib main data structure pointer @@ -761,9 +713,6 @@ upf_ipfix_init (vlib_main_t *vm) upf_ipfix_main_t *fm = &upf_ipfix_main; clib_error_t *error = 0; - clib_spinlock_init (&fm->lock); - - fm->vnet_main = vnet_get_main (); fm->vlib_main = vm; /* FIXME: shouldn't need that */ /* Set up time reference pair */ @@ -775,11 +724,6 @@ upf_ipfix_init (vlib_main_t *vm) UPF_IPFIX_MAPPING_MEMORY_SIZE); /* clib_bihash_set_kvp_format_fn_24_8 (&fm->context_by_key, */ /* format_ipfix_context_key); */ - clib_bihash_init_24_8 (&fm->info_by_key, "info_by_key", - UPF_IPFIX_MAPPING_BUCKETS, - UPF_IPFIX_MAPPING_MEMORY_SIZE); - /* clib_bihash_set_kvp_format_fn_24_8 (&fm->info_by_key, */ - /* format_ipfix_info_key); */ return error; } @@ -845,128 +789,3 @@ format_upf_ipfix_policy (u8 *s, va_list *args) format (s, "%s", upf_ipfix_templates[policy].name) : format (s, "", policy); } - -static u32 -upf_ipfix_ensure_flow_ipfix_info (flow_entry_t *f, flow_direction_t direction) -{ - upf_ipfix_main_t *fm = &upf_ipfix_main; - upf_main_t *gtm = &upf_main; - upf_session_t *sx; - struct rules *active; - u32 pdr_id; - upf_pdr_t *pdr; - upf_far_t *far; - upf_ipfix_info_key_t info_key; - upf_nwi_t *ingress_nwi, *egress_nwi; - fib_protocol_t fproto; - upf_ipfix_info_t *other_info = 0; - u32 iidx; - - if ((iidx = flow_side (f, direction)->ipfix.info_index) != ~0) - return iidx; - - sx = pool_elt_at_index (gtm->sessions, f->session_index); - active = pfcp_get_rules (sx, PFCP_ACTIVE); - - pdr_id = flow_side (f, direction)->pdr_id; - if (pdr_id == ~0) - return ~0; - - pdr = pfcp_get_pdr_by_id (active, pdr_id); - if (!pdr || pool_is_free_index (gtm->nwis, pdr->pdi.nwi_index)) - return ~0; - - far = pfcp_get_far_by_id (active, pdr->far_id); - if (!far || pool_is_free_index (gtm->nwis, far->forward.nwi_index)) - return ~0; - - egress_nwi = pool_elt_at_index (gtm->nwis, far->forward.nwi_index); - - /* - * IPFIX policy specified in the FAR itself, if any, takes - * precedence over the policy specified in the egress NWI. Note that - * it can be UPF_IPFIX_POLICY_NONE which is specified as an empty - * string on the PFCP level. - */ - info_key.policy = far->ipfix_policy != UPF_IPFIX_POLICY_UNSPECIFIED ? - far->ipfix_policy : - egress_nwi->ipfix_policy; - - /* - * For the reverse direction, we reuse IPFIX settings specified for - * the forward direction, except for the policy if it's specified - * for the reverse direction, too - */ - if (direction == FT_REVERSE) - { - /* - * If this is the reverse flow direction, use IPFIX settings for the - * forward direction; - */ - if (flow_side (f, FT_ORIGIN)->ipfix.info_index != ~0) - { - other_info = pool_elt_at_index ( - fm->infos, flow_side (f, FT_ORIGIN)->ipfix.info_index); - if (info_key.policy == UPF_IPFIX_POLICY_NONE) - info_key.policy = other_info->key.policy; - info_key.info_nwi_index = other_info->key.info_nwi_index; - } - else - return ~0; - } - else - { - if (info_key.policy == UPF_IPFIX_POLICY_NONE) - return ~0; - info_key.info_nwi_index = far->forward.nwi_index; - } - - ingress_nwi = pool_elt_at_index (gtm->nwis, pdr->pdi.nwi_index); - - info_key.is_ip4 = f->key.is_ip4; - fproto = info_key.is_ip4 ? FIB_PROTOCOL_IP4 : FIB_PROTOCOL_IP6; - info_key.ingress_fib_index = ingress_nwi->fib_index[fproto]; - info_key.egress_fib_index = egress_nwi->fib_index[fproto]; - info_key.sw_if_index = ~0; - info_key.forwarding_policy_index = ~0; - - /* - * If there's a forwarding policy specified in FAR, try to find the - * proper fib_index from it - */ - if (direction == FT_ORIGIN) - { - if ((far->forward.flags & FAR_F_FORWARDING_POLICY)) - { - fib_route_path_t *rpath; - upf_forwarding_policy_t *fp_entry = pool_elt_at_index ( - gtm->upf_forwarding_policies, far->forward.fp_pool_index); - /* TODO: add ref for the forwarding policy object */ - info_key.forwarding_policy_index = far->forward.fp_pool_index; - vec_foreach (rpath, fp_entry->rpaths) - { - if (rpath->frp_proto == - (info_key.is_ip4 ? DPO_PROTO_IP4 : DPO_PROTO_IP6)) - { - info_key.egress_fib_index = rpath->frp_fib_index; - break; - } - } - } - } - else if (other_info) - info_key.forwarding_policy_index = other_info->key.forwarding_policy_index; - - if ((far->forward.flags & FAR_F_OUTER_HEADER_CREATION)) - info_key.sw_if_index = far->forward.dst_sw_if_index; - else - info_key.sw_if_index = upf_ip46_get_resolving_interface ( - info_key.egress_fib_index, &f->key.ip[FTK_EL_DST ^ direction], - info_key.is_ip4); - - iidx = upf_ensure_ref_ipfix_info (&info_key); - if (iidx != ~0) - flow_side (f, direction)->ipfix.info_index = iidx; - - return iidx; -} diff --git a/upf/upf_ipfix.h b/upf/upf_ipfix.h index e3857e8..7422e47 100644 --- a/upf/upf_ipfix.h +++ b/upf/upf_ipfix.h @@ -23,34 +23,10 @@ typedef struct { struct { - u32 ingress_fib_index; - u32 egress_fib_index; - /* - * the NWI used to populate observationDomain{Id,Name} - * and observationPointId - */ - u32 info_nwi_index; // initiator->far->nwi_index - u32 sw_if_index; - u32 forwarding_policy_index; + ip_address_t collector_ip; upf_ipfix_policy_t policy; - u8 is_ip4; - }; - u64 key[3]; - }; -} upf_ipfix_info_key_t; - -STATIC_ASSERT_SIZEOF (upf_ipfix_info_key_t, 24); - -typedef struct -{ - union - { - struct - { - ip46_address_t collector_ip; + bool is_ip4; u32 observation_domain_id; - upf_ipfix_policy_t policy; - u8 is_ip4; }; u64 key[3]; }; @@ -62,45 +38,25 @@ typedef struct { /** Context key */ upf_ipfix_context_key_t key; + /** ipfix buffers under construction, per-worker thread */ vlib_buffer_t **buffers_per_worker; /** frames containing ipfix buffers, per-worker thread */ vlib_frame_t **frames_per_worker; /** next record offset, per worker thread */ u16 *next_record_offset_per_worker; - /** record size */ - u16 rec_size; - /** IPFIX template id */ + + /** current record size **/ + u16 rec_size; // TODO: follow and document u16 template_id; - /** Exporter index */ u32 exporter_index; - /** Reference count */ - u32 refcnt; -} upf_ipfix_protocol_context_t; +} upf_ipfix_context_t; typedef struct { - /** info key */ - upf_ipfix_info_key_t key; - /** Context index */ - u32 context_index; - /** Report interval in seconds */ - u32 report_interval; - /** IPFIX field: ingressVRFID */ - u32 ingress_vrf_id; - /** IPFIX field: egressVRFID */ - u32 egress_vrf_id; - /** IPFIX field: VRFname */ u8 *vrf_name; - /** IPFIX field: interfaceName */ - u8 *interface_name; - /** IPFIX field: observationDomainName */ - u8 *observation_domain_name; - /** IPFIX field: observationPointId */ - u64 observation_point_id; - /** Reference count */ - u32 refcnt; -} upf_ipfix_info_t; + u8 *sw_if_name; +} upf_ipfix_report_info_t; /** * @file @@ -108,62 +64,47 @@ typedef struct */ typedef struct { - clib_spinlock_t lock; - clib_bihash_24_8_t context_by_key; - clib_bihash_24_8_t info_by_key; - upf_ipfix_protocol_context_t *contexts; - upf_ipfix_info_t *infos; - u16 template_id; - upf_ipfix_policy_t policy; + upf_ipfix_context_t *contexts; // pool of contexts + clib_bihash_24_8_t context_by_key; // reusing of contexts by key + u16 template_id; u32 vlib_time_0; - bool initialized; - bool disabled; - - u8 *flow_per_interface; - /** convenience vlib_main_t pointer */ vlib_main_t *vlib_main; - /** convenience vnet_main_t pointer */ - vnet_main_t *vnet_main; } upf_ipfix_main_t; -u8 *format_upf_ipfix_entry (u8 *s, va_list *args); - -clib_error_t *upf_ipfix_init (vlib_main_t *vm); - typedef ipfix_field_specifier_t *(*upf_ipfix_field_func_t) ( ipfix_field_specifier_t *); -typedef u32 (*upf_ipfix_value_func_t) (vlib_buffer_t *to_b, flow_entry_t *f, - flow_direction_t direction, u16 offset, - upf_session_t *sx, - upf_ipfix_info_t *info, bool last); +typedef u32 (*upf_ipfix_value_func_t) (vlib_buffer_t *to_b, u16 offset, + upf_session_t *sx, flow_entry_t *f, + flow_direction_t uplink_direction, + upf_nwi_t *uplink_nwi, + upf_ipfix_report_info_t *info, + bool last); + +typedef struct +{ + u16 field_count; + upf_ipfix_field_func_t add_fields; + upf_ipfix_value_func_t add_values; +} upf_ipfix_template_proto_t; typedef struct { char *name; - u16 field_count_ipv4; - u16 field_count_ipv6; - upf_ipfix_field_func_t add_ip4_fields; - upf_ipfix_field_func_t add_ip6_fields; - upf_ipfix_value_func_t add_ip4_values; - upf_ipfix_value_func_t add_ip6_values; + upf_ipfix_template_proto_t per_ip[FIB_PROTOCOL_IP_MAX]; } upf_ipfix_template_t; extern upf_ipfix_template_t upf_ipfix_templates[]; -u32 upf_ref_ipfix_context (upf_ipfix_context_key_t *key); -void upf_ref_ipfix_context_by_index (u32 cidx); -void upf_unref_ipfix_context_by_index (u32 cidx); +clib_error_t *upf_ipfix_init (vlib_main_t *vm); -u32 upf_ensure_ref_ipfix_info (upf_ipfix_info_key_t *key); -void upf_unref_ipfix_info (u32 iidx); +u32 upf_ipfix_ensure_context (const upf_ipfix_context_key_t *key); upf_ipfix_policy_t upf_ipfix_lookup_policy (u8 *name, bool *ok); uword unformat_ipfix_policy (unformat_input_t *i, va_list *args); u8 *format_upf_ipfix_policy (u8 *s, va_list *args); - -void upf_ipfix_ensure_flow_info (flow_entry_t *f); +u8 *format_upf_ipfix_entry (u8 *s, va_list *args); #endif diff --git a/upf/upf_ipfix_templates.c b/upf/upf_ipfix_templates.c index 9608d59..64c0654 100644 --- a/upf/upf_ipfix_templates.c +++ b/upf/upf_ipfix_templates.c @@ -31,141 +31,176 @@ #include "upf_ipfix_templates.h" #include "upf_pfcp.h" -#define IPFIX_TEMPLATE_DEFAULT_IPV4(F) \ +#define IPFIX_TEMPLATE_NAT_EVENT_IPV4(F) \ IPFIX_FIELD_SOURCE_IPV4_ADDRESS (F) \ IPFIX_FIELD_DESTINATION_IPV4_ADDRESS (F) \ - IPFIX_FIELD_PROTOCOL_IDENTIFIER (F) \ IPFIX_FIELD_POST_NAT_SOURCE_IPV4_ADDRESS (F) \ IPFIX_FIELD_POST_NAPT_SOURCE_TRANSPORT_PORT (F) \ IPFIX_FIELD_NAT_EVENT (F) -#define IPFIX_TEMPLATE_DEFAULT_IPV6(F) \ +#define IPFIX_TEMPLATE_NAT_EVENT_IPV6(F) \ IPFIX_FIELD_SOURCE_IPV6_ADDRESS (F) \ - IPFIX_FIELD_DESTINATION_IPV6_ADDRESS (F) \ - IPFIX_FIELD_PROTOCOL_IDENTIFIER (F) + IPFIX_FIELD_DESTINATION_IPV6_ADDRESS (F) -#define IPFIX_TEMPLATE_DEFAULT_COMMON(F) \ +#define IPFIX_TEMPLATE_NAT_EVENT_COMMON(F) \ + IPFIX_FIELD_PROTOCOL_IDENTIFIER (F) \ IPFIX_FIELD_MOBILE_IMSI (F) \ - IPFIX_FIELD_PACKET_DELTA_COUNT (F) \ - IPFIX_FIELD_OCTET_DELTA_COUNT (F) \ + IPFIX_FIELD_INITIATOR_OCTETS (F) \ + IPFIX_FIELD_RESPONDER_OCTETS (F) \ + IPFIX_FIELD_INITIATOR_PACKETS (F) \ + IPFIX_FIELD_RESPONDER_PACKETS (F) \ IPFIX_FIELD_FLOW_START_MILLISECONDS (F) \ IPFIX_FIELD_FLOW_END_MILLISECONDS (F) \ IPFIX_FIELD_SOURCE_TRANSPORT_PORT (F) \ - IPFIX_FIELD_DESTINATION_TRANSPORT_PORT (F) + IPFIX_FIELD_DESTINATION_TRANSPORT_PORT (F) \ + IPFIX_FIELD_BIFLOW_DIRECTION (F) static ipfix_field_specifier_t * -upf_ipfix_template_default_ip4_fields (ipfix_field_specifier_t *f) +upf_ipfix_template_nat_event_ip4_fields (ipfix_field_specifier_t *f) { - IPFIX_TEMPLATE_FIELDS (IPFIX_TEMPLATE_DEFAULT_IPV4, - IPFIX_TEMPLATE_DEFAULT_COMMON); + IPFIX_TEMPLATE_FIELDS (IPFIX_TEMPLATE_NAT_EVENT_IPV4, + IPFIX_TEMPLATE_NAT_EVENT_COMMON); } static ipfix_field_specifier_t * -upf_ipfix_template_default_ip6_fields (ipfix_field_specifier_t *f) +upf_ipfix_template_nat_event_ip6_fields (ipfix_field_specifier_t *f) { - IPFIX_TEMPLATE_FIELDS (IPFIX_TEMPLATE_DEFAULT_IPV6, - IPFIX_TEMPLATE_DEFAULT_COMMON); + IPFIX_TEMPLATE_FIELDS (IPFIX_TEMPLATE_NAT_EVENT_IPV6, + IPFIX_TEMPLATE_NAT_EVENT_COMMON); } static u32 -upf_ipfix_template_default_ip4_values (vlib_buffer_t *to_b, flow_entry_t *f, - flow_direction_t direction, u16 offset, - upf_session_t *sx, - upf_ipfix_info_t *info, bool last) +upf_ipfix_template_nat_event_ip4_values (vlib_buffer_t *to_b, u16 offset, + upf_session_t *sx, flow_entry_t *f, + flow_direction_t uplink_direction, + upf_nwi_t *uplink_nwi, + upf_ipfix_report_info_t *info, + bool last) { - IPFIX_TEMPLATE_VALUES (IPFIX_TEMPLATE_DEFAULT_IPV4, - IPFIX_TEMPLATE_DEFAULT_COMMON); + ASSERT (uplink_direction == FT_ORIGIN || uplink_direction == FT_REVERSE); + IPFIX_TEMPLATE_VALUES (IPFIX_TEMPLATE_NAT_EVENT_IPV4, + IPFIX_TEMPLATE_NAT_EVENT_COMMON); } static u32 -upf_ipfix_template_default_ip6_values (vlib_buffer_t *to_b, flow_entry_t *f, - flow_direction_t direction, u16 offset, - upf_session_t *sx, - upf_ipfix_info_t *info, bool last) +upf_ipfix_template_nat_event_ip6_values (vlib_buffer_t *to_b, u16 offset, + upf_session_t *sx, flow_entry_t *f, + flow_direction_t uplink_direction, + upf_nwi_t *uplink_nwi, + upf_ipfix_report_info_t *info, + bool last) { - IPFIX_TEMPLATE_VALUES (IPFIX_TEMPLATE_DEFAULT_IPV6, - IPFIX_TEMPLATE_DEFAULT_COMMON); + ASSERT (uplink_direction == FT_ORIGIN || uplink_direction == FT_REVERSE); + IPFIX_TEMPLATE_VALUES (IPFIX_TEMPLATE_NAT_EVENT_IPV6, + IPFIX_TEMPLATE_NAT_EVENT_COMMON); } -#define IPFIX_TEMPLATE_DEST_IPV4(F) \ +#define IPFIX_TEMPLATE_FLOW_USAGE_IPV4(F) \ IPFIX_FIELD_SOURCE_IPV4_ADDRESS (F) \ IPFIX_FIELD_DESTINATION_IPV4_ADDRESS (F) -#define IPFIX_TEMPLATE_DEST_IPV6(F) \ +#define IPFIX_TEMPLATE_FLOW_USAGE_IPV6(F) \ IPFIX_FIELD_SOURCE_IPV6_ADDRESS (F) \ IPFIX_FIELD_DESTINATION_IPV6_ADDRESS (F) -#define IPFIX_TEMPLATE_DEST_COMMON(F) \ - IPFIX_FIELD_OCTET_DELTA_COUNT (F) \ +#define IPFIX_TEMPLATE_FLOW_USAGE_COMMON(F) \ + IPFIX_FIELD_PROTOCOL_IDENTIFIER (F) \ + IPFIX_FIELD_INITIATOR_OCTETS (F) \ + IPFIX_FIELD_RESPONDER_OCTETS (F) \ + IPFIX_FIELD_INITIATOR_PACKETS (F) \ + IPFIX_FIELD_RESPONDER_PACKETS (F) \ + IPFIX_FIELD_FLOW_START_MILLISECONDS (F) \ IPFIX_FIELD_FLOW_END_MILLISECONDS (F) \ - IPFIX_FIELD_FLOW_DIRECTION (F) \ - IPFIX_FIELD_INGRESS_VRF_ID (F) \ - IPFIX_FIELD_EGRESS_VRF_ID (F) \ IPFIX_FIELD_VRF_NAME (F) \ IPFIX_FIELD_INTERFACE_NAME (F) \ IPFIX_FIELD_OBSERVATION_DOMAIN_NAME (F) \ - IPFIX_FIELD_OBSERVATION_POINT_ID (F) + IPFIX_FIELD_OBSERVATION_POINT_ID (F) \ + IPFIX_FIELD_BIFLOW_DIRECTION (F) static ipfix_field_specifier_t * -upf_ipfix_template_dest_ip4_fields (ipfix_field_specifier_t *f) +upf_ipfix_template_flow_usage_ip4_fields (ipfix_field_specifier_t *f) { - IPFIX_TEMPLATE_FIELDS (IPFIX_TEMPLATE_DEST_IPV4, IPFIX_TEMPLATE_DEST_COMMON); + IPFIX_TEMPLATE_FIELDS (IPFIX_TEMPLATE_FLOW_USAGE_IPV4, + IPFIX_TEMPLATE_FLOW_USAGE_COMMON); } static ipfix_field_specifier_t * -upf_ipfix_template_dest_ip6_fields (ipfix_field_specifier_t *f) +upf_ipfix_template_flow_usage_ip6_fields (ipfix_field_specifier_t *f) { - IPFIX_TEMPLATE_FIELDS (IPFIX_TEMPLATE_DEST_IPV6, IPFIX_TEMPLATE_DEST_COMMON); + IPFIX_TEMPLATE_FIELDS (IPFIX_TEMPLATE_FLOW_USAGE_IPV6, + IPFIX_TEMPLATE_FLOW_USAGE_COMMON); } static u32 -upf_ipfix_template_dest_ip4_values (vlib_buffer_t *to_b, flow_entry_t *f, - flow_direction_t direction, u16 offset, - upf_session_t *sx, upf_ipfix_info_t *info, - bool last) +upf_ipfix_template_flow_usage_ip4_values (vlib_buffer_t *to_b, u16 offset, + upf_session_t *sx, flow_entry_t *f, + flow_direction_t uplink_direction, + upf_nwi_t *uplink_nwi, + upf_ipfix_report_info_t *info, + bool last) { - IPFIX_TEMPLATE_VALUES (IPFIX_TEMPLATE_DEST_IPV4, IPFIX_TEMPLATE_DEST_COMMON); + ASSERT (uplink_direction == FT_ORIGIN || uplink_direction == FT_REVERSE); + IPFIX_TEMPLATE_VALUES (IPFIX_TEMPLATE_FLOW_USAGE_IPV4, + IPFIX_TEMPLATE_FLOW_USAGE_COMMON); } static u32 -upf_ipfix_template_dest_ip6_values (vlib_buffer_t *to_b, flow_entry_t *f, - flow_direction_t direction, u16 offset, - upf_session_t *sx, upf_ipfix_info_t *info, - bool last) +upf_ipfix_template_flow_usage_ip6_values (vlib_buffer_t *to_b, u16 offset, + upf_session_t *sx, flow_entry_t *f, + flow_direction_t uplink_direction, + upf_nwi_t *uplink_nwi, + upf_ipfix_report_info_t *info, + bool last) { - IPFIX_TEMPLATE_VALUES (IPFIX_TEMPLATE_DEST_IPV6, IPFIX_TEMPLATE_DEST_COMMON); + ASSERT (uplink_direction == FT_ORIGIN || uplink_direction == FT_REVERSE); + IPFIX_TEMPLATE_VALUES (IPFIX_TEMPLATE_FLOW_USAGE_IPV6, + IPFIX_TEMPLATE_FLOW_USAGE_COMMON); } upf_ipfix_template_t upf_ipfix_templates[UPF_IPFIX_N_POLICIES] = { [UPF_IPFIX_POLICY_NONE] = { .name = "none", - .field_count_ipv4 = 0, - .field_count_ipv6 = 0, + .per_ip={ + [FIB_PROTOCOL_IP4] = { + .field_count = 0, + }, + [FIB_PROTOCOL_IP6] = { + .field_count = 0, + }, + }, }, - [UPF_IPFIX_POLICY_DEFAULT] = { - .name = "default", - .field_count_ipv4 = - IPFIX_TEMPLATE_COUNT (IPFIX_TEMPLATE_DEFAULT_IPV4, - IPFIX_TEMPLATE_DEFAULT_COMMON), - .field_count_ipv6 = - IPFIX_TEMPLATE_COUNT (IPFIX_TEMPLATE_DEFAULT_IPV6, - IPFIX_TEMPLATE_DEFAULT_COMMON), - .add_ip4_fields = upf_ipfix_template_default_ip4_fields, - .add_ip6_fields = upf_ipfix_template_default_ip6_fields, - .add_ip4_values = upf_ipfix_template_default_ip4_values, - .add_ip6_values = upf_ipfix_template_default_ip6_values, + [UPF_IPFIX_POLICY_NAT_EVENT] = { + .name = "NatEvent", + .per_ip={ + [FIB_PROTOCOL_IP4] = { + .field_count = IPFIX_TEMPLATE_COUNT (IPFIX_TEMPLATE_NAT_EVENT_IPV4, + IPFIX_TEMPLATE_NAT_EVENT_COMMON), + .add_fields = upf_ipfix_template_nat_event_ip4_fields, + .add_values = upf_ipfix_template_nat_event_ip4_values, + }, + [FIB_PROTOCOL_IP6] = { + .field_count = IPFIX_TEMPLATE_COUNT (IPFIX_TEMPLATE_NAT_EVENT_IPV6, + IPFIX_TEMPLATE_NAT_EVENT_COMMON), + .add_fields = upf_ipfix_template_nat_event_ip6_fields, + .add_values = upf_ipfix_template_nat_event_ip6_values, + }, + }, }, - [UPF_IPFIX_POLICY_DEST] = { - .name = "dest", - .field_count_ipv4 = - IPFIX_TEMPLATE_COUNT (IPFIX_TEMPLATE_DEST_IPV4, - IPFIX_TEMPLATE_DEST_COMMON), - .field_count_ipv6 = - IPFIX_TEMPLATE_COUNT (IPFIX_TEMPLATE_DEST_IPV6, - IPFIX_TEMPLATE_DEST_COMMON), - .add_ip4_fields = upf_ipfix_template_dest_ip4_fields, - .add_ip6_fields = upf_ipfix_template_dest_ip6_fields, - .add_ip4_values = upf_ipfix_template_dest_ip4_values, - .add_ip6_values = upf_ipfix_template_dest_ip6_values, + [UPF_IPFIX_POLICY_FLOW_USAGE] = { + .name = "FlowUsage", + .per_ip={ + [FIB_PROTOCOL_IP4] = { + .field_count = IPFIX_TEMPLATE_COUNT (IPFIX_TEMPLATE_FLOW_USAGE_IPV4, + IPFIX_TEMPLATE_FLOW_USAGE_COMMON), + .add_fields = upf_ipfix_template_flow_usage_ip4_fields, + .add_values = upf_ipfix_template_flow_usage_ip4_values, + }, + [FIB_PROTOCOL_IP6] = { + .field_count = IPFIX_TEMPLATE_COUNT (IPFIX_TEMPLATE_FLOW_USAGE_IPV6, + IPFIX_TEMPLATE_FLOW_USAGE_COMMON), + .add_fields = upf_ipfix_template_flow_usage_ip6_fields, + .add_values = upf_ipfix_template_flow_usage_ip6_values, + }, + }, }, }; diff --git a/upf/upf_ipfix_templates.h b/upf/upf_ipfix_templates.h index 1873943..f0e2bcb 100644 --- a/upf/upf_ipfix_templates.h +++ b/upf/upf_ipfix_templates.h @@ -48,6 +48,9 @@ offset += n; \ } while (0) +#define IPFIX_VALUE_U8(v, n, c) \ + to_b->data[offset++] = v; + #define IPFIX_VALUE_U8_COND(v, n, c) \ to_b->data[offset++] = (c) ? (v) : 0 @@ -114,22 +117,22 @@ #define IPFIX_FIELD_SOURCE_IPV4_ADDRESS(F) \ F(sourceIPv4Address, 4, \ IPFIX_VALUE_MEMCPY_DIRECT, \ - &f->key.ip[FTK_EL_SRC ^ direction].ip4, \ + &f->key.ip[FTK_EL_SRC ^ uplink_direction].ip4, \ sizeof(ip4_address_t), 1) #define IPFIX_FIELD_SOURCE_IPV6_ADDRESS(F) \ F(sourceIPv6Address, 16, \ IPFIX_VALUE_MEMCPY_DIRECT, \ - &f->key.ip[FTK_EL_SRC ^ direction].ip6, \ + &f->key.ip[FTK_EL_SRC ^ uplink_direction].ip6, \ sizeof(ip6_address_t), 1) #define IPFIX_FIELD_DESTINATION_IPV4_ADDRESS(F) \ F(destinationIPv4Address, 4, \ IPFIX_VALUE_MEMCPY_DIRECT, \ - &f->key.ip[FTK_EL_DST ^ direction].ip4, \ + &f->key.ip[FTK_EL_DST ^ uplink_direction].ip4, \ sizeof(ip4_address_t), 1) #define IPFIX_FIELD_DESTINATION_IPV6_ADDRESS(F) \ F(destinationIPv6Address, 16, \ IPFIX_VALUE_MEMCPY_DIRECT, \ - &f->key.ip[FTK_EL_DST ^ direction].ip6, \ + &f->key.ip[FTK_EL_DST ^ uplink_direction].ip6, \ sizeof(ip6_address_t), 1) #define IPFIX_FIELD_PROTOCOL_IDENTIFIER(F) \ F(protocolIdentifier, 1, \ @@ -141,43 +144,23 @@ #define IPFIX_FIELD_INITIATOR_PACKETS(F) \ F(initiatorPackets, 8, \ IPFIX_VALUE_DELTA_U64, \ - flow_side(f, FT_ORIGIN)->stats.pkts_unreported, \ + flow_side(f, FTK_EL_SRC ^ uplink_direction)->stats.pkts_unreported, \ sizeof(u64), 1) #define IPFIX_FIELD_RESPONDER_PACKETS(F) \ F(responderPackets, 8, \ IPFIX_VALUE_DELTA_U64, \ - flow_side(f, FT_REVERSE)->stats.pkts_unreported, \ + flow_side(f, FTK_EL_DST ^ uplink_direction)->stats.pkts_unreported, \ sizeof(u64), 1) #define IPFIX_FIELD_INITIATOR_OCTETS(F) \ F(initiatorOctets, 8, \ IPFIX_VALUE_DELTA_U64, \ - flow_side(f, FT_ORIGIN)->stats.l4_bytes_unreported, \ + flow_side(f, FTK_EL_SRC ^ uplink_direction)->stats.bytes_unreported, \ sizeof(u64), 1) #define IPFIX_FIELD_RESPONDER_OCTETS(F) \ F(responderOctets, 8, \ IPFIX_VALUE_DELTA_U64, \ - flow_side(f, FT_REVERSE)->stats.l4_bytes_unreported, \ - sizeof(u64), 1) -#define IPFIX_FIELD_PACKET_DELTA_COUNT(F) \ - F(packetDeltaCount, 8, \ - IPFIX_VALUE_DELTA_U64, \ - flow_side(f, direction)->stats.pkts_unreported, \ - sizeof(u64), 1) -#define IPFIX_FIELD_OCTET_DELTA_COUNT(F) \ - F(octetDeltaCount, 8, \ - IPFIX_VALUE_DELTA_U64, \ - flow_side(f, direction)->stats.bytes_unreported, \ - sizeof(u64), 1) -#define IPFIX_FIELD_PACKET_TOTAL_COUNT(F) \ - F(packetTotalCount, 8, \ - IPFIX_VALUE_U64, \ - flow_side(f, direction)->stats.pkts, \ + flow_side(f, FTK_EL_DST ^ uplink_direction)->stats.bytes_unreported, \ sizeof(u64), 1) -#define IPFIX_FIELD_OCTET_TOTAL_COUNT(F) \ - F(octetTotalCount, 8, \ - IPFIX_VALUE_U64, \ - flow_side(f, direction)->stats.bytes, \ - sizeof (u64), 1) #define IPFIX_FIELD_FLOW_START_MILLISECONDS(F) \ F(flowStartMilliseconds, 8, \ IPFIX_VALUE_DATETIME_MILLISECONDS, \ @@ -186,22 +169,17 @@ #define IPFIX_FIELD_FLOW_END_MILLISECONDS(F) \ F(flowEndMilliseconds, 8, \ IPFIX_VALUE_DATETIME_MILLISECONDS, \ - f->flow_end_time, \ + f->flow_last_time, \ sizeof(u64), 1) -#define IPFIX_FIELD_FLOW_DIRECTION(F) \ - F(flowDirection, 1, \ - IPFIX_VALUE_DIRECT, \ - direction == FT_ORIGIN ? 1 /* egress */ : 0 /* ingress */, \ - 1, 1) #define IPFIX_FIELD_SOURCE_TRANSPORT_PORT(F) \ F(sourceTransportPort, 2, \ IPFIX_VALUE_MEMCPY_DIRECT, \ - &f->key.port[FTK_EL_SRC ^ direction], \ + &f->key.port[FTK_EL_SRC ^ uplink_direction], \ 2, 1) #define IPFIX_FIELD_DESTINATION_TRANSPORT_PORT(F) \ F(destinationTransportPort, 2, \ IPFIX_VALUE_MEMCPY_DIRECT, \ - &f->key.port[FTK_EL_DST ^ direction], \ + &f->key.port[FTK_EL_DST ^ uplink_direction], \ 2, 1) #define IPFIX_FIELD_POST_NAT_IPV4_ADDRESS(F) \ F(postNATSourceIPv4Address, 4, \ @@ -219,14 +197,6 @@ &sx->nat_addr->ext_addr, \ sizeof(ip4_address_t), \ sx->nat_addr) -#define IPFIX_FIELD_INGRESS_VRF_ID(F) \ - F(ingressVRFID, 4, \ - IPFIX_VALUE_U32, \ - info->ingress_vrf_id, sizeof(u32), 1) -#define IPFIX_FIELD_EGRESS_VRF_ID(F) \ - F(egressVRFID, 4, \ - IPFIX_VALUE_U32, \ - info->egress_vrf_id, sizeof(u32), 1) #define IPFIX_FIELD_VRF_NAME(F) \ F(VRFname, 65535, \ IPFIX_VALUE_STRING, \ @@ -234,25 +204,30 @@ #define IPFIX_FIELD_INTERFACE_NAME(F) \ F(interfaceName, 65535, \ IPFIX_VALUE_STRING, \ - info->interface_name, \ + info->sw_if_name, \ vec_len (info->interface_name), 1) #define IPFIX_FIELD_OBSERVATION_DOMAIN_NAME(F) \ F(observationDomainName, 65535, \ IPFIX_VALUE_STRING, \ - info->observation_domain_name, \ - vec_len (info->observation_domain_name), 1) + uplink_nwi->ipfix.observation_domain_name, \ + vec_len (nwi->ipfix.bservation_domain_name), 1) #define IPFIX_FIELD_OBSERVATION_POINT_ID(F) \ F(observationPointId, 8, \ IPFIX_VALUE_U64, \ - info->observation_point_id, \ + uplink_nwi->ipfix.observation_point_id, \ sizeof(u64), 1) +#define IPFIX_FIELD_BIFLOW_DIRECTION(F) \ + F(biflowDirection, 1,\ + IPFIX_VALUE_U8, \ + (uplink_direction == FT_ORIGIN) ? 1 /* initiator */ : 2 /* reverseInitiator */ , \ + sizeof(u8), 1) #define UPF_NAT_EVENT_NAT44_SESSION_CREATE 4 #define UPF_NAT_EVENT_NAT44_SESSION_DELETE 5 #define IPFIX_FIELD_NAT_EVENT(F) \ F(natEvent, 1, \ IPFIX_VALUE_U8_COND, \ - !f->exported ? UPF_NAT_EVENT_NAT44_SESSION_CREATE : \ + !f->ipfix_exported ? UPF_NAT_EVENT_NAT44_SESSION_CREATE : \ last ? UPF_NAT_EVENT_NAT44_SESSION_DELETE : 0, \ sizeof (u8), 1) diff --git a/upf/upf_pfcp.c b/upf/upf_pfcp.c index b5d6202..2530822 100644 --- a/upf/upf_pfcp.c +++ b/upf/upf_pfcp.c @@ -156,11 +156,12 @@ vnet_upf_create_nwi_if (u8 *name, u32 ip4_table_id, u32 ip6_table_id, memset (&nwi->fib_index, ~0, sizeof (nwi->fib_index)); nwi->name = vec_dup (name); - nwi->ipfix_policy = ipfix_policy; + nwi->ipfix.default_policy = ipfix_policy; + if (ipfix_collector_ip) - ip_address_copy (&nwi->ipfix_collector_ip, ipfix_collector_ip); + ip_address_copy (&nwi->ipfix.collector_ip, ipfix_collector_ip); else - ip_address_reset (&nwi->ipfix_collector_ip); + ip_address_reset (&nwi->ipfix.collector_ip); if_index = nwi - gtm->nwis; @@ -231,14 +232,35 @@ vnet_upf_create_nwi_if (u8 *name, u32 ip4_table_id, u32 ip6_table_id, hash_set_mem (gtm->nwi_index_by_name, nwi->name, if_index); - nwi->ipfix_report_interval = ipfix_report_interval; - nwi->observation_domain_id = observation_domain_id; - nwi->observation_domain_name = vec_dup (observation_domain_name); - nwi->observation_point_id = observation_point_id; + nwi->ipfix.report_interval = ipfix_report_interval; + nwi->ipfix.observation_domain_id = observation_domain_id; + nwi->ipfix.observation_domain_name = vec_dup (observation_domain_name); + nwi->ipfix.observation_point_id = observation_point_id; if (sw_if_idx) *sw_if_idx = nwi->sw_if_index; + for (fib_protocol_t fproto = 0; fproto < FIB_PROTOCOL_IP_MAX; fproto++) + for (upf_ipfix_policy_t pol = 0; pol < UPF_IPFIX_N_POLICIES; pol++) + nwi->ipfix.contexts[fproto][pol] = ~0; + + // Try to precreate configured ipfix context to start sending ipfix templates + if (ipfix_collector_ip && ipfix_policy != UPF_IPFIX_POLICY_NONE) + { + upf_ipfix_context_key_t context_key = { 0 }; + ip_address_copy (&context_key.collector_ip, &nwi->ipfix.collector_ip); + context_key.observation_domain_id = nwi->ipfix.observation_domain_id; + context_key.policy = ipfix_policy; + + context_key.is_ip4 = true; + nwi->ipfix.contexts[FIB_PROTOCOL_IP4][ipfix_policy] = + upf_ipfix_ensure_context (&context_key); + + context_key.is_ip4 = false; + nwi->ipfix.contexts[FIB_PROTOCOL_IP6][ipfix_policy] = + upf_ipfix_ensure_context (&context_key); + } + return 0; } @@ -249,7 +271,6 @@ vnet_upf_delete_nwi_if (u8 *name) upf_main_t *gtm = &upf_main; upf_nwi_t *nwi; uword *p; - u32 *ipfix_ctx_index; p = hash_get_mem (gtm->nwi_index_by_name, name); if (!p) @@ -269,14 +290,8 @@ vnet_upf_delete_nwi_if (u8 *name) vec_add1 (gtm->free_nwi_hw_if_indices, nwi->hw_if_index); hash_unset_mem (gtm->nwi_index_by_name, nwi->name); - vec_free (nwi->observation_domain_name); + vec_free (nwi->ipfix.observation_domain_name); vec_free (nwi->name); - - vec_foreach (ipfix_ctx_index, nwi->ipfix_context_indices) - { - upf_unref_ipfix_context_by_index (*ipfix_ctx_index); - } - pool_put (gtm->nwis, nwi); return 0; @@ -1227,8 +1242,8 @@ pfcp_free_rules (upf_session_t *sx, int rule) memset (rules, 0, sizeof (*rules)); } -void -upf_ref_forwarding_policies (upf_far_t *far, u8 is_del) +always_inline void +upf_pfcp_far_ref_forwarding_policies (upf_far_t *far, u8 is_del) { upf_main_t *gtm = &upf_main; uword *hash_ptr; @@ -1276,7 +1291,7 @@ pfcp_disable_session (upf_session_t *sx) /* derefer forwarding policies */ vec_foreach (far, active->far) { - upf_ref_forwarding_policies (far, 1); + upf_pfcp_far_ref_forwarding_policies (far, 1); } node_assoc_detach_session (sx); @@ -2145,7 +2160,7 @@ pfcp_update_apply (upf_session_t *sx) far->forward.outer_header_creation.teid, far->id); } } - upf_ref_forwarding_policies (far, 0); + upf_pfcp_far_ref_forwarding_policies (far, 0); } } else @@ -2208,13 +2223,61 @@ pfcp_update_apply (upf_session_t *sx) upf_pfcp_session_start_up_inactivity_timer (si, sx->last_ul_traffic, &active->inactivity_timer); + if (pending_pdr + pending_far) + { + upf_pdr_t *pdr; + vec_foreach (pdr, active->pdr) + { + if (pdr->far_id == (u16) ~0) + continue; + + upf_far_t *far = pfcp_get_far_by_id (active, pdr->far_id); + if (!far) + continue; + + if (far->forward.nwi_index != ~0) + { + upf_nwi_t *nwi = + pool_elt_at_index (gtm->nwis, far->forward.nwi_index); + + upf_ipfix_policy_t policy; + // FAR has priority for policy + if (far->ipfix_policy != UPF_IPFIX_POLICY_UNSPECIFIED) + policy = far->ipfix_policy; + else + policy = nwi->ipfix.default_policy; + + // Force creation of ipfix contexts to send templates early + if (policy != UPF_IPFIX_POLICY_NONE) + { + upf_ipfix_context_key_t context_key = { 0 }; + ip_address_copy (&context_key.collector_ip, + &nwi->ipfix.collector_ip); + context_key.observation_domain_id = + nwi->ipfix.observation_domain_id; + context_key.policy = policy; + if (pdr->pdi.ue_addr.flags & PFCP_UE_IP_ADDRESS_V4) + { + context_key.is_ip4 = true; + upf_ipfix_ensure_context (&context_key); + } + if (pdr->pdi.ue_addr.flags & PFCP_UE_IP_ADDRESS_V6) + { + context_key.is_ip4 = false; + upf_ipfix_ensure_context (&context_key); + } + } + } + } + } + if (pending_far) { upf_far_t *far; vec_foreach (far, pending->far) { - upf_ref_forwarding_policies (far, 1); + upf_pfcp_far_ref_forwarding_policies (far, 1); if (far->forward.outer_header_creation.description != 0) peer_addr_unref (&far->forward); } diff --git a/upf/upf_pfcp.h b/upf/upf_pfcp.h index 18118bc..5fcaefa 100644 --- a/upf/upf_pfcp.h +++ b/upf/upf_pfcp.h @@ -86,7 +86,6 @@ bool process_qers (vlib_main_t *vm, upf_session_t *sess, struct rules *r, upf_pdr_t *pdr, vlib_buffer_t *b, u8 is_dl, u8 is_ul); void upf_pfcp_error_report (upf_session_t *sx, gtp_error_ind_t *error); -void upf_ref_forwarding_policies (upf_far_t *far, u8 is_del); int pfcp_session_server_apply_config (u64 segment_size, u32 prealloc_fifos, u32 fifo_size); void pfcp_session_server_get_config (u64 *segment_size, u32 *prealloc_fifos, @@ -155,16 +154,21 @@ upf_nwi_fib_index (fib_protocol_t proto, u32 nwi_index) return ~0; } -static_always_inline u32 -flow_pdr_idx (flow_entry_t *flow, flow_direction_t direction, struct rules *r) +static_always_inline upf_pdr_t * +flow_pdr (flow_entry_t *flow, flow_direction_t direction, struct rules *r) { - upf_pdr_t *pdr; u32 pdr_id = flow_side (flow, direction)->pdr_id; if (pdr_id == ~0) - return ~0; + return NULL; + + return pfcp_get_pdr_by_id (r, pdr_id); +} - pdr = pfcp_get_pdr_by_id (r, pdr_id); +static_always_inline u32 +flow_pdr_idx (flow_entry_t *flow, flow_direction_t direction, struct rules *r) +{ + upf_pdr_t *pdr = flow_pdr (flow, direction, r); return pdr ? pdr - r->pdr : ~0; } diff --git a/upf/upf_pfcp_api.c b/upf/upf_pfcp_api.c index 0063859..ec03dff 100644 --- a/upf/upf_pfcp_api.c +++ b/upf/upf_pfcp_api.c @@ -1493,6 +1493,7 @@ handle_create_far (upf_session_t *sx, pfcp_ie_create_far_t *create_far, vec_dup (far->forwarding_parameters.nat_port_block); rc = handle_nat_binding_creation (sx, pool_name, response); vec_free (pool_name); + create->apply_action |= FAR_NAT; if (rc) { far_error (response, far, diff --git a/upf/upf_proxy_input.c b/upf/upf_proxy_input.c index 4706e2f..aa744ac 100644 --- a/upf/upf_proxy_input.c +++ b/upf/upf_proxy_input.c @@ -462,9 +462,9 @@ upf_proxy_input (vlib_main_t *vm, vlib_node_runtime_t *node, IS_DL (pdr, far), IS_UL (pdr, far))) next = UPF_FORWARD_NEXT_DROP; - flow_update_stats (vm, b, flow, is_ip4, timestamp_ns, - current_time); + flow_update_stats (vm, b, flow, is_ip4, timestamp_ns); + upf_ipfix_flow_stats_update_handler (flow, current_time); #undef IS_DL #undef IS_UL }