The operating system and software versions used in this build:
| Software/System | Version |
|---|---|
| Ubuntu | 22.04.5 |
| GoFlow2 | v2.2.6 |
| Kafka | v4.1.1 |
| ClickHouse | v25.12.2.54 |
| Grafana | v12.3.1 |
## Pipeline overview

The switches export flow records over sFlow/NetFlow to the GoFlow2 collector. GoFlow2 decodes the packets and writes them to a Kafka queue as JSON; ClickHouse consumes from Kafka and stores the records in its columnar format; finally, Grafana queries ClickHouse over SQL for visualization and monitoring.

```
Switch (sFlow/NetFlow)
  ↓  (UDP port 6343/2055)
GoFlow2 (protocol decoding)
  ↓  (JSON)
Kafka (message buffering)
  ↓  (consumer pull)
ClickHouse (columnar storage)
  ↓  (SQL queries)
Grafana (visualization & monitoring)
```

## Download and deploy Kafka
### Install Java

Kafka needs a local Java runtime, version 17 or newer, so install that first:
```bash
apt update && apt install -y openjdk-17-jdk
```
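A quick check that the JDK landed on the PATH before moving on:

```bash
# Should report an OpenJDK 17.x runtime
java -version
```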
### Download Kafka and set up environment variables

```bash
wget https://dlcdn.apache.org/kafka/4.1.1/kafka_2.13-4.1.1.tgz
tar -xzf kafka_2.13-4.1.1.tgz
mv kafka_2.13-4.1.1 /opt/
sudo ln -s /opt/kafka_2.13-4.1.1 /opt/kafka

# Add the environment variables
cat << 'EOF' >> /etc/profile
export KAFKA_HOME=/opt/kafka
export PATH=$PATH:$KAFKA_HOME/bin
EOF

# Apply them to the current shell
source /etc/profile

# Print the Kafka version to confirm both the install and the environment variables
kafka-topics.sh --version
# 4.1.1
```

### Initialize Kafka
```bash
# Create the data directories
sudo mkdir -p /var/lib/kafka/data
sudo mkdir -p /var/lib/zookeeper/data  # left over from the original; KRaft mode does not use ZooKeeper

# Every Kafka cluster needs a globally unique ID; generate a random cluster UUID
KAFKA_CLUSTER_ID="$(/opt/kafka/bin/kafka-storage.sh random-uuid)"
echo "Cluster UUID: $KAFKA_CLUSTER_ID"
# Cluster UUID: tStfHNWWSJ6fK555k9ILGg

# Create the persistent data directory
sudo mkdir -p /var/lib/kafka/kraft-combined-logs

# Set permissions
sudo chmod -R 755 /var/lib/kafka
```
### Edit the Kafka configuration file

The meaning of each parameter is covered in the official Kafka configuration reference.
```bash
nano /opt/kafka/config/server.properties
```

```properties
############################# Server Basics #############################

# KRaft mode roles.
# This is a single-node deployment, so one server acts as both broker (data)
# and controller (metadata); separate the roles in production.
process.roles=broker,controller

# Node ID
node.id=1

# The ports below can be changed freely, but the GoFlow2 and ClickHouse
# configuration must be updated to match.
controller.quorum.bootstrap.servers=localhost:9093

############################# Socket Server Settings #############################

# Listeners
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
inter.broker.listener.name=PLAINTEXT
advertised.listeners=PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# Network threads
num.network.threads=3
num.io.threads=8

# Socket buffers
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

############################# Log Basics #############################

# Log directory (persistent storage)
log.dirs=/var/lib/kafka/kraft-combined-logs

# Partitions (improves parallelism)
num.partitions=3
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings #############################

# Replication factors for internal topics (1 is fine on a single node)
offsets.topic.replication.factor=1
share.coordinator.state.topic.replication.factor=1
share.coordinator.state.topic.min.isr=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Retention Policy #############################

# Keep logs for 7 days
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000

############################# Performance Tuning #############################

# Compression (saves storage and bandwidth)
compression.type=snappy

# Batching (note: batch.size and linger.ms are producer-side settings and
# have no effect in server.properties; kept here from the original)
batch.size=16384
linger.ms=10

# Network buffer
replica.socket.receive.buffer.bytes=65536
```
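Before the first start, KRaft mode also requires the log directory to be formatted with the cluster UUID generated earlier; otherwise `kafka-server-start.sh` exits immediately. A minimal sketch, assuming `$KAFKA_CLUSTER_ID` is still set in the current shell:

```bash
# Format the KRaft storage directory once, after editing server.properties
# and before the first broker start
/opt/kafka/bin/kafka-storage.sh format \
  -t "$KAFKA_CLUSTER_ID" \
  -c /opt/kafka/config/server.properties
```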
### Create the systemd service

```bash
nano /etc/systemd/system/kafka.service
```

```ini
[Unit]
Description=Apache Kafka Server (KRaft Mode)
Documentation=https://kafka.apache.org/documentation/
After=network.target
Wants=network-online.target

[Service]
Type=simple
User=root
Group=root

# Java heap size (adjust to your server's memory)
Environment="KAFKA_HEAP_OPTS=-Xmx1G -Xms1G"
Environment="LOG_DIR=/var/lib/kafka/logs"
Environment="KAFKA_HOME=/opt/kafka"

# Working directory
WorkingDirectory=/opt/kafka

# Start command
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties

# Stop command
ExecStop=/opt/kafka/bin/kafka-server-stop.sh

# Restart policy
Restart=on-failure
RestartSec=10
StartLimitBurst=3
StartLimitInterval=120

# Logging
StandardOutput=journal
StandardError=journal
SyslogIdentifier=kafka

# Security and resource limits
LimitNOFILE=65536
LimitNPROC=4096

[Install]
WantedBy=multi-user.target
```

### Start the Kafka service
```bash
systemctl daemon-reload
systemctl enable kafka
systemctl start kafka
systemctl status kafka

# Create the flow-messages topic (for NetFlow/IPFIX/sFlow data)
/opt/kafka/bin/kafka-topics.sh --create \
  --topic flow-messages \
  --bootstrap-server localhost:9092 \
  --partitions 3 \
  --replication-factor 1 \
  --config retention.ms=86400000 \
  --config compression.type=lz4

/opt/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092
```
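Before wiring up GoFlow2, it can be worth a quick end-to-end check that the topic accepts and serves messages. A sketch using the stock console tools; the `{"test":1}` payload is just a throwaway example:

```bash
# Publish one dummy record...
echo '{"test":1}' | /opt/kafka/bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 --topic flow-messages

# ...and read it back
/opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 --topic flow-messages \
  --from-beginning --max-messages 1
```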
## Download and deploy GoFlow2

```bash
wget https://github.com/netsampler/goflow2/releases/download/v2.2.6/goflow2-2.2.6-linux-amd64
chmod +x goflow2-2.2.6-linux-amd64
mv goflow2-2.2.6-linux-amd64 /usr/local/bin/goflow2
```
### Create the working directory

```bash
mkdir -p /var/log/goflow2
nano /etc/systemd/system/goflow2.service
```
### Edit the service file

```ini
[Unit]
Description=GoFlow2 NetFlow/IPFIX/sFlow Collector
Documentation=https://github.com/netsampler/goflow2
After=network.target kafka.service
Wants=kafka.service

[Service]
Type=simple
User=root
Group=root
WorkingDirectory=/var/log/goflow2

# Forward decoded flows to Kafka
ExecStart=/usr/local/bin/goflow2 \
  -transport=kafka \
  -transport.kafka.brokers=localhost:9092 \
  -transport.kafka.topic=flow-messages \
  -listen=sflow://:6343,netflow://:2055 \
  -format=json \
  -loglevel=info \
  -addr=:8080

Restart=on-failure
RestartSec=10
StandardOutput=journal
StandardError=journal
SyslogIdentifier=goflow2
LimitNOFILE=65536

[Install]
WantedBy=multi-user.target
```

### Start the GoFlow2 service
```bash
systemctl daemon-reload
systemctl enable goflow2
systemctl start goflow2
systemctl status goflow2
```
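To confirm the collector is actually up before touching the switches, check that its UDP listeners are bound; GoFlow2 also serves an HTTP endpoint on the `-addr` port which, in its default setup, exposes Prometheus metrics. A quick sketch:

```bash
# The sFlow (6343) and NetFlow (2055) UDP sockets should both show up
ss -ulnp | grep -E ':(6343|2055)'

# The -addr=:8080 endpoint should answer with Prometheus metrics
curl -s localhost:8080/metrics | head
```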
## Configure sFlow on the switch

> **Warning:** the following steps modify switch configuration. If you are not sure what each command does, stop here rather than risk an outage.
### S series

Using the Huawei S5720 as the example. Commands differ across models, so check Huawei's documentation for your specific device.
```
# In the system view
sflow collector <1-2> ip <monitoring-server-ip> port 6343
sflow agent ip <switch-ip>
```
```
# Enter the interface you want to monitor
interface <interface-name>
sflow flow-sampling collector <collector-id>
sflow flow-sampling rate 2000        # 1:2000 sampling ratio; tune to your traffic volume (see the vendor docs)
undo sflow flow-sampling inbound     # sample upstream traffic only; omit to sample both directions
```

### CE series
Using the Huawei CE12804 as the example. Again, commands differ across models, so check Huawei's documentation for your device.
```
# In the system view
sflow collector <1-2> ip <monitoring-server-ip> udp-port 6343
sflow agent ip <switch-ip>
```
```
# Enter the interface you want to monitor
interface <interface-name>
sflow sampling collector <collector-id>
sflow sampling rate 32768    # 1:32768 sampling ratio; tune to your traffic volume (see the vendor docs)
sflow sampling outbound      # sample upstream traffic only
```

### Check for sampled packets
Check whether the interface is currently sampling and producing sample packets:
```
dis sflow statistics interface <interface-name>
```

If no packets show up, go back to the server and check that GoFlow2 is running and UDP port 6343 is reachable, or re-check the switch configuration.
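On the collector side, a packet capture is the fastest way to tell whether sFlow datagrams are arriving at all. A sketch, assuming tcpdump is installed:

```bash
# Capture five sFlow datagrams on any interface;
# if this hangs, nothing is reaching the server on UDP 6343
tcpdump -ni any udp port 6343 -c 5
```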
If sample packets are being collected, run this on the server:

```bash
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flow-messages --from-beginning
```

to check that data is flowing from GoFlow2 into Kafka. If it is, you should see records like this:
{"type":"SFLOW_5","time_received_ns":1768279339983732918,"sequence_num":4,"sampling_rate":2048,"sampler_address":"192.168.1.201","time_flow_start_ns":1768279339983732918,"time_flow_end_ns":1768279339983732918,"bytes":68,"packets":1,"src_addr":"100.64.32.105","dst_addr":"112.17.34.247","etype":"IPv4","proto":"TCP","src_port":60534,"dst_port":80,"in_if":54,"out_if":21,"src_mac":"00:90:27:e1:12:d8","dst_mac":"a0:f4:79:59:d9:c5","src_vlan":100,"dst_vlan":100,"vlan_id":100,"ip_tos":0,"forwarding_status":0,"ip_ttl":64,"ip_flags":2,"tcp_flags":4,"icmp_type":0,"icmp_code":0,"ipv6_flow_label":0,"fragment_id":0,"fragment_offset":0,"src_as":0,"dst_as":0,"next_hop":"","next_hop_as":0,"src_net":"0.0.0.0/0","dst_net":"0.0.0.0/0","bgp_next_hop":"","bgp_communities":[],"as_path":[],"mpls_ttl":[],"mpls_label":[],"mpls_ip":[],"observation_domain_id":0,"observation_point_id":0,"layer_stack":["Ethernet","Dot1Q","IPv4","TCP"],"layer_size":[14,4,20,20],"ipv6_routing_header_addresses":[],"ipv6_routing_header_seg_left":0}安装clickhouse
## Install ClickHouse

```bash
sudo apt-get install -y apt-transport-https ca-certificates curl gnupg
curl -fsSL 'https://packages.clickhouse.com/rpm/lts/repodata/repomd.xml.key' | sudo gpg --dearmor -o /usr/share/keyrings/clickhouse-keyring.gpg
ARCH=$(dpkg --print-architecture)
echo "deb [signed-by=/usr/share/keyrings/clickhouse-keyring.gpg arch=${ARCH}] https://packages.clickhouse.com/deb stable main" | sudo tee /etc/apt/sources.list.d/clickhouse.list
sudo apt-get update
sudo apt-get install -y clickhouse-server clickhouse-client
# On a slow connection this can take an hour or two -- a good excuse for a break
# When you see the prompt "Set up the password for the default user:", just press Enter
```
### Start the ClickHouse service
```bash
systemctl enable clickhouse-server.service
systemctl start clickhouse-server.service
systemctl status clickhouse-server.service
```
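A one-line connectivity check before moving on, assuming the default user with an empty password as set during the install:

```bash
# Should print the server version if ClickHouse is up and reachable
clickhouse-client --query "SELECT version()"
```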
### Create the database

Based on the JSON fields GoFlow2 pushes into Kafka, create the database and the tables we will need. First open a ClickHouse session and create the database:
```bash
clickhouse-client
```

```sql
CREATE DATABASE netflow;
```

### Create the netflow.flows table
This MergeTree table is where the data is ultimately stored and processed; it provides the fast query performance.
```sql
-- Destination table
CREATE TABLE netflow.flows
(
    `type` String,
    `time_received_ns` UInt64,
    `sequence_num` UInt32,
    `sampling_rate` UInt32,
    `sampler_address` String,
    `time_flow_start_ns` UInt64,
    `time_flow_end_ns` UInt64,
    `bytes` UInt64,
    `packets` UInt64,
    `src_addr` String,
    `dst_addr` String,
    `etype` String,
    `proto` String,
    `src_port` UInt16,
    `dst_port` UInt16,
    `in_if` UInt32,
    `out_if` UInt32,
    `src_mac` String,
    `dst_mac` String,
    `src_vlan` UInt16,
    `dst_vlan` UInt16,
    `vlan_id` UInt16,
    `ip_tos` UInt8,
    `forwarding_status` UInt8,
    `ip_ttl` UInt8,
    `ip_flags` UInt8,
    `tcp_flags` UInt8,
    `icmp_type` UInt8,
    `icmp_code` UInt8,
    `ipv6_flow_label` UInt32,
    `fragment_id` UInt32,
    `fragment_offset` UInt16,
    `src_as` UInt32,
    `dst_as` UInt32,
    `next_hop` String,
    `next_hop_as` UInt32,
    `src_net` String,
    `dst_net` String,
    `bgp_next_hop` String,
    `observation_domain_id` UInt32,
    `observation_point_id` UInt32,
    -- sFlow delivers nanosecond timestamps; convert to a second-precision DateTime
    `timestamp` DateTime DEFAULT toDateTime(time_received_ns / 1000000000)
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(timestamp)
ORDER BY (timestamp, sampler_address, src_addr, dst_addr)
TTL timestamp + toIntervalDay(30)
SETTINGS index_granularity = 8192;
```

### Create the netflow.flows_kafka table
This Kafka-engine table consumes messages from the Kafka topic; it is the bridge through which data enters ClickHouse.
```sql
-- Kafka engine table that pulls data from the topic
CREATE TABLE netflow.flows_kafka
(
    `type` String,
    `time_received_ns` UInt64,
    `sequence_num` UInt32,
    `sampling_rate` UInt32,
    `sampler_address` String,
    `time_flow_start_ns` UInt64,
    `time_flow_end_ns` UInt64,
    `bytes` UInt64,
    `packets` UInt64,
    `src_addr` String,
    `dst_addr` String,
    `etype` String,
    `proto` String,
    `src_port` UInt16,
    `dst_port` UInt16,
    `in_if` UInt32,
    `out_if` UInt32,
    `src_mac` String,
    `dst_mac` String,
    `src_vlan` UInt16,
    `dst_vlan` UInt16,
    `vlan_id` UInt16,
    `ip_tos` UInt8,
    `forwarding_status` UInt8,
    `ip_ttl` UInt8,
    `ip_flags` UInt8,
    `tcp_flags` UInt8,
    `icmp_type` UInt8,
    `icmp_code` UInt8,
    `ipv6_flow_label` UInt32,
    `fragment_id` UInt32,
    `fragment_offset` UInt16,
    `src_as` UInt32,
    `dst_as` UInt32,
    `next_hop` String,
    `next_hop_as` UInt32,
    `src_net` String,
    `dst_net` String,
    `bgp_next_hop` String,
    `observation_domain_id` UInt32,
    `observation_point_id` UInt32
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'localhost:9092',
         kafka_topic_list = 'flow-messages',
         kafka_group_name = 'clickhouse_netflow_consumer',
         kafka_format = 'JSONEachRow',
         kafka_num_consumers = 3,
         kafka_max_block_size = 65536;
```

### Create netflow.flows_mv
This materialized view watches flows_kafka and automatically copies incoming rows into flows; think of it as the pipe between the two tables.
```sql
-- Materialized view that moves rows pulled by flows_kafka into netflow.flows
CREATE MATERIALIZED VIEW netflow.flows_mv TO netflow.flows
(
    `type` String,
    `time_received_ns` UInt64,
    `sequence_num` UInt32,
    `sampling_rate` UInt32,
    `sampler_address` String,
    `time_flow_start_ns` UInt64,
    `time_flow_end_ns` UInt64,
    `bytes` UInt64,
    `packets` UInt64,
    `src_addr` String,
    `dst_addr` String,
    `etype` String,
    `proto` String,
    `src_port` UInt16,
    `dst_port` UInt16,
    `in_if` UInt32,
    `out_if` UInt32,
    `src_mac` String,
    `dst_mac` String,
    `src_vlan` UInt16,
    `dst_vlan` UInt16,
    `vlan_id` UInt16,
    `ip_tos` UInt8,
    `forwarding_status` UInt8,
    `ip_ttl` UInt8,
    `ip_flags` UInt8,
    `tcp_flags` UInt8,
    `icmp_type` UInt8,
    `icmp_code` UInt8,
    `ipv6_flow_label` UInt32,
    `fragment_id` UInt32,
    `fragment_offset` UInt16,
    `src_as` UInt32,
    `dst_as` UInt32,
    `next_hop` String,
    `next_hop_as` UInt32,
    `src_net` String,
    `dst_net` String,
    `bgp_next_hop` String,
    `observation_domain_id` UInt32,
    `observation_point_id` UInt32
)
AS SELECT *
FROM netflow.flows_kafka;
```

Once everything is in place, check that ClickHouse can actually pull data from Kafka:
```sql
-- Is the row count non-zero?
SELECT count() FROM netflow.flows;

-- Look at the most recent rows
SELECT *
FROM netflow.flows
ORDER BY timestamp DESC
LIMIT 5;
```

If all is well, you should see something like:
```
goflow :) SELECT count() FROM netflow.flows;

SELECT count()
FROM netflow.flows

Query id: 357f75ee-5db4-4e9c-bc9a-141fe0674240

   ┌─count()─┐
1. │    1300 │
   └─────────┘

1 row in set. Elapsed: 0.003 sec.

goflow :) SELECT * FROM netflow.flows ORDER BY timestamp DESC LIMIT 5;

Query id: 666e4e49-1a34-4795-b577-2347e4ca7534

Row 1:
──────
type:             SFLOW_5
time_received_ns: 1768289585360343652
sequence_num:     1092
sampling_rate:    2000
sampler_address:  192.168.1.201
.....
timestamp:        2026-01-13 07:33:05
.....
```
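If the count stays at zero, the Kafka-engine table's consumer state is the first thing to inspect. A sketch, assuming a ClickHouse release recent enough to ship the system.kafka_consumers table (this 25.x build qualifies):

```bash
# Show consumer assignments, message counts, and any exceptions for the
# Kafka engine tables in the netflow database
clickhouse-client --query \
  "SELECT * FROM system.kafka_consumers WHERE database = 'netflow' FORMAT Vertical"
```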
## Install Grafana

```bash
sudo apt-get install -y apt-transport-https wget
sudo mkdir -p /etc/apt/keyrings/
wget -q -O - https://apt.grafana.com/gpg.key | gpg --dearmor | sudo tee /etc/apt/keyrings/grafana.gpg > /dev/null

# Update the list of available packages
sudo apt-get update

# Install the latest OSS release
sudo apt-get install grafana
# Nice, another half-hour break while this downloads
```

### Start the Grafana service
```bash
systemctl daemon-reload
systemctl enable grafana-server.service
systemctl start grafana-server.service
```

### Install the ClickHouse plugin
访问http://<ip>:3000即可打开grafana
默认账号密码均为admin
进入平台后选择administration>Plugins and data>Plugins搜索clickhouse
点击Install进行安装
## Add the data source
After the plugin is installed, click Add new data source to connect Grafana to the database. Set Server address to 127.0.0.1 and Server port to 9000, leave everything else at the defaults, and click Save & test. If nothing goes wrong, you should see:

> Data source is working. Next, you can start to visualize data by building a dashboard, or by querying data in the Explore view.

Then refresh the page, enter `netflow` in the Default database field under Default DB and table, and click Save & test once more; the data source is now fully configured.
## Create a dashboard

Click Dashboards > Create dashboard > Add visualization, choose the ClickHouse data source you just configured, and in query panel A below, change the Editor Type to SQL Editor. Enter the following in the query box:
```sql
SELECT
    src_addr AS source_ip,
    dst_addr AS dest_ip,
    formatReadableSize(sum(bytes) * 2000) AS total_traffic,  -- adjust to your sampling ratio
    sum(packets) * 2000 AS total_packets,                    -- adjust to your sampling ratio
    min(timestamp) AS first_seen,
    max(timestamp) AS last_seen,
    dateDiff('second', min(timestamp), max(timestamp)) AS duration_seconds,
    count() AS flow_count
FROM netflow.flows
WHERE $__timeFilter(timestamp)
GROUP BY src_addr, dst_addr
ORDER BY sum(bytes) DESC
LIMIT 100
```

Click Run Query at the top right of the code box and the chart appears. Since we want the detailed records here, switch the visualization from Time series to Table in the right-hand panel.
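If the panel comes up empty, it can help to sanity-check the query outside Grafana first. A sketch that swaps the Grafana-only `$__timeFilter` macro for an explicit time range:

```bash
# Top talkers over the last hour, scaled by the 1:2000 sampling ratio
clickhouse-client --query "
  SELECT
      src_addr,
      dst_addr,
      formatReadableSize(sum(bytes) * 2000) AS total_traffic
  FROM netflow.flows
  WHERE timestamp >= now() - INTERVAL 1 HOUR
  GROUP BY src_addr, dst_addr
  ORDER BY sum(bytes) DESC
  LIMIT 10"
```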
After switching to Table:
Click Save dashboard at the top right, adjust the details as needed, and save. Back on the dashboard page, you can resize the panels by hand.
## Final result

Leaving a placeholder here for a future update: integrate an IP geolocation database and display geographic information for each address.