网站相似度计算:裸机 & Kubernetes 部署实战
背景与目标
- 任务:基于 Flink Table API,用 SQL 计算网站间的相似度(Jaccard Coefficient)。
- 数据:
referrer-referree
格式的 CSV,数千到数万条记录。 - 目标:
- 跑通 Flink Job,并且能够在外部访问 flink web ui
- 在K8S集群中部署flink,能够使用多台机器共同计算较大的数据集
一些常用命令备忘:
## 将文本文件转换为csv
# 1. 添加表头
echo "referrer,referree" > medium_relation.csv
# 2. 替换空格为逗号并追加到新文件
sed 's/ /,/g' medium_relation >> medium_relation.csv
## 压缩和解压缩
tar -czvf xxx
tar -xzvf xxx.tar.gz -C ~/
# -c 创建一个新的 tar 文件
# -x 解压文件
# -z 使用gzip压缩 后缀为.tar.gz
# -j 使用bzip2压缩 后缀为.tar.bz2
# -v 显示详细的压缩过程
# -f 指定 tar 文件的名称
# -C 指定解压缩包的目录
## 下载文件
curl -L -o helm-v3.7.0.tar.gz https://github.com/helm/helm/releases/download/v3.7.0/helm-v3.7.0-linux-amd64.tar.gz
# -L:表示跟随重定向(很多 GitHub 下载链接会重定向到实际的下载 URL)
# -o helm-v3.7.0.tar.gz:指定下载文件的保存文件名
wget https://github.com/helm/helm/releases/download/v3.7.0/helm-v3.7.0-linux-amd64.tar.gz
# wget 在下载时会自动保存文件到当前目录,文件名通常是 URL 中最后的部分
一、裸机部署 Flink
环境安装 & 启动 Flink
- Flink 1.16.2
- Java 8
- Maven 3.9.4
flink 安装好了之后修改flink/conf/flink-conf.yaml
中的rest.address
和rest.band.address
为0.0.0.0
(为了能够从webui进行查看,当然别忘了修改防火墙配置)
并且使用scripts/start-cluster.sh
启动 Flink
项目代码
maven 依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.edu.shu</groupId>
<artifactId>websitesimilarity</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>WebsiteSimilarity</name>
<description>Website Similarity Calculation using Flink</description>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<!-- Flink Streaming Java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.16.2</version>
</dependency>
<!-- Flink Clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>1.16.2</version>
</dependency>
<!-- Flink Java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.16.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>1.17.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>1.17.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Maven Compiler Plugin -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
Java 代码
package cn.edu.shu;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
/**
* 最简单的基于 SQL 的网站相似度计算
*/
public class WebsiteSimilarityJob {
public static void main(String[] args) throws Exception {
// 创建 Flink Table 环境(批处理模式)
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.inBatchMode()
.build();
TableEnvironment tEnv = TableEnvironment.create(settings);
// CSV 文件路径,确保文件已转换成 referrer, referree 格式
String inputPath = "file:///home/ubuntu/similarity_app/medium_relation.csv";
String outputPath = "file:///home/ubuntu/similarity_app/output_result.csv";
// 注册源表 links
tEnv.executeSql(
"CREATE TEMPORARY TABLE links (" +
" referrer BIGINT, " +
" referree BIGINT" +
") WITH (" +
" 'connector' = 'filesystem', " +
" 'path' = '" + inputPath + "', " +
" 'format' = 'csv', " +
" 'csv.ignore-parse-errors' = 'true'" +
")"
);
// 计算相似度,并取前50
Table result = tEnv.sqlQuery(
"WITH website_union AS (" +
" SELECT wl1.referrer AS website_A, " +
" wl2.referrer AS website_B, " +
" wl1.referree AS referree_A, " +
" wl2.referree AS referree_B " +
" FROM links wl1 " +
" JOIN links wl2 " +
" ON wl1.referree = wl2.referree " +
" WHERE wl1.referrer <> wl2.referrer " +
") " +
"SELECT website_A, website_B, " +
" COUNT(DISTINCT referree_A) * 1.0 / " +
" (SELECT COUNT(DISTINCT referree) FROM (" +
" SELECT referree_A AS referree FROM website_union " +
" UNION " +
" SELECT referree_B AS referree FROM website_union)) AS similarity " +
"FROM website_union " +
"GROUP BY website_A, website_B " +
"LIMIT 50"
);
// 注册输出表,将结果保存到文件
tEnv.executeSql(
"CREATE TEMPORARY TABLE output_result (" +
" website_A BIGINT, " +
" website_B BIGINT, " +
" similarity DOUBLE" +
") WITH (" +
" 'connector' = 'filesystem', " +
" 'path' = '" + outputPath + "', " +
" 'format' = 'csv', " +
" 'csv.field-delimiter' = ','" + // 设置字段分隔符为逗号
")"
);
// 将结果写入文件
result.executeInsert("output_result").await();
// 输出完成信息
System.out.println("Result has been saved to: " + outputPath);
}
}
编译 & 部署
mvn package
flink run --class cn.edu.shu.WebsiteSimilarityJob ~/similarity_app/target/xxx.jar
然后作业就会提交到Flink中,可以在web UI上查看进度
!=
不被支持 → 改用 SQL 标准<>
。- 多字段
COUNT(DISTINCT a,b)
不支持 → 重写子查询。
二、Kubernetes 部署 Flink
:arrow_right: Apache Flink 官方参考指南
前提条件
确保安装了docker , kubernetes , helm
helm 和 kubernetes 的版本需要匹配,可查询:arrow_right: Helm官方文档
安装 Operator
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.11.0/
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator
Flink Kubernetes Operator 的作用就是 识别 YAML 配置文件中的 FlinkDeployment
类型,并根据配置自动执行相关的操作。它实际上是一个 Kubernetes 控制器,负责管理和协调 Kubernetes 集群中的 Flink 集群的生命周期。
Flink Kubernetes Operator 工作流程:
- 提交
FlinkDeployment
:- 您通过
kubectl apply -f flink-deployment.yaml
提交一个包含 Flink 集群配置的 YAML 文件(FlinkDeployment
)。
- 您通过
- Operator 监控
FlinkDeployment
:- Flink Kubernetes Operator 会监听 Kubernetes 中的
FlinkDeployment
资源。 - 一旦
FlinkDeployment
被创建或更新,Operator 会识别该资源,并根据其内容执行相关操作。
- Flink Kubernetes Operator 会监听 Kubernetes 中的
- 创建和管理资源:
- 根据
FlinkDeployment
中的配置,Operator 会自动创建和管理 Kubernetes 中的 JobManager 和 TaskManager Pods。 - 它会根据配置中的
replicas
、resource
、image
等字段来启动相应的资源。
- 根据
- 更新集群:
- 如果您更改了
FlinkDeployment
中的配置(例如,修改镜像版本或调整资源),Operator 会自动处理集群的升级和更新。 - 例如,您更改了镜像版本,Operator 会根据新配置启动新的 Pods,替换旧的 Pods。
- 如果您更改了
编写部署YAML
一个官方测试的一个YAML脚本 be like:
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
name: basic-example
spec:
image: flink:1.20
flinkVersion: v1_20
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
serviceAccount: flink
jobManager:
resource:
memory: "2048m"
cpu: 1
taskManager:
resource:
memory: "2048m"
cpu: 1
job:
jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
parallelism: 2
upgradeMode: stateless
其中指定了 flink 的 image 版本,还有 jobManager 和 taskManager 的数量和配置等,再在job中指定了运行的 jar 的 URI 。
如果是分布式环境的话,需要确保 JAR 在一个大家都能访问得到的地方。
GPT说有以下几种方法可以尝试:
- 方法 1: 使用 Kubernetes ConfigMap 上传 JAR 文件
ConfigMap 是一种 Kubernetes 资源对象,主要用于存储配置文件、环境变量等文本数据。您可以将 JAR 文件作为 ConfigMap 存储在 Kubernetes 中。
- 方法 2: 使用 Kubernetes Persistent Volume (PV)
Persistent Volume (PV) 允许您将外部存储(如 NFS、Ceph、AWS EBS 等)挂载到 Kubernetes 中,供容器访问。
- 方法 3: 使用云存储(如 AWS S3)
如果您的 Kubernetes 集群在 AWS 上,您可以将 JAR 文件上传到 S3 存储桶,并通过
jarURI
引用它。
因为我的这个JAR很小,大小仅 3.5 KB(可以通过ls -lh
查看) → 选用 ConfigMap 共享
用 kubectl 对 JAR 文件进行挂载:
kubectl create configmap flink-job-jar \
--from-file=websitesimilarity-1.0-SNAPSHOT.jar=target/websitesimilarity-1.0-SNAPSHOT.jar
ConfigMap 更新:每次 Jar 变动都需 kubectl delete configmap && create configmap
FlinkDeployment.yaml 样例:
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
name: basic-example
spec:
image: flink:1.16
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
jobManager:
resource: { cpu:1, memory:"4096m" }
podTemplate:
spec:
volumes:
- name: flink-job-jar
configMap: { name: flink-job-jar }
containers:
- name: flink-job-manager
image: flink:1.16
volumeMounts:
- name: flink-job-jar
mountPath: /opt/flink/jobs
subPath: websitesimilarity-1.0-SNAPSHOT.jar
taskManager:
replicas: 2
resource: { cpu:1, memory:"4096m" }
podTemplate:
spec:
volumes:
- name: flink-job-jar
configMap: { name: flink-job-jar }
containers:
- name: flink-task-manager
image: flink:1.16
volumeMounts:
- name: flink-job-jar
mountPath: /opt/flink/jobs
subPath: websitesimilarity-1.0-SNAPSHOT.jar
job:
jarURI: local:///opt/flink/jobs/websitesimilarity-1.0-SNAPSHOT.jar
parallelism: 2
upgradeMode: stateless
部署
kubectl apply -f flink-deployment.yaml
→ Operator 自动创建 Deployment查看服务:
kubectl get svc
flinkapp
(6123/TCP, 6124/TCP) 内部通信flinkapp-rest
(8081/TCP) Web UI
端口转发:
如果想在本地浏览器中访问,需要加上 –address 0.0.0.0 的选项,并且记得放开防火墙8085端口
kubectl port-forward --address 0.0.0.0 svc/flinkapp-rest 8085:8081