Erlang源码阅读笔记之proc_lib 上篇


声明:本文转载自https://my.oschina.net/u/3745448/blog/1588552,转载目的在于传递更多信息,仅供学习交流之用。如有侵权行为,请联系我,我会及时删除。

概述

在erlang otp源码中,随处可见proc_lib的身影,可以发现,在otp中spawn一个进程,都不是通过erlang:spawn函数,而是通过proc_lib:spawn。那通过这俩库spawn出的进程有啥区别呢?我们要理解erlang otp其它组件的源码,必须要先去了解proc_lib做了什么事情。

官方文档对proc_lib的解释是这样的:

> Functions for asynchronous and synchronous start of processes adhering to the OTP design principles.

就是说proc_lib提供符合OTP设计原则的同步或异步进程启动函数。关于OTP设计原则,后续会单开一篇进行讲述,它大致描述了一系列的代码组织标准,包括进程、模块以及项目目录组织结构等等。通过proc_lib启动的进程,会容易符合这种设计原则的要求。

proc_lib开放的API很多,但基本可以分成spawn、start、hibernate、init_ack、init_p、format、initial_call、stop这几组,下面我们一组一组的来看。

spawn 组

spawn组的函数有spawn/1, spawn_link/1, spawn/2, spawn_link/2, spawn/3, spawn_link/3, spawn/4, spawn_link/4, spawn_opt/2, spawn_opt/3, spawn_opt/4, spawn_opt/5

先看最为简单的spawn/1:

-spec spawn(Fun) -> pid() when       Fun :: function().  spawn(F) when is_function(F) ->     Parent = get_my_name(),     Ancestors = get_ancestors(),     erlang:spawn(?MODULE, init_p, [Parent,Ancestors,F]). 

逻辑如下:

  1. 通过get_my_name函数获取当前进程的注册名。
  2. 通过get_ancestors()函数获取进程祖先列表
  3. 通过原生的erlang:spawn/3创建一个新的进程,以init_p函数作为新的进程逻辑,当前进程名、祖先进程列表和要执行的目标函数作为init_p的执行参数。

get_my_name实现的细节:

get_my_name() ->     case proc_info(self(),registered_name) of 	{registered_name,Name} -> Name; 	_                      -> self()     end.  ...  proc_info(Pid,Item) when node(Pid) =:= node() ->     process_info(Pid,Item); proc_info(Pid,Item) ->     case lists:member(node(Pid),nodes()) of 	true -> 	    check(rpc:call(node(Pid), erlang, process_info, [Pid, Item])); 	_ -> 	    hidden     end. 

get_my_name()根据进程是本地还是远程,从process_info返回进程注册名称。 process_info是个很有用的函数,process_info/1可以返回指定进程的全部信息:

1> Pid = spawn(fun() -> receive hehe -> hehe end end). <0.35.0> 2> process_info(Pid). [{current_function,{prim_eval,'receive',2}},  {initial_call,{erlang,apply,2}},  {status,waiting},  {message_queue_len,0},  {messages,[]},  {links,[]},  {dictionary,[]},  {trap_exit,false},  {error_handler,error_handler},  {priority,normal},  {group_leader,<0.26.0>},  {total_heap_size,233},  {heap_size,233},  {stack_size,9},  {reductions,17},  {garbage_collection,[{min_bin_vheap_size,46422},                       {min_heap_size,233},                       {fullsweep_after,65535},                       {minor_gcs,0}]},  {suspending,[]}] 3> Pid ! hehe. hehe 4> process_info(Pid). undefined 

而通过process_info/2可以获得某个段的信息。

get_ancestors的实现细节:

get_ancestors() ->     case get('$ancestors') of 	A when is_list(A) -> A; 	_                 -> []     end. 

本地版的很简单,直接读进程字典并判断类型是否正确即可。另外还有一个远程版本:

get_ancestors(Pid) ->     case get_dictionary(Pid,'$ancestors') of 	{'$ancestors',Ancestors} -> 	    {ancestors,Ancestors}; 	_ -> 	    {ancestors,[]}     end.  ...  get_dictionary(Pid,Tag) ->     case get_process_info(Pid,dictionary) of 	{dictionary,Dict} -> 	    case lists:keysearch(Tag,1,Dict) of 		{value,Value} -> Value; 		_             -> undefined 	    end; 	_ -> 	    undefined     end.  ...  get_process_info(Pid, Tag) ->  translate_process_info(Tag, catch proc_info(Pid, Tag)).  translate_process_info(registered_name, []) ->   {registered_name, []}; translate_process_info(_ , {'EXIT', _}) ->   undefined; translate_process_info(_, Result) ->   Result. 

这一长串其实就做了一件事情,从进程字典中读出'$ancestors'这个属性,但因为涉及到远程进程的访问,无法直接使用get,所以需要通过proc_info这个函数(process_info是可以返回进程字典内容的),另外translate_process_info对proc_info返回的结果作了包装,包括异常发生的情况。

最后,我们重点来看init_p这个函数,init_p里面所包含的逻辑才是proc_lib真正的对外出售内容 —— 符合OTP设计原则的进程。init_p的实现:

init_p(Parent, Ancestors, Fun) when is_function(Fun) ->     put('$ancestors', [Parent|Ancestors]),     Mfa = erlang:fun_info_mfa(Fun),     put('$initial_call', Mfa),     try 	Fun()     catch 	Class:Reason -> 	    exit_p(Class, Reason, erlang:get_stacktrace())     end. 

逻辑如下:

  1. 将Parent进程合并入Ancestors列表并加入到进程字典中。
  2. 获取目标函数的MFA信息(MFA即Module、Function、Args,我们在erlang中会到处看到这个缩写)
  3. 将MFA信息也保存到进程字典中。
  4. 在try catch中运行目标函数。

我们看到init_p为进程增加了更多元信息以及提供了一个错误处理框架,目标函数所发生的异常都会由exit_p来处理。exit_p的实现:

exit_p(Class, Reason, Stacktrace) ->     case get('$initial_call') of 	{M,F,A} when is_atom(M), is_atom(F), is_integer(A) -> 	    MFA = {M,F,make_dummy_args(A, [])}, 	    crash_report(Class, Reason, MFA, Stacktrace), 	    erlang:raise(exit, exit_reason(Class, Reason, Stacktrace), Stacktrace); 	_ -> 	    %% The process dictionary has been cleared or 	    %% possibly modified. 	    crash_report(Class, Reason, [], Stacktrace), 	    erlang:raise(exit, exit_reason(Class, Reason, Stacktrace), Stacktrace)     end.  exit_reason(error, Reason, Stacktrace) ->     {Reason, Stacktrace}; exit_reason(exit, Reason, _Stacktrace) ->     Reason; exit_reason(throw, Reason, Stacktrace) ->     {{nocatch, Reason}, Stacktrace}. 

exit_p做了两件事情,一是调用crash_report生成错误报告,二是通过exit_reason函数重新对异常原因进行标准化包装,然后再次抛出。

crash_report(exit, normal, _, _)       -> ok; crash_report(exit, shutdown, _, _)     -> ok; crash_report(exit, {shutdown,_}, _, _) -> ok; crash_report(Class, Reason, StartF, Stacktrace) ->     OwnReport = my_info(Class, Reason, StartF, Stacktrace),     LinkReport = linked_info(self()),     Rep = [OwnReport,LinkReport],     error_logger:error_report(crash_report, Rep). 

以上分析就是proc_lib:spawn/1所做的主要工作了,spawn/2也是大同小异,只不过增加了Node参数:

spawn(Node, F) when is_function(F) ->     Parent = get_my_name(),     Ancestors = get_ancestors(),     erlang:spawn(Node, ?MODULE, init_p, [Parent,Ancestors,F]). 

spawn/3和spawn/4所调用的init_p有些差别,目标函数是通过apply调用的:

spawn(M,F,A) when is_atom(M), is_atom(F), is_list(A) ->     Parent = get_my_name(),     Ancestors = get_ancestors(),     erlang:spawn(?MODULE, init_p, [Parent,Ancestors,M,F,A]).  spawn(Node, M, F, A) when is_atom(M), is_atom(F), is_list(A) ->     Parent = get_my_name(),     Ancestors = get_ancestors(),     erlang:spawn(Node, ?MODULE, init_p, [Parent,Ancestors,M,F,A]).  ...  init_p(Parent, Ancestors, M, F, A) when is_atom(M), is_atom(F), is_list(A) ->     put('$ancestors', [Parent|Ancestors]),     put('$initial_call', trans_init(M, F, A)),     init_p_do_apply(M, F, A).  init_p_do_apply(M, F, A) ->     try 	apply(M, F, A)      catch 	Class:Reason -> 	    exit_p(Class, Reason, erlang:get_stacktrace())     end. 

另外spawn_link做的事情也一样,只不过是通过erlang:spawn_link函数来创建进程的,在当前进程和新创建的进程之间建立了一个link关系:

spawn_link(F) when is_function(F) ->     Parent = get_my_name(),     Ancestors = get_ancestors(),     erlang:spawn_link(?MODULE, init_p, [Parent,Ancestors,F]). 

关于spawn_opt,逻辑也一样,重点在于可以传递一些创建进程的控制参数,这里并不准备去研究这些参数,后续会专门拿出一篇来讲述。

start组

前面说过,proc_lib包含的是同步和异步的进程启动API,spawn组的函数无疑都是异步的,而start组提供的都是同步的。相对于spawn,start组提供的函数要少一些:start/3, start/4, start/5, start_link/3, start_link/4, start_link/5

先看具有代表性的start/4的实现:

start(M, F, A, Timeout) when is_atom(M), is_atom(F), is_list(A) ->     PidRef = spawn_mon(M, F, A),     sync_wait_mon(PidRef, Timeout).  ...  spawn_mon(M,F,A) ->     Parent = get_my_name(),     Ancestors = get_ancestors(),     erlang:spawn_monitor(?MODULE, init_p, [Parent,Ancestors,M,F,A]).  ...  sync_wait_mon({Pid, Ref}, Timeout) ->     receive 	{ack, Pid, Return} -> 	    erlang:demonitor(Ref, [flush]), 	    Return; 	{'DOWN', Ref, _Type, Pid, Reason} -> 	    {error, Reason}; 	{'EXIT', Pid, Reason} -> %% link as spawn_opt? 	    erlang:demonitor(Ref, [flush]), 	    {error, Reason}     after Timeout -> 	    erlang:demonitor(Ref, [flush]), 	    exit(Pid, kill), 	    flush(Pid), 	    {error, timeout}     end. 

可以看到,start/4的工作分为两个过程,首先是基于init_p创建新进程,并于当前进程创建monitor的关系;接下来会同步等待新进程反馈的信息,分ack、'DOWN'、'EXIT'、超时四种情况,并做了不同的处理。需要注意的是,erlang:demonitor函数可以取消进程的监控关系,如果超时,会强制杀掉目标进程,超时有一个细节就是flush函数:

flush(Pid) ->     receive 	{'EXIT', Pid, _} -> 	    true     after 0 -> 	    true     end. 

这个函数有什么用途?因为当发生超时后,在我们显式调用demonitor函数结束之前,函数可能已经向监控进程发出了exit消息,这条消息就会积攒在当前进程的邮箱里得不到消费,flush可以清空邮箱中的这些消息,指定了after 0的receive语句会率先将邮箱里所有的消息进行匹配后立即返回而不会阻塞。让我们再复习一下receive ... after的执行规则:

  1. 如果包含after,进入receive语句时会先启动一个定时器。
  2. 取出邮箱里面的第一个消息,并尝试同Pattern1、Pattern2等模式匹配,如果匹配成功,系统会从邮箱删除这个消息,并执行模式后面的表达式。
  3. 如果receive里的所有模式都不匹配邮箱的第一个消息,系统会从邮箱中移除这个消息并把它放入一个保存队列,然后继续尝试邮箱里的第二个消息,这一过程会不断重复,直到发现匹配消息或者邮箱里的所有消息都检查过了为止。
  4. 如果邮箱里的所有消息都不匹配,进程就会被挂起并重新调度,直到新的消息进入邮箱才会继续执行。新消息到达后,保存队列里的所有消息不会重新匹配,只有新消息才会进行匹配。
  5. 一旦某个消息匹配成功,保存队列里的所有消息就会按照到达进程的顺序重新进入邮箱,如果设置了定时器,就会清除它。
  6. 如果定时器在我们等待消息时到期了,系统就会执行after后的表达式,并把所有保存的消息按照它们的到达进程的顺序重新放回邮箱。

其它start的实现也是这两个步骤,只不过参数重载有差异。再来看start_link的实现,这里选取的是start_link/4:

start_link(M, F, A, Timeout) when is_atom(M), is_atom(F), is_list(A) ->     Pid = ?MODULE:spawn_link(M, F, A),     sync_wait(Pid, Timeout).  ...  sync_wait(Pid, Timeout) ->     receive 	{ack, Pid, Return} -> 	    Return; 	{'EXIT', Pid, Reason} -> 	    {error, Reason}     after Timeout -> 	    unlink(Pid), 	    exit(Pid, kill), 	    flush(Pid), 	    {error, timeout}     end. 

也是分为两个步骤,重点是sync_link的实现,相对比于monitor,没有了'DOWN'的情况。

本文发表于2017年12月12日 18:33
(c)注:本文转载自https://my.oschina.net/u/3745448/blog/1588552,转载目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责。如有侵权行为,请联系我们,我们会及时删除.

阅读 2283 讨论 0 喜欢 0

抢先体验

扫码体验
趣味小程序
文字表情生成器

闪念胶囊

你要过得好哇,这样我才能恨你啊,你要是过得不好,我都不知道该恨你还是拥抱你啊。

直抵黄龙府,与诸君痛饮尔。

那时陪伴我的人啊,你们如今在何方。

不出意外的话,我们再也不会见了,祝你前程似锦。

这世界真好,吃野东西也要留出这条命来看看

快捷链接
网站地图
提交友链
Copyright © 2016 - 2021 Cion.
All Rights Reserved.
京ICP备2021004668号-1