概述
在erlang otp源码中,随处可见proc_lib的身影,可以发现,在otp中spawn一个进程,都不是通过erlang:spawn函数,而是通过proc_lib:spawn。那通过这俩库spawn出的进程有啥区别呢?我们要理解erlang otp其它组件的源码,必须要先去了解proc_lib做了什么事情。
官方文档对proc_lib的解释是这样的:
> Functions for asynchronous and synchronous start of processes adhering to the OTP design principles.
就是说proc_lib提供符合OTP设计原则的同步或异步进程启动函数。关于OTP设计原则,后续会单开一篇进行讲述,它大致描述了一系列的代码组织标准,包括进程、模块以及项目目录组织结构等等。通过proc_lib启动的进程,会容易符合这种设计原则的要求。
proc_lib开放的API很多,但基本可以分成spawn、start、hibernate、init_ack、init_p、format、initial_call、stop这几组,下面我们一组一组的来看。
spawn 组
spawn组的函数有spawn/1, spawn_link/1, spawn/2, spawn_link/2, spawn/3, spawn_link/3, spawn/4, spawn_link/4, spawn_opt/2, spawn_opt/3, spawn_opt/4, spawn_opt/5
先看最为简单的spawn/1:
-spec spawn(Fun) -> pid() when Fun :: function(). spawn(F) when is_function(F) -> Parent = get_my_name(), Ancestors = get_ancestors(), erlang:spawn(?MODULE, init_p, [Parent,Ancestors,F]).
逻辑如下:
- 通过get_my_name函数获取当前进程的注册名。
- 通过get_ancestors()函数获取进程祖先列表
- 通过原生的erlang:spawn/3创建一个新的进程,以init_p函数作为新的进程逻辑,当前进程名、祖先进程列表和要执行的目标函数作为init_p的执行参数。
get_my_name实现的细节:
get_my_name() -> case proc_info(self(),registered_name) of {registered_name,Name} -> Name; _ -> self() end. ... proc_info(Pid,Item) when node(Pid) =:= node() -> process_info(Pid,Item); proc_info(Pid,Item) -> case lists:member(node(Pid),nodes()) of true -> check(rpc:call(node(Pid), erlang, process_info, [Pid, Item])); _ -> hidden end.
get_my_name()根据进程是本地还是远程,从process_info返回进程注册名称。 process_info是个很有用的函数,process_info/1可以返回指定进程的全部信息:
1> Pid = spawn(fun() -> receive hehe -> hehe end end). <0.35.0> 2> process_info(Pid). [{current_function,{prim_eval,'receive',2}}, {initial_call,{erlang,apply,2}}, {status,waiting}, {message_queue_len,0}, {messages,[]}, {links,[]}, {dictionary,[]}, {trap_exit,false}, {error_handler,error_handler}, {priority,normal}, {group_leader,<0.26.0>}, {total_heap_size,233}, {heap_size,233}, {stack_size,9}, {reductions,17}, {garbage_collection,[{min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,0}]}, {suspending,[]}] 3> Pid ! hehe. hehe 4> process_info(Pid). undefined
而通过process_info/2可以获得某个段的信息。
get_ancestors的实现细节:
get_ancestors() -> case get('$ancestors') of A when is_list(A) -> A; _ -> [] end.
本地版的很简单,直接读进程字典并判断类型是否正确即可。另外还有一个远程版本:
get_ancestors(Pid) -> case get_dictionary(Pid,'$ancestors') of {'$ancestors',Ancestors} -> {ancestors,Ancestors}; _ -> {ancestors,[]} end. ... get_dictionary(Pid,Tag) -> case get_process_info(Pid,dictionary) of {dictionary,Dict} -> case lists:keysearch(Tag,1,Dict) of {value,Value} -> Value; _ -> undefined end; _ -> undefined end. ... get_process_info(Pid, Tag) -> translate_process_info(Tag, catch proc_info(Pid, Tag)). translate_process_info(registered_name, []) -> {registered_name, []}; translate_process_info(_ , {'EXIT', _}) -> undefined; translate_process_info(_, Result) -> Result.
这一长串其实就做了一件事情,从进程字典中读出'$ancestors'这个属性,但因为涉及到远程进程的访问,无法直接使用get,所以需要通过proc_info这个函数(process_info是可以返回进程字典内容的),另外translate_process_info对proc_info返回的结果作了包装,包括异常发生的情况。
最后,我们重点来看init_p这个函数,init_p里面所包含的逻辑才是proc_lib真正的对外出售内容 —— 符合OTP设计原则的进程。init_p的实现:
init_p(Parent, Ancestors, Fun) when is_function(Fun) -> put('$ancestors', [Parent|Ancestors]), Mfa = erlang:fun_info_mfa(Fun), put('$initial_call', Mfa), try Fun() catch Class:Reason -> exit_p(Class, Reason, erlang:get_stacktrace()) end.
逻辑如下:
- 将Parent进程合并入Ancestors列表并加入到进程字典中。
- 获取目标函数的MFA信息(MFA即Module、Function、Args,我们在erlang中会到处看到这个缩写)
- 将MFA信息也保存到进程字典中。
- 在try catch中运行目标函数。
我们看到init_p为进程增加了更多元信息以及提供了一个错误处理框架,目标函数所发生的异常都会由exit_p来处理。exit_p的实现:
exit_p(Class, Reason, Stacktrace) -> case get('$initial_call') of {M,F,A} when is_atom(M), is_atom(F), is_integer(A) -> MFA = {M,F,make_dummy_args(A, [])}, crash_report(Class, Reason, MFA, Stacktrace), erlang:raise(exit, exit_reason(Class, Reason, Stacktrace), Stacktrace); _ -> %% The process dictionary has been cleared or %% possibly modified. crash_report(Class, Reason, [], Stacktrace), erlang:raise(exit, exit_reason(Class, Reason, Stacktrace), Stacktrace) end. exit_reason(error, Reason, Stacktrace) -> {Reason, Stacktrace}; exit_reason(exit, Reason, _Stacktrace) -> Reason; exit_reason(throw, Reason, Stacktrace) -> {{nocatch, Reason}, Stacktrace}.
exit_p做了两件事情,一是调用crash_report生成错误报告,二是通过exit_reason函数重新对异常原因进行标准化包装,然后再次抛出。
crash_report(exit, normal, _, _) -> ok; crash_report(exit, shutdown, _, _) -> ok; crash_report(exit, {shutdown,_}, _, _) -> ok; crash_report(Class, Reason, StartF, Stacktrace) -> OwnReport = my_info(Class, Reason, StartF, Stacktrace), LinkReport = linked_info(self()), Rep = [OwnReport,LinkReport], error_logger:error_report(crash_report, Rep).
以上分析就是proc_lib:spawn/1所做的主要工作了,spawn/2也是大同小异,只不过增加了Node参数:
spawn(Node, F) when is_function(F) -> Parent = get_my_name(), Ancestors = get_ancestors(), erlang:spawn(Node, ?MODULE, init_p, [Parent,Ancestors,F]).
spawn/3和spawn/4所调用的init_p有些差别,目标函数是通过apply调用的:
spawn(M,F,A) when is_atom(M), is_atom(F), is_list(A) -> Parent = get_my_name(), Ancestors = get_ancestors(), erlang:spawn(?MODULE, init_p, [Parent,Ancestors,M,F,A]). spawn(Node, M, F, A) when is_atom(M), is_atom(F), is_list(A) -> Parent = get_my_name(), Ancestors = get_ancestors(), erlang:spawn(Node, ?MODULE, init_p, [Parent,Ancestors,M,F,A]). ... init_p(Parent, Ancestors, M, F, A) when is_atom(M), is_atom(F), is_list(A) -> put('$ancestors', [Parent|Ancestors]), put('$initial_call', trans_init(M, F, A)), init_p_do_apply(M, F, A). init_p_do_apply(M, F, A) -> try apply(M, F, A) catch Class:Reason -> exit_p(Class, Reason, erlang:get_stacktrace()) end.
另外spawn_link做的事情也一样,只不过是通过erlang:spawn_link函数来创建进程的,在当前进程和新创建的进程之间建立了一个link关系:
spawn_link(F) when is_function(F) -> Parent = get_my_name(), Ancestors = get_ancestors(), erlang:spawn_link(?MODULE, init_p, [Parent,Ancestors,F]).
关于spawn_opt,逻辑也一样,重点在于可以传递一些创建进程的控制参数,这里并不准备去研究这些参数,后续会专门拿出一篇来讲述。
start组
前面说过,proc_lib包含的是同步和异步的进程启动API,spawn组的函数无疑都是异步的,而start组提供的都是同步的。相对于spawn,start组提供的函数要少一些:start/3, start/4, start/5, start_link/3, start_link/4, start_link/5
先看具有代表性的start/4的实现:
start(M, F, A, Timeout) when is_atom(M), is_atom(F), is_list(A) -> PidRef = spawn_mon(M, F, A), sync_wait_mon(PidRef, Timeout). ... spawn_mon(M,F,A) -> Parent = get_my_name(), Ancestors = get_ancestors(), erlang:spawn_monitor(?MODULE, init_p, [Parent,Ancestors,M,F,A]). ... sync_wait_mon({Pid, Ref}, Timeout) -> receive {ack, Pid, Return} -> erlang:demonitor(Ref, [flush]), Return; {'DOWN', Ref, _Type, Pid, Reason} -> {error, Reason}; {'EXIT', Pid, Reason} -> %% link as spawn_opt? erlang:demonitor(Ref, [flush]), {error, Reason} after Timeout -> erlang:demonitor(Ref, [flush]), exit(Pid, kill), flush(Pid), {error, timeout} end.
可以看到,start/4的工作分为两个过程,首先是基于init_p创建新进程,并于当前进程创建monitor的关系;接下来会同步等待新进程反馈的信息,分ack、'DOWN'、'EXIT'、超时四种情况,并做了不同的处理。需要注意的是,erlang:demonitor函数可以取消进程的监控关系,如果超时,会强制杀掉目标进程,超时有一个细节就是flush函数:
flush(Pid) -> receive {'EXIT', Pid, _} -> true after 0 -> true end.
这个函数有什么用途?因为当发生超时后,在我们显式调用demonitor函数结束之前,函数可能已经向监控进程发出了exit消息,这条消息就会积攒在当前进程的邮箱里得不到消费,flush可以清空邮箱中的这些消息,指定了after 0的receive语句会率先将邮箱里所有的消息进行匹配后立即返回而不会阻塞。让我们再复习一下receive ... after的执行规则:
- 如果包含after,进入receive语句时会先启动一个定时器。
- 取出邮箱里面的第一个消息,并尝试同Pattern1、Pattern2等模式匹配,如果匹配成功,系统会从邮箱删除这个消息,并执行模式后面的表达式。
- 如果receive里的所有模式都不匹配邮箱的第一个消息,系统会从邮箱中移除这个消息并把它放入一个保存队列,然后继续尝试邮箱里的第二个消息,这一过程会不断重复,直到发现匹配消息或者邮箱里的所有消息都检查过了为止。
- 如果邮箱里的所有消息都不匹配,进程就会被挂起并重新调度,直到新的消息进入邮箱才会继续执行。新消息到达后,保存队列里的所有消息不会重新匹配,只有新消息才会进行匹配。
- 一旦某个消息匹配成功,保存队列里的所有消息就会按照到达进程的顺序重新进入邮箱,如果设置了定时器,就会清除它。
- 如果定时器在我们等待消息时到期了,系统就会执行after后的表达式,并把所有保存的消息按照它们的到达进程的顺序重新放回邮箱。
其它start的实现也是这两个步骤,只不过参数重载有差异。再来看start_link的实现,这里选取的是start_link/4:
start_link(M, F, A, Timeout) when is_atom(M), is_atom(F), is_list(A) -> Pid = ?MODULE:spawn_link(M, F, A), sync_wait(Pid, Timeout). ... sync_wait(Pid, Timeout) -> receive {ack, Pid, Return} -> Return; {'EXIT', Pid, Reason} -> {error, Reason} after Timeout -> unlink(Pid), exit(Pid, kill), flush(Pid), {error, timeout} end.
也是分为两个步骤,重点是sync_link的实现,相对比于monitor,没有了'DOWN'的情况。