IP | 主机名 | 角色 | 操作系统 | 配置 | Flink 版本 |
---|---|---|---|---|---|
192.168.56.10 | flink-10 | Master | Ubuntu 20.04.6 LTS | 4Core、4G | 1.17.1 |
192.168.56.11 | flink-11 | Worker | Ubuntu 20.04.6 LTS | 4Core、4G | 1.17.1 |
192.168.56.12 | Flink-12 | Worker | Ubuntu 20.04.6 LTS | 4Core、4G | 1.17.1 |
在每台机器执行:
xxxxxxxxxx
ssh-keygen -t rsa
遇到输入提示符直接按回车即可。
将每台机器的 ~/.ssh/id_rsa.pub
文件的内容追加到其它机器的 ~/.ssh/authorized_keys
文件。注意,~/.ssh/authorized_keys
的权限必须是 600 或 400。
在每台机器执行:
x
apt update && apt install -y openjdk-8-jdk
然后,执行:
x
realpath $(which java)
假设输出是 /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java,那么 JAVA_HOME 是 /usr/lib/jvm/java-8-openjdk-amd64/。
在 /etc/profile 末尾增加如下内容:
x
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64/
export PATH=$PATH:$JAVA_HOME/bin/
接下来,执行:
xxxxxxxxxx
source /etc/profile
最后,验证 Java 是否安装成功:
xxxxxxxxxx
java -version
在每台机器安装 Python 3.7、3.8、3.9 或 3.10,并且创建软链接:
x
ln -s $(which python3) /usr/bin/python
验证,保证 python 命令已指向 Python3:
xxxxxxxxxx
python -V
如果机器未安装 pip,那么安装:
xxxxxxxxxx
apt install -y python3-pip
接下来,安装 apache-flink 的依赖包:
xxxxxxxxxx
apt install -y python3-testresources
最后,安装 apache-flink,本文使用的版本是最新的 1.17.1(由于该包较大,所以最好配置代理):
x
python -m pip install apache-flink==1.17.1
官网下载是:https://flink.apache.org/downloads/,本文使用的版本是 1.17.1。
在每台机器执行:
x
wget https://dlcdn.apache.org/flink/flink-1.17.1/flink-1.17.1-bin-scala_2.12.tgz
mkdir -p /opt
tar zxf flink-1.17.1-bin-scala_2.12.tgz -C /opt
下文假设已经切换到 /opt/flink-1.17.1/
目录。
修改所有机器的 conf/flink-conf.yaml:
x
jobmanager.rpc.address192.168.56.10
jobmanager.rpc.port6123
jobmanager.bind-host0.0.0.0
taskmanager.bind-host0.0.0.0
# 值是机器自己的 IP 地址,比如 192.168.56.11
taskmanager.host a.b.c.d
taskmanager.numberOfTaskSlots4
rest.address192.168.56.10
rest.bind-address0.0.0.0
# 新增如下配置
blob.server.port6124
将 Master 节点的 conf/masters 文件的内容设置为:
x
192.168.56.10:8081
将 Master 节点的 conf/workers 文件的内容设置为:
xxxxxxxxxx
192.168.56.11
192.168.56.12
请参阅 https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/,获取 Flink 支持的全部配置。
在 Master 节点上执行:
x
/opt/flink-1.17.1/bin/start-cluster.sh
然后在每台机器上执行:
xxxxxxxxxx
jps
在 Master 节点上可以看到类似下面的内容:
xxxxxxxxxx
17808 StandaloneSessionClusterEntrypoint
17957 Jps
在 Worker 节点上可以看到类似下面的内容:
xxxxxxxxxx
17726 TaskManagerRunner
17887 Jps
如果启动失败,那么请查看 /opt/flink-1.17.1/log/
下的日志文件。
使用浏览器打开 http://192.168.56.10:8081/
,可以查看 Flink Dashboard。
从官方仓库(https://github.com/apache/flink/blob/master/flink-python/pyflink/examples/table/word_count.py)获取 word_count.py。
在任意一台机器上执行:
x
/opt/flink-1.17.1/bin/flink run -py word_count.py -Dblob.server.host=192.168.56.10
如果在 Master 节点上执行上述命令,那么可以不指定 -Dblob.server.host=192.168.56.10
。
命令的输出如下:
xxxxxxxxxx
Executing word_count example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID 6c3e4c4d75d5920cb1d63c716cf75830
使用浏览器打开 http://192.168.56.10:8081/
,在左侧导航栏,点击 Jobs >> Completed Jobs,可以找到刚才运行的任务。
xxxxxxxxxx
Vagrant.configure("2") do |config|
config.vm.box = "ubuntu/focal64"
vms = Array(10..12)
vms.each do |seq|
config.vm.define :"flink-#{seq}" do |vagrant|
vagrant.vm.hostname = "flink-#{seq}"
vagrant.vm.network "private_network", ip: "192.168.56.#{seq}"
vagrant.vm.provider "virtualbox" do |vb|
vb.customize ["modifyvm", :id, "--name", "flink-#{seq}"]
vb.gui = false
vb.memory = "4096"
vb.cpus = "4"
end
end
end
end