网站相似度计算:裸机 & Kubernetes 部署实战

背景与目标

  • 任务:基于 Flink Table API,用 SQL 计算网站间的相似度(Jaccard Coefficient)。
  • 数据referrer-referree 格式的 CSV,数千到数万条记录。
  • 目标
    1. 跑通 Flink Job,并且能够在外部访问 flink web ui
    2. 在K8S集群中部署flink,能够使用多台机器共同计算较大的数据集

一些常用命令备忘:

## 将文本文件转换为csv
# 1. 添加表头
echo "referrer,referree" > medium_relation.csv
# 2. 替换空格为逗号并追加到新文件
sed 's/ /,/g' medium_relation >> medium_relation.csv

## 压缩和解压缩
tar -czvf xxx
tar -xzvf xxx.tar.gz -C ~/
# -c 创建一个新的 tar 文件
# -x 解压文件
# -z 使用gzip压缩 后缀为.tar.gz
# -j 使用bzip2压缩 后缀为.tar.bz2 
# -v 显示详细的压缩过程
# -f 指定 tar 文件的名称
# -C 指定解压缩包的目录

## 下载文件
curl -L -o helm-v3.7.0.tar.gz https://github.com/helm/helm/releases/download/v3.7.0/helm-v3.7.0-linux-amd64.tar.gz
# -L:表示跟随重定向(很多 GitHub 下载链接会重定向到实际的下载 URL)
# -o helm-v3.7.0.tar.gz:指定下载文件的保存文件名
wget https://github.com/helm/helm/releases/download/v3.7.0/helm-v3.7.0-linux-amd64.tar.gz
# wget 在下载时会自动保存文件到当前目录,文件名通常是 URL 中最后的部分
  • Flink 1.16.2
  • Java 8
  • Maven 3.9.4

flink 安装好了之后修改flink/conf/flink-conf.yaml中的rest.addressrest.band.address0.0.0.0(为了能够从webui进行查看,当然别忘了修改防火墙配置)

并且使用scripts/start-cluster.sh启动 Flink

项目代码

maven 依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>cn.edu.shu</groupId>
  <artifactId>websitesimilarity</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>WebsiteSimilarity</name>
  <description>Website Similarity Calculation using Flink</description>

  <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependencies>
    <!-- Flink Streaming Java -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>1.16.2</version>
    </dependency>

    <!-- Flink Clients -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients</artifactId>
      <version>1.16.2</version>
    </dependency>

    <!-- Flink Java -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>1.16.2</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java-bridge</artifactId>
      <version>1.17.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-csv</artifactId>
      <version>1.17.0</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <!-- Maven Compiler Plugin -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.8.1</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
    </plugins>
  </build>

</project>

Java 代码

package cn.edu.shu;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

/**
 * 最简单的基于 SQL 的网站相似度计算
 */
public class WebsiteSimilarityJob {
    public static void main(String[] args) throws Exception {
        // 创建 Flink Table 环境(批处理模式)
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inBatchMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // CSV 文件路径,确保文件已转换成 referrer, referree 格式
        String inputPath = "file:///home/ubuntu/similarity_app/medium_relation.csv";
        String outputPath = "file:///home/ubuntu/similarity_app/output_result.csv";

        // 注册源表 links
        tEnv.executeSql(
                "CREATE TEMPORARY TABLE links (" +
                        " referrer BIGINT, " +
                        " referree BIGINT" +
                        ") WITH (" +
                        " 'connector' = 'filesystem', " +
                        " 'path' = '" + inputPath + "', " +
                        " 'format' = 'csv', " +
                        " 'csv.ignore-parse-errors' = 'true'" +
                        ")"
        );

        // 计算相似度,并取前50
        Table result = tEnv.sqlQuery(
                "WITH website_union AS (" +
                        "   SELECT wl1.referrer AS website_A, " +
                        "          wl2.referrer AS website_B, " +
                        "          wl1.referree AS referree_A, " +
                        "          wl2.referree AS referree_B " +
                        "   FROM links wl1 " +
                        "   JOIN links wl2 " +
                        "     ON wl1.referree = wl2.referree " +
                        "   WHERE wl1.referrer <> wl2.referrer " +
                        ") " +
                        "SELECT website_A, website_B, " +
                        "       COUNT(DISTINCT referree_A) * 1.0 / " +
                        "       (SELECT COUNT(DISTINCT referree) FROM (" +
                        "         SELECT referree_A AS referree FROM website_union " +
                        "         UNION " +
                        "         SELECT referree_B AS referree FROM website_union)) AS similarity " +
                        "FROM website_union " +
                        "GROUP BY website_A, website_B " +
                        "LIMIT 50"
        );

        // 注册输出表,将结果保存到文件
        tEnv.executeSql(
                "CREATE TEMPORARY TABLE output_result (" +
                        " website_A BIGINT, " +
                        " website_B BIGINT, " +
                        " similarity DOUBLE" +
                        ") WITH (" +
                        " 'connector' = 'filesystem', " +
                        " 'path' = '" + outputPath + "', " +
                        " 'format' = 'csv', " +
                        " 'csv.field-delimiter' = ','" + // 设置字段分隔符为逗号
                        ")"
        );

        // 将结果写入文件
        result.executeInsert("output_result").await();

        // 输出完成信息
        System.out.println("Result has been saved to: " + outputPath);
    }
}

编译 & 部署

mvn package
flink run --class cn.edu.shu.WebsiteSimilarityJob ~/similarity_app/target/xxx.jar

然后作业就会提交到Flink中,可以在web UI上查看进度

  • != 不被支持 → 改用 SQL 标准 <>
  • 多字段 COUNT(DISTINCT a,b) 不支持 → 重写子查询。

:arrow_right: Apache Flink 官方参考指南

前提条件

确保安装了docker , kubernetes , helm

helm 和 kubernetes 的版本需要匹配,可查询:arrow_right: Helm官方文档

安装 Operator

helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.11.0/
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator

Flink Kubernetes Operator 的作用就是 识别 YAML 配置文件中的 FlinkDeployment 类型,并根据配置自动执行相关的操作。它实际上是一个 Kubernetes 控制器,负责管理和协调 Kubernetes 集群中的 Flink 集群的生命周期。

Flink Kubernetes Operator 工作流程

  1. 提交 FlinkDeployment
    • 您通过 kubectl apply -f flink-deployment.yaml 提交一个包含 Flink 集群配置的 YAML 文件(FlinkDeployment)。
  2. Operator 监控 FlinkDeployment
    • Flink Kubernetes Operator 会监听 Kubernetes 中的 FlinkDeployment 资源。
    • 一旦 FlinkDeployment 被创建或更新,Operator 会识别该资源,并根据其内容执行相关操作。
  3. 创建和管理资源
    • 根据 FlinkDeployment 中的配置,Operator 会自动创建和管理 Kubernetes 中的 JobManagerTaskManager Pods。
    • 它会根据配置中的 replicasresourceimage 等字段来启动相应的资源。
  4. 更新集群
    • 如果您更改了 FlinkDeployment 中的配置(例如,修改镜像版本或调整资源),Operator 会自动处理集群的升级和更新。
    • 例如,您更改了镜像版本,Operator 会根据新配置启动新的 Pods,替换旧的 Pods。

编写部署YAML

一个官方测试的一个YAML脚本 be like:

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  image: flink:1.20
  flinkVersion: v1_20
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 2
    upgradeMode: stateless

其中指定了 flink 的 image 版本,还有 jobManager 和 taskManager 的数量和配置等,再在job中指定了运行的 jar 的 URI 。

如果是分布式环境的话,需要确保 JAR 在一个大家都能访问得到的地方。

GPT说有以下几种方法可以尝试:

  • 方法 1: 使用 Kubernetes ConfigMap 上传 JAR 文件

ConfigMap 是一种 Kubernetes 资源对象,主要用于存储配置文件、环境变量等文本数据。您可以将 JAR 文件作为 ConfigMap 存储在 Kubernetes 中。

  • 方法 2: 使用 Kubernetes Persistent Volume (PV)

Persistent Volume (PV) 允许您将外部存储(如 NFS、Ceph、AWS EBS 等)挂载到 Kubernetes 中,供容器访问。

  • 方法 3: 使用云存储(如 AWS S3)

​ 如果您的 Kubernetes 集群在 AWS 上,您可以将 JAR 文件上传到 S3 存储桶,并通过 jarURI 引用它。

因为我的这个JAR很小,大小仅 3.5 KB(可以通过ls -lh查看) → 选用 ConfigMap 共享

用 kubectl 对 JAR 文件进行挂载:

kubectl create configmap flink-job-jar \
  --from-file=websitesimilarity-1.0-SNAPSHOT.jar=target/websitesimilarity-1.0-SNAPSHOT.jar

ConfigMap 更新:每次 Jar 变动都需 kubectl delete configmap && create configmap

FlinkDeployment.yaml 样例:

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  image: flink:1.16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  jobManager:
    resource: { cpu:1, memory:"4096m" }
    podTemplate:
      spec:
        volumes:
          - name: flink-job-jar
            configMap: { name: flink-job-jar }
        containers:
          - name: flink-job-manager
            image: flink:1.16
            volumeMounts:
              - name: flink-job-jar
                mountPath: /opt/flink/jobs
                subPath: websitesimilarity-1.0-SNAPSHOT.jar
  taskManager:
    replicas: 2
    resource: { cpu:1, memory:"4096m" }
    podTemplate:
      spec:
        volumes:
          - name: flink-job-jar
            configMap: { name: flink-job-jar }
        containers:
          - name: flink-task-manager
            image: flink:1.16
            volumeMounts:
              - name: flink-job-jar
                mountPath: /opt/flink/jobs
                subPath: websitesimilarity-1.0-SNAPSHOT.jar
  job:
    jarURI: local:///opt/flink/jobs/websitesimilarity-1.0-SNAPSHOT.jar
    parallelism: 2
    upgradeMode: stateless

部署

  1. kubectl apply -f flink-deployment.yaml → Operator 自动创建 Deployment

  2. 查看服务:kubectl get svc

    • flinkapp (6123/TCP, 6124/TCP) 内部通信
    • flinkapp-rest (8081/TCP) Web UI
  3. 端口转发:

    如果想在本地浏览器中访问,需要加上 –address 0.0.0.0 的选项,并且记得放开防火墙8085端口

    kubectl port-forward --address 0.0.0.0 svc/flinkapp-rest 8085:8081