| IP | 主机名 | 角色 | 操作系统 | 配置 | Flink 版本 |
|---|---|---|---|---|---|
| 192.168.56.10 | flink-10 | Master | Ubuntu 20.04.6 LTS | 4Core、4G | 1.17.1 |
| 192.168.56.11 | flink-11 | Worker | Ubuntu 20.04.6 LTS | 4Core、4G | 1.17.1 |
| 192.168.56.12 | Flink-12 | Worker | Ubuntu 20.04.6 LTS | 4Core、4G | 1.17.1 |
在每台机器执行:
xxxxxxxxxxssh-keygen -t rsa遇到输入提示符直接按回车即可。
将每台机器的 ~/.ssh/id_rsa.pub 文件的内容追加到其它机器的 ~/.ssh/authorized_keys 文件。注意,~/.ssh/authorized_keys 的权限必须是 600 或 400。
在每台机器执行:
x
apt update && apt install -y openjdk-8-jdk然后,执行:
x
realpath $(which java)假设输出是 /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java,那么 JAVA_HOME 是 /usr/lib/jvm/java-8-openjdk-amd64/。
在 /etc/profile 末尾增加如下内容:
x
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64/export PATH=$PATH:$JAVA_HOME/bin/接下来,执行:
xxxxxxxxxxsource /etc/profile最后,验证 Java 是否安装成功:
xxxxxxxxxxjava -version在每台机器安装 Python 3.7、3.8、3.9 或 3.10,并且创建软链接:
x
ln -s $(which python3) /usr/bin/python验证,保证 python 命令已指向 Python3:
xxxxxxxxxxpython -V如果机器未安装 pip,那么安装:
xxxxxxxxxxapt install -y python3-pip接下来,安装 apache-flink 的依赖包:
xxxxxxxxxxapt install -y python3-testresources最后,安装 apache-flink,本文使用的版本是最新的 1.17.1(由于该包较大,所以最好配置代理):
x
python -m pip install apache-flink==1.17.1官网下载是:https://flink.apache.org/downloads/,本文使用的版本是 1.17.1。
在每台机器执行:
x
wget https://dlcdn.apache.org/flink/flink-1.17.1/flink-1.17.1-bin-scala_2.12.tgzmkdir -p /opttar zxf flink-1.17.1-bin-scala_2.12.tgz -C /opt下文假设已经切换到 /opt/flink-1.17.1/ 目录。
修改所有机器的 conf/flink-conf.yaml:
x
jobmanager.rpc.address192.168.56.10jobmanager.rpc.port6123jobmanager.bind-host0.0.0.0taskmanager.bind-host0.0.0.0# 值是机器自己的 IP 地址,比如 192.168.56.11taskmanager.hosta.b.c.dtaskmanager.numberOfTaskSlots4rest.address192.168.56.10rest.bind-address0.0.0.0
# 新增如下配置blob.server.port6124将 Master 节点的 conf/masters 文件的内容设置为:
x
192.168.56.10:8081将 Master 节点的 conf/workers 文件的内容设置为:
xxxxxxxxxx192.168.56.11192.168.56.12请参阅 https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/,获取 Flink 支持的全部配置。
在 Master 节点上执行:
x
/opt/flink-1.17.1/bin/start-cluster.sh然后在每台机器上执行:
xxxxxxxxxxjps在 Master 节点上可以看到类似下面的内容:
xxxxxxxxxx17808 StandaloneSessionClusterEntrypoint17957 Jps
在 Worker 节点上可以看到类似下面的内容:
xxxxxxxxxx17726 TaskManagerRunner17887 Jps
如果启动失败,那么请查看 /opt/flink-1.17.1/log/ 下的日志文件。
使用浏览器打开 http://192.168.56.10:8081/,可以查看 Flink Dashboard。
从官方仓库(https://github.com/apache/flink/blob/master/flink-python/pyflink/examples/table/word_count.py)获取 word_count.py。
在任意一台机器上执行:
x
/opt/flink-1.17.1/bin/flink run -py word_count.py -Dblob.server.host=192.168.56.10如果在 Master 节点上执行上述命令,那么可以不指定 -Dblob.server.host=192.168.56.10。
命令的输出如下:
xxxxxxxxxxExecuting word_count example with default input data set.Use --input to specify file input.Printing result to stdout. Use --output to specify output path.Job has been submitted with JobID 6c3e4c4d75d5920cb1d63c716cf75830
使用浏览器打开 http://192.168.56.10:8081/,在左侧导航栏,点击 Jobs >> Completed Jobs,可以找到刚才运行的任务。
xxxxxxxxxxVagrant.configure("2") do |config| config.vm.box = "ubuntu/focal64"
vms = Array(10..12) vms.each do |seq| config.vm.define :"flink-#{seq}" do |vagrant| vagrant.vm.hostname = "flink-#{seq}" vagrant.vm.network "private_network", ip: "192.168.56.#{seq}" vagrant.vm.provider "virtualbox" do |vb| vb.customize ["modifyvm", :id, "--name", "flink-#{seq}"] vb.gui = false vb.memory = "4096" vb.cpus = "4" end end endend