标签:
// bif.c 实现 erlang:monitor/2 BIF_RETTYPE monitor_2(BIF_ALIST_2) { Eterm target = BIF_ARG_2; BIF_RETTYPE ret; DistEntry *dep = NULL; int deref_de = 0; /* 目前只支持 erlang:monitor(process, Target) */ if (BIF_ARG_1 != am_process) { goto error; } if (is_internal_pid(target)) { // 如果是本节点进程 local_pid: ret = local_pid_monitor(BIF_P, target); // 处理本节点进程 } else if (is_external_pid(target)) { // 如果是其他节点进程 dep = external_pid_dist_entry(target); if (dep == erts_this_dist_entry) // 如果进程归属于本节点,跳到本节点进程处理 goto local_pid; ret = remote_monitor(BIF_P, BIF_ARG_1, BIF_ARG_2, dep, target, 0); // 处理其他节点进程 } else if (is_atom(target)) { // Target是atom处理 ret = local_name_monitor(BIF_P, target); } else if (is_tuple(target)) { // Target是tuple处理 Eterm *tp = tuple_val(target); Eterm remote_node; Eterm name; if (arityval(*tp) != 2) goto error; remote_node = tp[2]; name = tp[1]; if (!is_atom(remote_node) || !is_atom(name)) { goto error; } if (!erts_is_alive && remote_node != am_Noname) { goto error; /* Remote monitor from (this) undistributed node */ } dep = erts_sysname_to_connected_dist_entry(remote_node); if (dep == erts_this_dist_entry) { deref_de = 1; ret = local_name_monitor(BIF_P, name); } else { if (dep) deref_de = 1; ret = remote_monitor(BIF_P, BIF_ARG_1, BIF_ARG_2, dep, name, 1); } } else { error: ERTS_BIF_PREP_ERROR(ret, BIF_P, BADARG); } if (deref_de) { deref_de = 0; erts_deref_dist_entry(dep); } return ret; }现在,看下本节点进程的监控处理:
// bif.c 实现本地节点进程监控处理 static BIF_RETTYPE local_pid_monitor(Process *p, Eterm target) { BIF_RETTYPE ret; Eterm mon_ref; Process *rp; ErtsProcLocks p_locks = ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_LINK; mon_ref = erts_make_ref(p); ERTS_BIF_PREP_RET(ret, mon_ref); if (target == p->common.id) { // 如果进程监控自己 return ret; } erts_smp_proc_lock(p, ERTS_PROC_LOCK_LINK); // 锁住进程link操作,避免进程监控数据被脏写 rp = erts_pid2proc_opt(p, p_locks, target, ERTS_PROC_LOCK_LINK, // 同样是link锁 ERTS_P2P_FLG_ALLOW_OTHER_X); if (!rp) { erts_smp_proc_unlock(p, ERTS_PROC_LOCK_LINK); p_locks &= ~ERTS_PROC_LOCK_LINK; erts_queue_monitor_message(p, &p_locks, mon_ref, am_process, target, am_noproc); } else { ASSERT(rp != p); // 当前进程添加监控数据 erts_add_monitor(&ERTS_P_MONITORS(p), MON_ORIGIN, mon_ref, target, NIL); // 目标进程添加被监控数据 erts_add_monitor(&ERTS_P_MONITORS(rp), MON_TARGET, mon_ref, p->common.id, NIL); erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK); } erts_smp_proc_unlock(p, p_locks & ~ERTS_PROC_LOCK_MAIN); return ret; }实际上,这里只是修改进程的监控数据,监控者和被监控者两份数据。
// erl_monitors.c 实现进程增加监控信息 void erts_add_monitor(ErtsMonitor **root, Uint type, Eterm ref, Eterm pid, Eterm name) { void *tstack[STACK_NEED]; int tpos = 0; int dstack[STACK_NEED+1]; int dpos = 1; int state = 0; ErtsMonitor **this = root; Sint c; dstack[0] = DIR_END; for (;;) { if (!*this) { /* Found our place */ state = 1; *this = create_monitor(type,ref,pid,name); break; } else if ((c = CMP_MON_REF(ref,(*this)->ref)) < 0) { /* go left */ dstack[dpos++] = DIR_LEFT; tstack[tpos++] = this; this = &((*this)->left); } else if (c > 0) { /* go right */ dstack[dpos++] = DIR_RIGHT; tstack[tpos++] = this; this = &((*this)->right); } else { /* Equal key is an error for monitors */ erl_exit(1,"Insertion of already present monitor!"); break; } } insertion_rotation(dstack, dpos, tstack, tpos, state); }再看下这个宏,取的就是进程结构的监控数据。就是说,每个进程都有一份监控数据,记录了监控和被监控信息,保存为AVL树结构。
/* Monitor-tree root of process P; expands to an lvalue, so callers may take its address. */
#define ERTS_P_MONITORS(P) ((P)->common.u.alive.monitors)
// erl_monitor.c 触发所有monitor(遍历 monitor 数据,执行 doit 函数回调) void erts_sweep_monitors(ErtsMonitor *root, void (*doit)(ErtsMonitor *, void *), void *context) { ErtsMonitor *tstack[STACK_NEED]; int tpos = 0; int dstack[STACK_NEED+1]; int dpos = 1; int dir; dstack[0] = DIR_END; for (;;) { if (root == NULL) { if ((dir = dstack[dpos-1]) == DIR_END) { return; } if (dir == DIR_LEFT) { /* Still has DIR_RIGHT to do */ dstack[dpos-1] = DIR_RIGHT; root = (tstack[tpos-1])->right; } else { /* stacktop is an object to be deleted */ (*doit)(tstack[--tpos],context); // 执行回调 --dpos; root = NULL; } } else { dstack[dpos++] = DIR_LEFT; tstack[tpos++] = root; root = root->left; } } }
// erl_process.c 进程关闭处理(有删节) void erts_continue_exit_process(Process *p) { //... mon = ERTS_P_MONITORS(p); lnk = ERTS_P_LINKS(p); //... if (lnk) { // link的处理 DeclareTmpHeap(tmp_heap,4,p); Eterm exit_tuple; Uint exit_tuple_sz; Eterm* hp; UseTmpHeap(4,p); hp = &tmp_heap[0]; exit_tuple = TUPLE3(hp, am_EXIT, p->common.id, reason); exit_tuple_sz = size_object(exit_tuple); { ExitLinkContext context = {p, reason, exit_tuple, exit_tuple_sz}; erts_sweep_links(lnk, &doit_exit_link, &context); } UnUseTmpHeap(4,p); } { // monitor的处理 ExitMonitorContext context = {reason, p}; erts_sweep_monitors(mon,&doit_exit_monitor,&context); /* Allocates TmpHeap, but we have none here */ } //... }
/*
 * erl_process.c: per-monitor callback run while a process exits.
 *
 * mon       - one monitor record taken from the exiting process
 * vpcontext - ExitMonitorContext carrying the exiting process and exit reason
 *
 * MON_ORIGIN records (the exiting process was monitoring something) are
 * cleaned up by removing the matching record on the other side.
 * MON_TARGET records (something was monitoring the exiting process) cause the
 * watcher to be notified of the exit.
 * In every case `mon` itself is destroyed before returning.
 */
static void doit_exit_monitor(ErtsMonitor *mon, void *vpcontext)
{
    ExitMonitorContext *pcontext = vpcontext;
    DistEntry *dep;
    ErtsMonitor *rmon;
    Process *rp;

    if (mon->type == MON_ORIGIN) { /* this process monitored others: remove their "monitored by" records */
        /* We are monitoring someone else, we need to demonitor that one.. */
        if (is_atom(mon->pid)) { /* remote by name */
            ASSERT(is_node_name_atom(mon->pid));
            dep = erts_sysname_to_connected_dist_entry(mon->pid);
            if (dep) { /* the monitored process lives on a connected remote node */
                erts_smp_de_links_lock(dep);
                /* First remove the monitor record kept in the DistEntry */
                rmon = erts_remove_monitor(&(dep->monitors), mon->ref);
                erts_smp_de_links_unlock(dep);
                if (rmon) { /* then tell the remote node to drop its record */
                    ErtsDSigData dsd;
                    int code = erts_dsig_prepare(&dsd, dep, NULL,
                                                 ERTS_DSP_NO_LOCK, 0);
                    if (code == ERTS_DSIG_PREP_CONNECTED) {
                        code = erts_dsig_send_demonitor(&dsd,
                                                        rmon->pid,
                                                        mon->name,
                                                        mon->ref,
                                                        1);
                        ASSERT(code == ERTS_DSIG_SEND_OK);
                    }
                    erts_destroy_monitor(rmon);
                }
                erts_deref_dist_entry(dep);
            }
        } else {
            ASSERT(is_pid(mon->pid));
            if (is_internal_pid(mon->pid)) { /* monitored process is on this node */
                rp = erts_pid2proc(NULL, 0, mon->pid, ERTS_PROC_LOCK_LINK);
                if (!rp) {
                    goto done;
                }
                /* Remove the matching record from the monitored process */
                rmon = erts_remove_monitor(&ERTS_P_MONITORS(rp), mon->ref);
                erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
                if (rmon == NULL) {
                    goto done;
                }
                erts_destroy_monitor(rmon);
            } else { /* remote by pid */
                ASSERT(is_external_pid(mon->pid));
                dep = external_pid_dist_entry(mon->pid);
                ASSERT(dep != NULL);
                if (dep) {
                    erts_smp_de_links_lock(dep);
                    /* First remove the monitor record kept in the DistEntry */
                    rmon = erts_remove_monitor(&(dep->monitors), mon->ref);
                    erts_smp_de_links_unlock(dep);
                    if (rmon) { /* then tell the remote node to drop its record */
                        ErtsDSigData dsd;
                        int code = erts_dsig_prepare(&dsd, dep, NULL,
                                                     ERTS_DSP_NO_LOCK, 0);
                        if (code == ERTS_DSIG_PREP_CONNECTED) {
                            code = erts_dsig_send_demonitor(&dsd,
                                                            rmon->pid,
                                                            mon->pid,
                                                            mon->ref,
                                                            1);
                            ASSERT(code == ERTS_DSIG_SEND_OK);
                        }
                        erts_destroy_monitor(rmon);
                    }
                }
            }
        }
    } else { /* this process was being monitored: notify the watcher */
        ASSERT(mon->type == MON_TARGET);
        ASSERT(is_pid(mon->pid) || is_internal_port(mon->pid));
        if (is_internal_port(mon->pid)) { /* the watcher is a port on this node */
            Port *prt = erts_id2port(mon->pid);
            if (prt == NULL) {
                goto done;
            }
            erts_fire_port_monitor(prt, mon->ref);
            erts_port_release(prt);
        } else if (is_internal_pid(mon->pid)) { /* the watcher is a process on this node */
            Eterm watched;
            DeclareTmpHeapNoproc(lhp,3);
            ErtsProcLocks rp_locks = (ERTS_PROC_LOCK_LINK
                                      | ERTS_PROC_LOCKS_MSG_SEND);
            rp = erts_pid2proc(NULL, 0, mon->pid, rp_locks);
            if (rp == NULL) {
                goto done;
            }
            UseTmpHeapNoproc(3);
            /* First remove the watcher's own record for this monitor */
            rmon = erts_remove_monitor(&ERTS_P_MONITORS(rp), mon->ref);
            if (rmon) {
                erts_destroy_monitor(rmon);
                /* Monitored by name: report {Name, ThisNode}; otherwise report the pid */
                watched = (is_atom(mon->name)
                           ? TUPLE2(lhp, mon->name,
                                    erts_this_dist_entry->sysname)
                           : pcontext->p->common.id);
                /*
                 * Then queue the exit notification to the watcher; per the
                 * original note it arrives as {'DOWN',Ref,process,Pid,Reason}.
                 */
                erts_queue_monitor_message(rp, &rp_locks, mon->ref, am_process,
                                           watched, pcontext->reason);
            }
            UnUseTmpHeapNoproc(3);
            /* else: demonitor while we exited, i.e. do nothing... */
            erts_smp_proc_unlock(rp, rp_locks);
        } else { /* the watcher is a process on a remote node */
            ASSERT(is_external_pid(mon->pid));
            dep = external_pid_dist_entry(mon->pid);
            ASSERT(dep != NULL);
            if (dep) {
                erts_smp_de_links_lock(dep);
                /* First remove the monitor record kept in the DistEntry */
                rmon = erts_remove_monitor(&(dep->monitors), mon->ref);
                erts_smp_de_links_unlock(dep);
                if (rmon) { /* then send the exit notification to the remote node */
                    ErtsDSigData dsd;
                    int code = erts_dsig_prepare(&dsd, dep, NULL,
                                                 ERTS_DSP_NO_LOCK, 0);
                    if (code == ERTS_DSIG_PREP_CONNECTED) {
                        code = erts_dsig_send_m_exit(&dsd,
                                                     mon->pid,
                                                     (rmon->name != NIL
                                                      ? rmon->name
                                                      : rmon->pid),
                                                     mon->ref,
                                                     pcontext->reason);
                        ASSERT(code == ERTS_DSIG_SEND_OK);
                    }
                    erts_destroy_monitor(rmon);
                }
            }
        }
    }
 done:
    /* As the monitors are previously removed from the process,
       distribution operations will not cause monitors to disappear,
       we can safely delete it. */
    erts_destroy_monitor(mon);
}
// bif.c 跨节点进程监控的处理 static BIF_RETTYPE remote_monitor(Process *p, Eterm bifarg1, Eterm bifarg2, DistEntry *dep, Eterm target, int byname) { ErtsDSigData dsd; BIF_RETTYPE ret; int code; erts_smp_proc_lock(p, ERTS_PROC_LOCK_LINK); code = erts_dsig_prepare(&dsd, dep, p, ERTS_DSP_RLOCK, 0); // 获取分布式端口的状态 switch (code) { case ERTS_DSIG_PREP_NOT_ALIVE: // 端口还没激活使用,使用Trap处理 /* Let the dmonitor_p trap handle it */ case ERTS_DSIG_PREP_NOT_CONNECTED: // 端口未连接,使用Trap处理 erts_smp_proc_unlock(p, ERTS_PROC_LOCK_LINK); ERTS_BIF_PREP_TRAP2(ret, dmonitor_p_trap, p, bifarg1, bifarg2); // 使用Trap处理,在下次调度时调用erlang:dmonitor_p/2 break; case ERTS_DSIG_PREP_CONNECTED: // 端口已连接,可发送数据 if (!(dep->flags & DFLAG_DIST_MONITOR) || (byname && !(dep->flags & DFLAG_DIST_MONITOR_NAME))) { erts_smp_de_runlock(dep); erts_smp_proc_unlock(p, ERTS_PROC_LOCK_LINK); ERTS_BIF_PREP_ERROR(ret, p, BADARG); } else { Eterm p_trgt, p_name, d_name, mon_ref; mon_ref = erts_make_ref(p); if (byname) { p_trgt = dep->sysname; p_name = target; d_name = target; } else { p_trgt = target; p_name = NIL; d_name = NIL; } erts_smp_de_links_lock(dep); // 当前进程添加监控数据 erts_add_monitor(&ERTS_P_MONITORS(p), MON_ORIGIN, mon_ref, p_trgt, p_name); // DistEntry添加被监控数据 erts_add_monitor(&(dep->monitors), MON_TARGET, mon_ref, p->common.id, d_name); erts_smp_de_links_unlock(dep); erts_smp_de_runlock(dep); erts_smp_proc_unlock(p, ERTS_PROC_LOCK_LINK); // 发监控消息到远程节点 code = erts_dsig_send_monitor(&dsd, p->common.id, target, mon_ref); if (code == ERTS_DSIG_SEND_YIELD) ERTS_BIF_PREP_YIELD_RETURN(ret, p, mon_ref); else ERTS_BIF_PREP_RET(ret, mon_ref); } break; default: // 其他端口状态,如端口将被挂起 ASSERT(! "Invalid dsig prepare result"); ERTS_BIF_PREP_ERROR(ret, p, EXC_INTERNAL_ERROR); break; } return ret; }接着,看下发消息给远程节点的处理。
// dist.c 发监控消息到远程节点 int erts_dsig_send_monitor(ErtsDSigData *dsdp, Eterm watcher, Eterm watched, Eterm ref) { Eterm ctl; DeclareTmpHeapNoproc(ctl_heap,5); int res; UseTmpHeapNoproc(5); ctl = TUPLE4(&ctl_heap[0], make_small(DOP_MONITOR_P), watcher, watched, ref); // 构造消息{DOP_MONITOR_P, LocalPid, RemotePidOrName, Ref} 发给远程节点 res = dsig_send(dsdp, ctl, THE_NON_VALUE, 0); UnUseTmpHeapNoproc(5); return res; }看下远程接收到这个消息后的处理。
// dist.c 处理其他节点发来的消息(有删节) int erts_net_message(Port *prt, DistEntry *dep, byte *hbuf, ErlDrvSizeT hlen, byte *buf, ErlDrvSizeT len) { // ... switch (type = unsigned_val(tuple[1])) { // ... // 处理 {DOP_MONITOR_P, Remote pid, local pid or name, ref} case DOP_MONITOR_P: { /* A remote process wants to monitor us, we get: {DOP_MONITOR_P, Remote pid, local pid or name, ref} */ Eterm name; if (tuple_arity != 4) { goto invalid_message; } watcher = tuple[2]; watched = tuple[3]; /* local proc to monitor */ ref = tuple[4]; if (is_not_ref(ref)) { goto invalid_message; } if (is_atom(watched)) { name = watched; rp = erts_whereis_process(NULL, 0, watched, ERTS_PROC_LOCK_LINK, ERTS_P2P_FLG_ALLOW_OTHER_X); } else { name = NIL; rp = erts_pid2proc_opt(NULL, 0, watched, ERTS_PROC_LOCK_LINK, ERTS_P2P_FLG_ALLOW_OTHER_X); } if (!rp) { // 如果被监控进程不存在,则回复进程退出消息 ErtsDSigData dsd; int code; code = erts_dsig_prepare(&dsd, dep, NULL, ERTS_DSP_NO_LOCK, 0); if (code == ERTS_DSIG_PREP_CONNECTED) { code = erts_dsig_send_m_exit(&dsd, watcher, watched, ref, am_noproc); ASSERT(code == ERTS_DSIG_SEND_OK); } } else { if (is_atom(watched)) watched = rp->common.id; erts_smp_de_links_lock(dep); // DistEntry添加监控数据 erts_add_monitor(&(dep->monitors), MON_ORIGIN, ref, watched, name); // 进程添加被监控数据 erts_add_monitor(&ERTS_P_MONITORS(rp), MON_TARGET, ref, watcher, name); erts_smp_de_links_unlock(dep); erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK); } break; } //... }
/**********************************************************************
 *       Process X                 Process Y
 *       +-------------+           +-------------+
 * Type: | MON_ORIGIN  |           | MON_TARGET  |
 *       +-------------+           +-------------+
 * Pid:  | Pid(Y)      |           | Pid(X)      |
 *       +-------------+           +-------------+
 **********************************************************************/
跨节点的处理:(节点A的进程X监控节点B的进程Y)
/**********************************************************************
 *                    Node A              |              Node B
 *       ---------------------------------+----------------------------------
 *       Process X (@A)   Distentry @A       Distentry @B     Process Y (@B)
 *                        for node B         for node A
 *       +-------------+  +-------------+    +-------------+  +-------------+
 * Type: | MON_ORIGIN  |  | MON_TARGET  |    | MON_ORIGIN  |  | MON_TARGET  |
 *       +-------------+  +-------------+    +-------------+  +-------------+
 * Pid:  | Atom(node B)|  | Pid(X)      |    | Pid(Y)      |  | Pid(X)      |
 *       +-------------+  +-------------+    +-------------+  +-------------+
 **********************************************************************/
对比就是多了一步DistEntry的处理,这是由跨节点网络的不稳定性决定的。远程进程出现异常,可能是进程挂了,也有可能是节点连接出问题。当远程节点出现异常,就要触发这个节点关联进程的处理。
版权声明:本文为博主原创文章,未经博主允许不得转载。
标签:
原文地址:http://blog.csdn.net/mycwq/article/details/46961489