写一个简单的 OTP 应用:根监督者下有 1 个监督者子进程和 1 个工作者子进程 A;该监督者子进程下有 10 个同类型的工作者子进程 B(B1、B2 … B10)。向 A 发送 1 条消息时,A 从 B1 … B10 中随机选择一个,并把消息派发给它。
首先,使用 rebar3 创建一个 release 项目:
$ rebar3 new release simple_application
===> Writing simple_application/apps/simple_application/src/simple_application_app.erl
===> Writing simple_application/apps/simple_application/src/simple_application_sup.erl
===> Writing simple_application/apps/simple_application/src/simple_application.app.src
===> Writing simple_application/rebar.config
===> Writing simple_application/config/sys.config
===> Writing simple_application/config/vm.args
===> Writing simple_application/.gitignore
===> Writing simple_application/LICENSE
===> Writing simple_application/README.md
将工作目录切换到项目根目录:
xxxxxxxxxx
$ cd simple_application
项目完成后的目录结构如下(其中 proxy_server.erl、worker_lib.erl、worker_server.erl、worker_server_sup.erl 为新增文件):
$ tree .
.
├── LICENSE
├── README.md
├── apps
│ └── simple_application
│ └── src
│ ├── proxy_server.erl
│ ├── simple_application.app.src
│ ├── simple_application_app.erl
│ ├── simple_application_sup.erl
│ ├── worker_lib.erl
│ ├── worker_server.erl
│ └── worker_server_sup.erl
├── config
│ ├── sys.config
│ └── vm.args
└── rebar.config
4 directories, 12 files
下文未列出的文件保持 rebar3 生成时的内容不变。
config/sys.config:
xxxxxxxxxx
%% Runtime configuration for the simple_application release.
%% - worker_server_process_count: how many Worker Server names worker_lib
%%   derives (and therefore how many Worker Servers are supervised).
%% - worker_server_process_name_prefix: prefix of each Worker Server's
%%   registered name; worker_lib builds the names as Prefix_1 .. Prefix_N.
[
{
simple_application, [
{worker_server_process_count, 10},
{worker_server_process_name_prefix, worker_server}
]
}
].
apps/simple_application/src/proxy_server.erl:
xxxxxxxxxx
-module(proxy_server).
-behaviour(gen_server).

%% Proxy Server ("A"): accepts messages from clients and forwards each one,
%% via a synchronous gen_server:call, to a randomly chosen Worker Server
%% ("B1".."Bn") whose registered names come from worker_lib.

%% API functions
-export([start_link/0, send_message/1]).

%% gen_server callbacks
-export([
    init/1,
    handle_call/3,
    handle_cast/2,
    handle_info/2,
    terminate/2,
    code_change/3
]).

%% workers: registered names of the Worker Servers, not pids, so a restarted
%% worker is reached again under the same name.
-record(state, {workers :: [atom()]}).

%%%% %%%% %%%% %%%%
%%%% Client API
%%%% %%%% %%%% %%%%

%% Starts the proxy process and registers it locally under the module name.
%% The gen_server implementation lives in OTP's lib/stdlib/src/gen_server.erl.
-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

%% Sends Message to the proxy and blocks until it replies.
%% Returns {success, WorkerReply} once a Worker Server has handled the message,
%% or {error, no_workers} when no Worker Server names are configured.
-spec send_message(term()) -> {success, term()} | {error, no_workers}.
send_message(Message) ->
    gen_server:call(?MODULE, {send_message, Message}).

%%%% %%%% %%%% %%%%
%%%% gen_server callbacks
%%%% %%%% %%%% %%%%

%% Called by gen_server:start_link/4; caches the Worker Server names.
init([]) ->
    {ok, #state{workers = worker_lib:get_worker_names()}}.

%% Synchronous requests. {send_message, Message} is forwarded to one worker
%% picked at random; the worker's reply is wrapped as {success, Reply}.
%% NOTE: if the chosen worker is not registered at that moment (e.g. it is
%% being restarted) the inner call exits and this proxy crashes; its
%% supervisor then restarts it.
handle_call({send_message, _Message}, _From, #state{workers = []} = State) ->
    %% Nothing to forward to; previously this crashed with badarith (rem 0).
    {reply, {error, no_workers}, State};
handle_call({send_message, Message}, _From, #state{workers = Workers} = State) ->
    Worker = pick_worker(Workers),
    io:format("~p received [~p], proxy it to [~p]~n", [?MODULE, Message, Worker]),
    Result = gen_server:call(Worker, Message),
    {reply, {success, Result}, State}.

%% Asynchronous requests are not used; they are ignored.
handle_cast(_Message, State) ->
    {noreply, State}.

%% Plain Erlang messages (not sent via call/cast) are ignored.
handle_info(_Message, State) ->
    {noreply, State}.

%% Called before the server exits; there is nothing to clean up.
terminate(_Reason, _State) ->
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%%%% %%%% %%%% %%%%
%%%% Internal functions
%%%% %%%% %%%% %%%%

%% Picks one name uniformly at random from a non-empty list. rand seeds
%% itself per process, unlike the deprecated random module whose fixed
%% default seed makes every restarted proxy replay the same sequence.
pick_worker(Workers) ->
    lists:nth(rand:uniform(length(Workers)), Workers).
apps/simple_application/src/worker_lib.erl:
xxxxxxxxxx
-module(worker_lib).

%% Shared helpers for deriving the registered names of the Worker Servers,
%% used both by the Proxy Server and by the Worker Server supervisor so the
%% two always agree on the same set of names.

-define(APP, simple_application).
-define(PROCESS_NAME_PREFIX, worker_server).
-define(PROCESS_COUNT, 10).

-export([get_worker_names/0, get_worker_names/2]).

%% Returns the Worker Server names configured for the application.
%% The count comes from worker_server_process_count (default ?PROCESS_COUNT)
%% and the prefix from worker_server_process_name_prefix (default
%% ?PROCESS_NAME_PREFIX). The application is named explicitly so the result
%% does not depend on which process calls this (e.g. the shell).
-spec get_worker_names() -> [atom()].
get_worker_names() ->
    Count = application:get_env(?APP, worker_server_process_count, ?PROCESS_COUNT),
    Prefix = application:get_env(?APP, worker_server_process_name_prefix, ?PROCESS_NAME_PREFIX),
    get_worker_names(Count, Prefix).

%% Builds [Prefix_1, Prefix_2, ..., Prefix_Count] in ascending order.
%% Count must be a non-negative integer. Any other value fails with
%% function_clause; previously a negative count recursed forever, creating a
%% new atom each step until the atom table was exhausted.
%% Inputs come from trusted config, so list_to_atom/1 is acceptable here.
-spec get_worker_names(non_neg_integer(), atom()) -> [atom()].
get_worker_names(Count, Prefix) when is_integer(Count), Count >= 0, is_atom(Prefix) ->
    PrefixString = atom_to_list(Prefix),
    [list_to_atom(PrefixString ++ "_" ++ integer_to_list(N)) || N <- lists:seq(1, Count)].
apps/simple_application/src/worker_server.erl:
xxxxxxxxxx
-module(worker_server).
-behaviour(gen_server).

%% Worker Server ("B"): a registered gen_server that logs every synchronous
%% request it receives and replies ok.

%% API
-export([start_link/1]).

%% gen_server callbacks
-export([
    init/1,
    handle_call/3,
    handle_cast/2,
    handle_info/2
]).

-record(state, {worker_name :: atom()}).

%% Starts a worker registered locally as WorkerName.
-spec start_link(atom()) -> {ok, pid()} | ignore | {error, term()}.
start_link(WorkerName) ->
    gen_server:start_link({local, WorkerName}, ?MODULE, [WorkerName], []).

%% Remembers the worker's own name so it can be printed on each request.
init([WorkerName]) ->
    {ok, #state{worker_name = WorkerName}}.

%% Prints the request together with the caller's From and replies ok.
handle_call(Request, From, #state{worker_name = Name} = State) ->
    io:format("~p got [~p] from [~p]~n", [Name, Request, From]),
    {reply, ok, State}.

%% Casts are not used; ignore them.
handle_cast(_Message, State) ->
    {noreply, State}.

%% Plain messages are not used; ignore them.
handle_info(_Message, State) ->
    {noreply, State}.
apps/simple_application/src/worker_server_sup.erl:
xxxxxxxxxx
-module(worker_server_sup).
-behaviour(supervisor).

%% Supervisor for the Worker Servers: one permanent worker_server child per
%% name returned by worker_lib:get_worker_names/0.

-export([start_link/0]).
-export([init/1]).

-define(SERVER, ?MODULE).

%% Starts the supervisor registered locally under the module name.
-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
start_link() ->
    supervisor:start_link({local, ?SERVER}, ?MODULE, []).

%% one_for_one with at most 2 restarts per 10 s. Children are listed in name
%% order, so they start as Prefix_1 .. Prefix_N. The previous accumulator
%% reversed this, starting Prefix_N first.
init([]) ->
    SupFlags = #{strategy => one_for_one,
                 intensity => 2,
                 period => 10},
    ChildSpecs = [worker_child_spec(Name) || Name <- worker_lib:get_worker_names()],
    {ok, {SupFlags, ChildSpecs}}.

%% Child spec for one worker_server registered as WorkerName. modules must name
%% the callback module (previously the nonexistent cg3) so the release handler
%% can find this child during code upgrades.
worker_child_spec(WorkerName) ->
    #{id => WorkerName,
      start => {worker_server, start_link, [WorkerName]},
      restart => permanent,
      shutdown => brutal_kill,
      type => worker,
      modules => [worker_server]}.
apps/simple_application/src/simple_application_sup.erl:
xxxxxxxxxx
-module(simple_application_sup).
-behaviour(supervisor).

%% Root supervisor: supervises the Worker Server supervisor and the Proxy Server.

-export([start_link/0]).
-export([init/1]).

-define(SERVER, ?MODULE).

%% Starts the root supervisor registered locally under the module name.
-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
start_link() ->
    supervisor:start_link({local, ?SERVER}, ?MODULE, []).

init([]) ->
    %% strategy:
    %% - one_for_one: if a child stops, restart only that child
    %% - one_for_all: if a child stops, stop all other children, then restart all of them
    %% - rest_for_one: if a child stops, stop the children started after it, then restart the stopped ones
    %% - simple_one_for_one: simplified one_for_one where all children are dynamically
    %%   added instances of the same type
    %% intensity and period together bound the restart rate: e.g. 1 and 5 means
    %% at most 1 restart within 5 s. Beyond that, the supervisor terminates all
    %% children and then itself, which prevents endless restart loops.
    SupFlags = #{strategy => one_for_one,
                 intensity => 2,
                 period => 10},
    %% Child spec fields:
    %% - id: name the supervisor uses internally to identify the child spec
    %% - start: {M, F, A} used to start the child
    %% - restart: permanent = always restart; temporary = never restart;
    %%   transient = restart only on abnormal exit (any reason other than normal)
    %% - shutdown: brutal_kill = exit(Child, kill) unconditionally; an integer
    %%   timeout = send exit(Child, shutdown) and wait that long for the child to
    %%   exit before killing it; a child that is itself a supervisor should use
    %%   infinity so its subtree has enough time to terminate
    %% - type: supervisor or worker
    %% - modules: the callback module for supervisors and gen_server/gen_fsm
    %%   workers, or dynamic for gen_event workers; used by the release handler
    %%   during upgrades/downgrades
    %% worker_server_sup is listed first so the workers exist before the proxy
    %% (which forwards to them) starts.
    ChildSpecs = [
        #{id => worker_server_sup,
          start => {worker_server_sup, start_link, []},
          restart => permanent,
          shutdown => infinity,
          type => supervisor,
          modules => [worker_server_sup]},
        #{id => proxy_server,
          start => {proxy_server, start_link, []},
          restart => permanent,
          shutdown => brutal_kill,
          type => worker,
          modules => [proxy_server]}
    ],
    {ok, {SupFlags, ChildSpecs}}.
使用 prod profile 构建 release 并打包:
$ rebar3 as prod tar
以交互式控制台方式启动 release:
_build/prod/rel/simple_application/bin/simple_application console
在交互式 Erlang Shell 中:
x
1> supervisor:count_children(simple_application_sup).
[{specs,2},{active,2},{supervisors,1},{workers,1}]
2> supervisor:count_children(worker_server_sup).
[{specs,10},{active,10},{supervisors,0},{workers,10}]
3> proxy_server:send_message("This is a message.").
proxy_server received ["This is a message."], proxy it to [worker_server_6]
worker_server_6 got ["This is a message."] from [{<0.280.0>,
[alias|
#Ref<0.2217166263.2602369027.229245>]}]
{success,ok}
4> proxy_server:send_message("This is a message.").
proxy_server received ["This is a message."], proxy it to [worker_server_1]
worker_server_1 got ["This is a message."] from [{<0.280.0>,
[alias|
#Ref<0.2217166263.2602369027.229255>]}]
{success,ok}
5> exit(whereis(worker_server_1), shutdown).
true
%% 执行若干次 proxy_server:send_message("This is a message."),直到消息被代理到 worker_server_1
13> proxy_server:send_message("This is a message.").
proxy_server received ["This is a message."], proxy it to [worker_server_1]
worker_server_1 got ["This is a message."] from [{<0.280.0>,
[alias|
#Ref<0.2217166263.2602369027.229339>]}]
{success,ok}
14> exit(whereis(proxy_server), shutdown).
true
15> proxy_server:send_message("This is a message.").
proxy_server received ["This is a message."], proxy it to [worker_server_9]
worker_server_9 got ["This is a message."] from [{<0.280.0>,
[alias|
#Ref<0.2217166263.2602369027.229353>]}]
{success,ok}
16> exit(whereis(worker_server_sup), shutdown).
true
17> proxy_server:send_message("This is a message.").
proxy_server received ["This is a message."], proxy it to [worker_server_10]
worker_server_10 got ["This is a message."] from [{<0.280.0>,
[alias|
#Ref<0.2217166263.2602369027.229367>]}]
{success,ok}
%% 退出
18> halt().