写一个简单的应用,要求根监督者有 1 个监督者子进程和 1 个工作者子进程 A,监督者子进程有 10 个同类型的工作者子进程 B。向 A 发送 1 个消息,A 可以把消息派发给 B1、B2 ... B10。
x
$ rebar3 new release simple_application===> Writing simple_application/apps/simple_application/src/simple_application_app.erl===> Writing simple_application/apps/simple_application/src/simple_application_sup.erl===> Writing simple_application/apps/simple_application/src/simple_application.app.src===> Writing simple_application/rebar.config===> Writing simple_application/config/sys.config===> Writing simple_application/config/vm.args===> Writing simple_application/.gitignore===> Writing simple_application/LICENSE===> Writing simple_application/README.md将工作目录切换到项目根目录:
xxxxxxxxxx$ cd simple_applicationx
$ tree ..├── LICENSE├── README.md├── apps│ └── simple_application│ └── src│ ├── proxy_server.erl│ ├── simple_application.app.src│ ├── simple_application_app.erl│ ├── simple_application_sup.erl│ ├── worker_lib.erl│ ├── worker_server.erl│ └── worker_server_sup.erl├── config│ ├── sys.config│ └── vm.args└── rebar.config
4 directories, 12 files下文未提及的文件的内容保持不变。
config/sys.config:
xxxxxxxxxx[ { simple_application, [ {worker_server_process_count, 10}, {worker_server_process_name_prefix, worker_server} ] }].apps/simple_application/src/proxy_server.erl:
xxxxxxxxxx-module(proxy_server).-behaviour(gen_server).-record(state, {workers :: [atom()]}).-import(worker_lib, []).%% API 函数-export([, ]).%% gen_server 回调函数-export([ , , , , , ]).
%%%% %%%% %%%% %%%%%%%% 下面是为客户端提供的接口%%%% %%%% %%%% %%%%
%% 启动一个服务,后台将启动一个 erlang process,并且进入 loop 函数。%% 其实现在 OTP 代码的 lib/stdlib/src/gen_server.erl。-spec start_link() -> {ok, pid()}.start_link() -> %% gen_server:start_link/4 启动 process,并且将其注册在当前 node 上,名称为当前 Module 名 gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
%% 向 Proxy Server 发送消息,并且等待其响应send_message(Message) -> gen_server:call(?MODULE, {send_message, Message}).
%%%% %%%% %%%% %%%%%%%% 下面是 gen_server 在发生事件时回调的函数%%%% %%%% %%%% %%%%
%% gen_server:start_link 被调用,服务启动时,将回调 init/1。init([]) -> {ok, #state{workers = get_worker_names()}}.
%% gen_server 进程收到一个普通 erlang 消息,%% 即不是通过 gen_server:call 和 gen_server:cast 发来的消息。handle_info(_Message, State) -> {noreply, State}.
%% gen_server:call 被调用时,将回调该函数。%% gen_server:call 是“同步”调用,调用方可以设置超时时间。%% 返回值里的 Reply 是返回给调用者的内容。handle_call({send_message, Message}, _From, State) -> %% 从列表中,随机获取一个 Worker Server PidList = State#state.workers, Pid = lists:nth(round(random:uniform() * 1000000) rem erlang:length(PidList) + 1, PidList), io:format("~p received [~p], proxy it to [~p]~n", [?MODULE, Message, Pid]), Result = gen_server:call(Pid, Message), Reply = {success, Result}, {reply, Reply, State}.
%% gen_server:cast 被调用时,将回调该函数。%% gen_server:cast 是“异步”调用。%% 调用者给 gen_server 发送消息,但不接收 gen_server 回复。handle_cast(_Message, State) -> {noreply, State}.
%% 上面的三个函数:handle_call/3、handle_cast/2、handle_info/2,%% 都可以返回 {stop, Reason, State},在这种情况下,gen_server 将退出。%% 退出前,将回调 terminate/2。
%% Reason 为 normal 被认为是正常退出;%% 对于其它 Reason,sasl 将打印错误日志。terminate(Reason, _State) -> ok.
code_change(_OldVsn, State, _Extra) -> {ok, State}.apps/simple_application/src/worker_lib.erl:
xxxxxxxxxx-module(worker_lib).-define(PROCESS_NAME_PREFIX, worker_server).-define(PROCESS_COUNT, 10).-export([]).
%% 获取 Worker Server 的名称列表。%% 进程数量由 worker_server_process_count 确定,默认值为 PROCESS_COUNT 宏的值;%% 进程名称前缀由 worker_server_process_name_prefix 确定,默认值为 PROCESS_NAME_PREFIX 宏的值-spec get_worker_names() -> [atom()].get_worker_names() -> ProcessCount = case application:get_env(worker_server_process_count) of {ok, Count} -> Count; _ -> ?PROCESS_COUNT end, ProcessNamePrefix = case application:get_env(worker_server_process_name_prefix) of {ok, Prefix} -> Prefix; _ -> ?PROCESS_NAME_PREFIX end, get_worker_names(ProcessCount, ProcessNamePrefix, []).
get_worker_names(0, _, ProcessNameList) -> ProcessNameList;get_worker_names(CurrentCount, ProcessNamePrefix, ProcessNameList) -> ProcessName = list_to_atom( string:join( [ atom_to_list(ProcessNamePrefix), integer_to_list(CurrentCount) ], "_" ) ), get_worker_names(CurrentCount - 1, ProcessNamePrefix, [ProcessName | ProcessNameList]).apps/simple_application/src/worker_server.erl:
xxxxxxxxxx-module(worker_server).-behaviour(gen_server).-record(state, {worker_name :: atom()}).-export([]).-export([ , , , ]).
start_link(WorkerName) -> gen_server:start_link({local, WorkerName}, ?MODULE, [WorkerName], []).
init([WorkerName]) -> {ok, #state{worker_name = WorkerName}}.
handle_call(Request, From, State) -> io:format("~p got [~p] from [~p]~n", [State#state.worker_name, Request, From]), Reply = ok, {reply, Reply, State}.
handle_cast(_Message, State) -> {noreply, State}.
handle_info(_Message, State) -> {noreply, State}.apps/simple_application/src/worker_server_sup.erl:
xxxxxxxxxx-module(worker_server_sup).
-behaviour(supervisor).
-export([]).
-export([]).
-define(SERVER, ?MODULE).
start_link() -> supervisor:start_link({local, ?SERVER}, ?MODULE, []).
init([]) -> SupFlags = #{strategy => one_for_one, intensity => 2, period => 10}, WorkerNames = worker_lib:get_worker_names(), ChildSpecs = generate_child_specs(WorkerNames, []), {ok, {SupFlags, ChildSpecs}}.
generate_child_specs([], ChildSpecs) -> ChildSpecs;generate_child_specs([WorkerName | Rest], ChildSpecs) -> ChildSpec = #{ id => WorkerName, start => {worker_server, start_link, [WorkerName]}, restart => permanent, shutdown => brutal_kill, type => worker, modules => [cg3] }, generate_child_specs(Rest, [ChildSpec | ChildSpecs]).apps/simple_application/src/simple_application_sup.erl:
xxxxxxxxxx-module(simple_application_sup).
-behaviour(supervisor).
-export([]).
-export([]).
-define(SERVER, ?MODULE).
start_link() -> supervisor:start_link({local, ?SERVER}, ?MODULE, []).
init([]) -> %% strategy: %% - one_for_one:如果子进程停止,则重启它 %% - one_for_all:如果一个子进程停止,则停止其它所有子进程,然后重启停止的子进程 %% - rest_for_one:如果一个子进程停止,则停止启动顺序中在其后的所有子进程,然后重启停止的子进程 %% - simple_one_for_one:简化的 one_for_one,所有子进程都是同类型进程,并且是动态添加的实例 %% intensity 和 period 共同指定 supervisor 的重启限度。 %% 比如 1、5,表示 5s 内最多重启 1 次,超过该限度后, %% supervisor 进程将终止所有子进程和它本身。能够有效地防止 supervisor 无限重启。 SupFlags = #{strategy => one_for_one, intensity => 2, period => 10},
%% 子进程规范: %% - id:用于 supervisor 内部识别子规范的名字 %% - start:由三元组 {M, F, A} 确定 %% - restart:定义子进程的重启策略。 %% permanent 表示始终重启; %% temporary 表示绝不重启; %% transient 表示仅当子进程异常终止时,才重启,即除 normal 以外的终止原因 %% - shutdown:定义子进程的终止策略。 %% brutal_kill 表示使用 exit(Child, kill) 无条件地终止子进程; %% 整数 timeout 值表示 supervisor 通过调用 exit(Child, shutdown) 告诉子进程请终止自己,然后等待子进程返回退出信号。如果没有在指定的时间内接收到(来自子进程的) 退出信号,则使用 exit(Child, kill) 无条件终止子进程; %% 如果子进程是另一个 supervisor,则应该设置为 infinity,给子树足够的时间来终止 %% - type:指定子进程是 supervisor,还是 worker %% - modules:如果子进程是 supervisor,或子进程是 worker,并且采用 gen_server 或 gen_fsm 行为模式实现,则 modules 是 callback 模块的名字; %% 如果子进程是 worker,并且采用 gen_event 行为模式实现,则 modules 应该为 dynamic ,该信息用于在升级和降级时,供 release handler 使用 ChildSpecs = [ #{ id => worker_server_sup, start => {worker_server_sup, start_link, []}, restart => permanent, shutdown => brutal_kill, type => supervisor, modules => [cg3] }, #{ id => proxy_server, start => {proxy_server, start_link, []}, restart => permanent, shutdown => brutal_kill, type => worker, modules => [cg3] } ], {ok, {SupFlags, ChildSpecs}}.xxxxxxxxxx$ rebar3 as prod tarxxxxxxxxxx_build/prod/rel/simple_application/bin/simple_application console在交互式 Erlang Shell 中:
x
1> supervisor:count_children(simple_application_sup).[{specs,2},{active,2},{supervisors,1},{workers,1}]2> supervisor:count_children(worker_server_sup).[{specs,10},{active,10},{supervisors,0},{workers,10}]3> proxy_server:send_message("This is a message.").proxy_server received ["This is a message."], proxy it to [worker_server_6]worker_server_6 got ["This is a message."] from [{<0.280.0>, [alias| #Ref<0.2217166263.2602369027.229245>]}]{success,ok}4> proxy_server:send_message("This is a message.").proxy_server received ["This is a message."], proxy it to [worker_server_1]worker_server_1 got ["This is a message."] from [{<0.280.0>, [alias| #Ref<0.2217166263.2602369027.229255>]}]{success,ok}5> exit(whereis(worker_server_1), shutdown).true
%% 执行若干次 proxy_server:send_message("This is a message."),直到消息被代理到 worker_server_1
13> proxy_server:send_message("This is a message.").proxy_server received ["This is a message."], proxy it to [worker_server_1]worker_server_1 got ["This is a message."] from [{<0.280.0>, [alias| #Ref<0.2217166263.2602369027.229339>]}]{success,ok}14> exit(whereis(proxy_server), shutdown).true15> proxy_server:send_message("This is a message.").proxy_server received ["This is a message."], proxy it to [worker_server_9]worker_server_9 got ["This is a message."] from [{<0.280.0>, [alias| #Ref<0.2217166263.2602369027.229353>]}]{success,ok}16> exit(whereis(worker_server_sup), shutdown).true17> proxy_server:send_message("This is a message.").proxy_server received ["This is a message."], proxy it to [worker_server_10]worker_server_10 got ["This is a message."] from [{<0.280.0>, [alias| #Ref<0.2217166263.2602369027.229367>]}]{success,ok}
%% 退出18> halt().