Building Cross-Province Traffic Monitoring on a GoFlow2 + Kafka + ClickHouse + Grafana Stack

The system and software environment used here:

Software/System    Version
Ubuntu             22.04.5
GoFlow2            v2.2.6
Kafka              v4.1.1
ClickHouse         v25.12.2.54
Grafana            v12.3.1

How It Works#

The switch exports traffic data to the GoFlow2 collector over sFlow/NetFlow. GoFlow2 decodes the packets and writes them to a Kafka message queue as JSON, ClickHouse consumes the data from Kafka and stores it in its columnar database, and finally Grafana queries ClickHouse with SQL for visualization and monitoring.

Switch (sFlow/NetFlow)
↓ (UDP ports 6343/2055)
GoFlow2 (protocol decoding)
↓ (JSON)
Kafka (message buffering)
↓ (consumer pull)
ClickHouse (columnar storage)
↓ (SQL queries)
Grafana (visualization & monitoring)

Download and Deploy Kafka#

Install Java#

Kafka requires a local Java environment of version 17 or later, so install Java first.

apt update && apt install -y openjdk-17-jdk

Download Kafka and Set Environment Variables#

wget https://dlcdn.apache.org/kafka/4.1.1/kafka_2.13-4.1.1.tgz
tar -xzf kafka_2.13-4.1.1.tgz
mv kafka_2.13-4.1.1 /opt/
sudo ln -s /opt/kafka_2.13-4.1.1 /opt/kafka
# Add the environment variables
cat << 'EOF' >> /etc/profile
export KAFKA_HOME=/opt/kafka
export PATH=$PATH:$KAFKA_HOME/bin
EOF
# Apply the environment variables immediately
source /etc/profile
# Check the Kafka version to confirm the environment variables and installation are working
kafka-topics.sh --version
#4.1.1

Initialize Kafka#

# Kafka 4.x runs in KRaft mode only, so no ZooKeeper data directory is needed
# Each Kafka cluster needs a globally unique ID; generate a random cluster UUID first
KAFKA_CLUSTER_ID="$(/opt/kafka/bin/kafka-storage.sh random-uuid)"
echo "Cluster UUID: $KAFKA_CLUSTER_ID"
#Cluster UUID: tStfHNWWSJ6fK555k9ILGg
# Create the persistent data directory
sudo mkdir -p /var/lib/kafka/kraft-combined-logs
# Set permissions
sudo chmod -R 755 /var/lib/kafka

Edit the Kafka Configuration File#

The meaning of each parameter can be found in the official Kafka configuration reference.

nano /opt/kafka/config/server.properties
server.properties
############################# Server Basics #############################
# KRaft mode role configuration
# This is a single-node deployment, so the server acts as both broker and controller; separate these roles in production
process.roles=broker,controller
# Node ID
node.id=1
# The ports below can be changed freely, but the GoFlow2 and ClickHouse configs must be changed to match
controller.quorum.bootstrap.servers=localhost:9093
############################# Socket Server Settings #############################
# Listener configuration
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
inter.broker.listener.name=PLAINTEXT
advertised.listeners=PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# Network thread configuration
num.network.threads=3
num.io.threads=8
# Socket buffer configuration
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
############################# Log Basics #############################
# Log directory (persistent storage)
log.dirs=/var/lib/kafka/kraft-combined-logs
# Partition configuration (improves parallelism)
num.partitions=3
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
# Internal topic replication factors (1 on a single node)
offsets.topic.replication.factor=1
share.coordinator.state.topic.replication.factor=1
share.coordinator.state.topic.min.isr=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Retention Policy #############################
# Log retention policy (7 days)
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
############################# Performance Tuning #############################
# Compression (saves storage and bandwidth)
compression.type=snappy
# Batching (note: batch.size and linger.ms are producer-side settings, so the broker ignores them here)
batch.size=16384
linger.ms=10
# Network buffers
replica.socket.receive.buffer.bytes=65536
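One step is still missing before the broker can start: in KRaft mode the storage directory has to be formatted with the cluster UUID generated earlier, otherwise kafka-server-start.sh will refuse to run. Because this config uses controller.quorum.bootstrap.servers (a dynamic quorum), the format command takes --standalone to register this node as the sole controller voter:

# Format the KRaft storage directory with the cluster UUID generated earlier
/opt/kafka/bin/kafka-storage.sh format --standalone -t "$KAFKA_CLUSTER_ID" -c /opt/kafka/config/server.properties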

Create the Service#

nano /etc/systemd/system/kafka.service
kafka.service
[Unit]
Description=Apache Kafka Server (KRaft Mode)
Documentation=https://kafka.apache.org/documentation/
After=network.target
Wants=network-online.target
[Service]
Type=simple
User=root
Group=root
# Java heap configuration (adjust to your server's memory)
Environment="KAFKA_HEAP_OPTS=-Xmx1G -Xms1G"
Environment="LOG_DIR=/var/lib/kafka/logs"
Environment="KAFKA_HOME=/opt/kafka"
# Working directory
WorkingDirectory=/opt/kafka
# Start command
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
# Stop command
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
# Restart policy
Restart=on-failure
RestartSec=10
StartLimitBurst=3
StartLimitInterval=120
# Logging
StandardOutput=journal
StandardError=journal
SyslogIdentifier=kafka
# Security and resource limits
LimitNOFILE=65536
LimitNPROC=4096
[Install]
WantedBy=multi-user.target

Start the Kafka Service#

systemctl daemon-reload
systemctl enable kafka
systemctl start kafka
systemctl status kafka
# Create the flow-messages topic (for NetFlow/IPFIX/sFlow data)
/opt/kafka/bin/kafka-topics.sh --create --topic flow-messages --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1 --config retention.ms=86400000 --config compression.type=lz4
/opt/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092
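You can also describe the topic to confirm the partition count and the per-topic overrides took effect:

/opt/kafka/bin/kafka-topics.sh --describe --topic flow-messages --bootstrap-server localhost:9092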

Download and Deploy GoFlow2#

wget https://github.com/netsampler/goflow2/releases/download/v2.2.6/goflow2-2.2.6-linux-amd64
chmod +x goflow2-2.2.6-linux-amd64
mv goflow2-2.2.6-linux-amd64 /usr/local/bin/goflow2

Create the Working Directory#

mkdir -p /var/log/goflow2
nano /etc/systemd/system/goflow2.service

Edit the Service#

goflow2.service
[Unit]
Description=GoFlow2 NetFlow/IPFIX/sFlow Collector
Documentation=https://github.com/netsampler/goflow2
After=network.target kafka.service
Wants=kafka.service
[Service]
Type=simple
User=root
Group=root
WorkingDirectory=/var/log/goflow2
# Forward the decoded data to Kafka
ExecStart=/usr/local/bin/goflow2 \
-transport=kafka \
-transport.kafka.brokers=localhost:9092 \
-transport.kafka.topic=flow-messages \
-listen=sflow://:6343,netflow://:2055 \
-format=json \
-loglevel=info \
-addr=:8080
Restart=on-failure
RestartSec=10
StandardOutput=journal
StandardError=journal
SyslogIdentifier=goflow2
LimitNOFILE=65536
[Install]
WantedBy=multi-user.target

Start the GoFlow2 Service#

systemctl daemon-reload
systemctl enable goflow2
systemctl start goflow2
systemctl status goflow2
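Before moving to the switch, it is worth confirming the collector is actually listening. A quick sketch, assuming iproute2's ss is available; the -addr=:8080 flag also makes GoFlow2 serve Prometheus metrics, which gives a second signal:

# Check the sFlow (6343) and NetFlow (2055) UDP listeners
ss -lun | grep -E ':(6343|2055)'
# GoFlow2 serves Prometheus metrics on the -addr port
curl -s http://localhost:8080/metrics | head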

Configure sFlow on the Switch#

Warning

The following steps involve switch configuration. If you do not understand what each command does, stop here to avoid causing damage.
The following steps involve switch configuration. If you do not understand what each command does, stop here to avoid causing damage.
The following steps involve switch configuration. If you do not understand what each command does, stop here to avoid causing damage.

S Series#

Using a Huawei S5720 as the example; commands differ between models, so look up the exact commands for your device in the vendor documentation.

# Configure in the system view
sflow collector <1-2> ip <monitoring server IP> port 6343
sflow agent ip <switch IP>
# Enter the interface you want to monitor
interface <interface to configure>
sflow flow-sampling collector <your collector ID>
sflow flow-sampling rate 2000 # Sampling ratio 1:2000; adjust to your traffic volume, ideally per the official docs
undo sflow flow-sampling inbound # Sample only upstream traffic; omit this to monitor both directions (the default)

CE Series#

Using a Huawei CE12804 as the example; commands differ between models, so look up the exact commands for your device in the vendor documentation.

# Configure in the system view
sflow collector <1-2> ip <monitoring server IP> udp-port 6343
sflow agent ip <switch IP>
# Enter the interface you want to monitor
interface <interface to configure>
sflow sampling collector <your collector ID>
sflow sampling rate 32768 # Sampling ratio 1:32768; adjust to your traffic volume, ideally per the official docs
sflow sampling outbound # Sample only upstream traffic

Check for Sampled Packets#

Check whether the interface is currently sampling and whether sample packets are being produced.

dis sflow statistics interface <configured interface>

If no packets show up, go back to the server and check the GoFlow2 status and whether UDP port 6343 is reachable, or re-check the switch configuration.

If sample packets are being collected, run the following on the server:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flow-messages --from-beginning

to check that data is flowing from GoFlow2 into Kafka. Under normal conditions you will receive messages like this:

{"type":"SFLOW_5","time_received_ns":1768279339983732918,"sequence_num":4,"sampling_rate":2048,"sampler_address":"192.168.1.201","time_flow_start_ns":1768279339983732918,"time_flow_end_ns":1768279339983732918,"bytes":68,"packets":1,"src_addr":"100.64.32.105","dst_addr":"112.17.34.247","etype":"IPv4","proto":"TCP","src_port":60534,"dst_port":80,"in_if":54,"out_if":21,"src_mac":"00:90:27:e1:12:d8","dst_mac":"a0:f4:79:59:d9:c5","src_vlan":100,"dst_vlan":100,"vlan_id":100,"ip_tos":0,"forwarding_status":0,"ip_ttl":64,"ip_flags":2,"tcp_flags":4,"icmp_type":0,"icmp_code":0,"ipv6_flow_label":0,"fragment_id":0,"fragment_offset":0,"src_as":0,"dst_as":0,"next_hop":"","next_hop_as":0,"src_net":"0.0.0.0/0","dst_net":"0.0.0.0/0","bgp_next_hop":"","bgp_communities":[],"as_path":[],"mpls_ttl":[],"mpls_label":[],"mpls_ip":[],"observation_domain_id":0,"observation_point_id":0,"layer_stack":["Ethernet","Dot1Q","IPv4","TCP"],"layer_size":[14,4,20,20],"ipv6_routing_header_addresses":[],"ipv6_routing_header_seg_left":0}

Install ClickHouse#

sudo apt-get install -y apt-transport-https ca-certificates curl gnupg
curl -fsSL 'https://packages.clickhouse.com/rpm/lts/repodata/repomd.xml.key' | sudo gpg --dearmor -o /usr/share/keyrings/clickhouse-keyring.gpg
ARCH=$(dpkg --print-architecture)
echo "deb [signed-by=/usr/share/keyrings/clickhouse-keyring.gpg arch=${ARCH}] https://packages.clickhouse.com/deb stable main" | sudo tee /etc/apt/sources.list.d/clickhouse.list
sudo apt-get update
sudo apt-get install -y clickhouse-server clickhouse-client
# If your network is slow, this is a good chance to slack off for an hour or two (roughly

When you see the prompt Set up the password for the default user:, just press Enter.

Start the ClickHouse Service#

systemctl enable clickhouse-server.service
systemctl start clickhouse-server.service
systemctl status clickhouse-server.service

Create the Database#

Based on the JSON fields GoFlow2 sends to Kafka, create the database and the tables we will need. First open the ClickHouse client and create the database:

clickhouse-client
CREATE DATABASE netflow;

Create the netflow.flows Table#

This MergeTree table is where the data is ultimately stored and processed, and it provides efficient query performance.

netflow.flows
-- Create the storage table
CREATE TABLE netflow.flows
(
`type` String,
`time_received_ns` UInt64,
`sequence_num` UInt32,
`sampling_rate` UInt32,
`sampler_address` String,
`time_flow_start_ns` UInt64,
`time_flow_end_ns` UInt64,
`bytes` UInt64,
`packets` UInt64,
`src_addr` String,
`dst_addr` String,
`etype` String,
`proto` String,
`src_port` UInt16,
`dst_port` UInt16,
`in_if` UInt32,
`out_if` UInt32,
`src_mac` String,
`dst_mac` String,
`src_vlan` UInt16,
`dst_vlan` UInt16,
`vlan_id` UInt16,
`ip_tos` UInt8,
`forwarding_status` UInt8,
`ip_ttl` UInt8,
`ip_flags` UInt8,
`tcp_flags` UInt8,
`icmp_type` UInt8,
`icmp_code` UInt8,
`ipv6_flow_label` UInt32,
`fragment_id` UInt32,
`fragment_offset` UInt16,
`src_as` UInt32,
`dst_as` UInt32,
`next_hop` String,
`next_hop_as` UInt32,
`src_net` String,
`dst_net` String,
`bgp_next_hop` String,
`observation_domain_id` UInt32,
`observation_point_id` UInt32,
-- sFlow delivers nanosecond timestamps; convert to a second-resolution DateTime
`timestamp` DateTime DEFAULT toDateTime(time_received_ns / 1000000000)
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(timestamp)
ORDER BY (timestamp, sampler_address, src_addr, dst_addr)
TTL timestamp + toIntervalDay(30)
SETTINGS index_granularity = 8192;
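Once data starts flowing, you can confirm the monthly partitioning and TTL are doing their job via the standard system.parts table (wrapped in clickhouse-client as a one-liner here):

# Show active partitions and their row counts for netflow.flows
clickhouse-client --query "SELECT partition, sum(rows) AS total_rows FROM system.parts WHERE database = 'netflow' AND table = 'flows' AND active GROUP BY partition ORDER BY partition"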

Create the netflow.flows_kafka Table#

This Kafka engine table consumes messages from the Kafka topic; it is the bridge through which data enters ClickHouse.

netflow.flows_kafka
-- Create a Kafka engine table that pulls data from the Kafka topic
CREATE TABLE netflow.flows_kafka
(
`type` String,
`time_received_ns` UInt64,
`sequence_num` UInt32,
`sampling_rate` UInt32,
`sampler_address` String,
`time_flow_start_ns` UInt64,
`time_flow_end_ns` UInt64,
`bytes` UInt64,
`packets` UInt64,
`src_addr` String,
`dst_addr` String,
`etype` String,
`proto` String,
`src_port` UInt16,
`dst_port` UInt16,
`in_if` UInt32,
`out_if` UInt32,
`src_mac` String,
`dst_mac` String,
`src_vlan` UInt16,
`dst_vlan` UInt16,
`vlan_id` UInt16,
`ip_tos` UInt8,
`forwarding_status` UInt8,
`ip_ttl` UInt8,
`ip_flags` UInt8,
`tcp_flags` UInt8,
`icmp_type` UInt8,
`icmp_code` UInt8,
`ipv6_flow_label` UInt32,
`fragment_id` UInt32,
`fragment_offset` UInt16,
`src_as` UInt32,
`dst_as` UInt32,
`next_hop` String,
`next_hop_as` UInt32,
`src_net` String,
`dst_net` String,
`bgp_next_hop` String,
`observation_domain_id` UInt32,
`observation_point_id` UInt32
)
ENGINE = Kafka
SETTINGS
kafka_broker_list = 'localhost:9092',
kafka_topic_list = 'flow-messages',
kafka_group_name = 'clickhouse_netflow_consumer',
kafka_format = 'JSONEachRow',
kafka_num_consumers = 3,
kafka_max_block_size = 65536;

Create the netflow.flows_mv Materialized View#

This materialized view listens on flows_kafka and automatically copies the data into the flows table, acting as the pipe between the two.

netflow.flows_mv
-- Create a materialized view that automatically moves the data pulled by flows_kafka into netflow.flows
CREATE MATERIALIZED VIEW netflow.flows_mv TO netflow.flows
(
`type` String,
`time_received_ns` UInt64,
`sequence_num` UInt32,
`sampling_rate` UInt32,
`sampler_address` String,
`time_flow_start_ns` UInt64,
`time_flow_end_ns` UInt64,
`bytes` UInt64,
`packets` UInt64,
`src_addr` String,
`dst_addr` String,
`etype` String,
`proto` String,
`src_port` UInt16,
`dst_port` UInt16,
`in_if` UInt32,
`out_if` UInt32,
`src_mac` String,
`dst_mac` String,
`src_vlan` UInt16,
`dst_vlan` UInt16,
`vlan_id` UInt16,
`ip_tos` UInt8,
`forwarding_status` UInt8,
`ip_ttl` UInt8,
`ip_flags` UInt8,
`tcp_flags` UInt8,
`icmp_type` UInt8,
`icmp_code` UInt8,
`ipv6_flow_label` UInt32,
`fragment_id` UInt32,
`fragment_offset` UInt16,
`src_as` UInt32,
`dst_as` UInt32,
`next_hop` String,
`next_hop_as` UInt32,
`src_net` String,
`dst_net` String,
`bgp_next_hop` String,
`observation_domain_id` UInt32,
`observation_point_id` UInt32
)
AS SELECT *
FROM netflow.flows_kafka;

Once everything is configured, check that ClickHouse can pull data from Kafka:

-- Check that the row count is not 0
SELECT count() FROM netflow.flows;
-- Check the most recent rows
SELECT *
FROM netflow.flows
ORDER BY timestamp DESC
LIMIT 5;

Normally you should see something like:

goflow :) SELECT count() FROM netflow.flows;
SELECT count()
FROM netflow.flows
Query id: 357f75ee-5db4-4e9c-bc9a-141fe0674240
   ┌─count()─┐
1. │    1300 │
   └─────────┘
1 row in set. Elapsed: 0.003 sec.
goflow :) SELECT *
FROM netflow.flows
ORDER BY timestamp DESC
LIMIT 5;
Query id: 666e4e49-1a34-4795-b577-2347e4ca7534
Row 1:
──────
type: SFLOW_5
time_received_ns: 1768289585360343652
sequence_num: 1092
sampling_rate: 2000
sampler_address: 192.168.1.201
.....
timestamp: 2026-01-13 07:33:05
.....
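If count() stays at 0 instead, check both ends of the pipe. ClickHouse exposes the Kafka engine's consumer state in the system.kafka_consumers table (present in recent releases), and Kafka can report the consumer group's offsets and lag:

# ClickHouse side: consumer assignments, message counts, and recent exceptions
clickhouse-client --query "SELECT * FROM system.kafka_consumers FORMAT Vertical"
# Kafka side: offsets and lag of the ClickHouse consumer group
/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group clickhouse_netflow_consumer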

Install Grafana#

sudo apt-get install -y apt-transport-https wget
sudo mkdir -p /etc/apt/keyrings/
wget -q -O - https://apt.grafana.com/gpg.key | gpg --dearmor | sudo tee /etc/apt/keyrings/grafana.gpg > /dev/null
# Updates the list of available packages
sudo apt-get update
# Installs the latest OSS release:
sudo apt-get install grafana
# Nice, another half hour of slacking off

Start the Grafana Service#

systemctl daemon-reload
systemctl enable grafana-server.service
systemctl start grafana-server.service

Install the Plugin#

Visit http://<ip>:3000 to open Grafana; the default username and password are both admin. Once logged in, go to Administration > Plugins and data > Plugins, search for clickhouse, and click Install.

Add the Data Source#

After the plugin is installed, click Add new data source to connect to the database:

Server address: 127.0.0.1
Server port: 9000

Leave everything else at the defaults and click Save & test. If nothing goes wrong you will see:

Data source is working
Next, you can start to visualize data by building a dashboard, or by querying data in the Explore view.

Then refresh the page, enter netflow in the Default database field under Default DB and table, and click Save & test again to finish the connection.

Create a Monitoring Dashboard#

Go to Dashboards > Create dashboard > Add visualization, select the ClickHouse data source configured above, then change the Editor Type in query section A below to SQL Editor. Enter the following in the code box:

SELECT
src_addr AS source_ip,
dst_addr AS dest_ip,
formatReadableSize(sum(bytes) * 2000) AS total_traffic, -- adjust for your sampling ratio
sum(packets) * 2000 AS total_packets, -- adjust for your sampling ratio
min(timestamp) AS first_seen,
max(timestamp) AS last_seen,
dateDiff('second', min(timestamp), max(timestamp)) AS duration_seconds,
count() AS flow_count
FROM netflow.flows
WHERE $__timeFilter(timestamp)
GROUP BY src_addr, dst_addr
ORDER BY sum(bytes) DESC
LIMIT 100

Click Run Query at the top right of the code box and the chart appears. Since the goal here is detailed information, change Time series to Table in the right-hand panel. Then click Save dashboard at the top right, adjust the details as needed, and save. You can resize the panels by hand on the dashboard page afterwards.
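The hard-coded * 2000 has to match the sampling rate configured on the switch. Since GoFlow2 records each flow's sampling_rate, a variant of the query that scales every flow by its own rate (handy when different interfaces sample at different ratios) could look like this:

SELECT
    src_addr AS source_ip,
    dst_addr AS dest_ip,
    -- Scale each flow by its own sampling rate instead of a fixed ratio
    formatReadableSize(sum(bytes * sampling_rate)) AS total_traffic,
    sum(packets * sampling_rate) AS total_packets,
    count() AS flow_count
FROM netflow.flows
WHERE $__timeFilter(timestamp)
GROUP BY src_addr, dst_addr
ORDER BY sum(bytes * sampling_rate) DESC
LIMIT 100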

Final Result#

(Screenshot of the finished dashboard)

Leaving a placeholder here for now. Future update: integrate an IP database to show geographic information for each address.
