标签:rdm 线程创建 如何 void指针 _for 关系图 并且 device 原创
转自 https://blog.csdn.net/weixin_37097605/article/details/101488760
1 Int spdk_reactors_init(void) 2 { 3 // 初始化所有的event mempool 4 g_spdk_event_mempool = spdk_mempool_create(…); 5 // 为g_reactors分配内存,g_reactors是一个数组,管理了所有的reactors 6 posix_memalign((void **)&g_reactors, 64, (last_core + 1) * sizeof(struct spdk_reactor)); 7 // 这里设置了reactor创建线程的方法,之后需要初始化线程的时候将会调用该方法 8 spdk_thread_lib_init(spdk_reactor_schedule_thread, sizeof(struct spdk_lw_thread)); 9 // 对于每一个启动的reactor,将会初始化它们 10 // 初始化reactor过程,即为绑定lcore,初始化spdk ring、threads,对rusage无操作 11 SPDK_ENV_FOREACH_CORE(i) { 12 reactor = spdk_reactor_get(i); 13 spdk_reactor_construct(reactor, i); 14 } 15 // 设置好状态返回 16 g_reactor_state = SPDK_REACTOR_STATE_INITIALIZED; 17 return 0; 18 }
1 Void spdk_reactors_start(void) { 2 SPDK_ENV_FOREACH_CORE(i) { 3 if (i != current_core) { // 在非master reactor中 4 reactor = spdk_reactor_get(i); // 得到相应的reactor 5 // 设置好线程创建后的一个消息,该消息为轮询函数 6 rc = spdk_env_thread_launch_pinned(reactor->lcore, _spdk_reactor_run, reactor); 7 // reactor创建好线程并且会自动执行第一个消息 8 spdk_thread_create(thread_name, tmp_cpumask); 9 } 10 } 11 // 当前CPU core得到reactor,并且开始轮询 12 reactor = spdk_reactor_get(current_core); 13 _spdk_reactor_run(reactor); 14 }
1 spdk_reactor_schedule_thread(struct spdk_thread *thread) 2 { 3 // 得到该线程设置的cpu mask 4 cpumask = spdk_thread_get_cpumask(thread); 5 for (i = 0; i < spdk_env_get_core_count(); i++) { 6 …. // 遍历cpu core 7 // 通过cpu mask找到对应的核心,并产生event 8 if (spdk_cpuset_get_cpu(cpumask, core)) { 9 evt = spdk_event_allocate(core, _schedule_thread, lw_thread, NULL); 10 break; 11 } 12 } 13 // 传递该event,即对应的reatcor会调用_schedule_thread方法, 14 spdk_event_call(evt); 15 } 16 _schedule_thread(void *arg1, void *arg2) 17 { 18 struct spdk_lw_thread *lw_thread = arg1; 19 struct spdk_reactor *reactor; 20 // 消息传递到对应的reactor后将该thread加入到reactor中 21 reactor = spdk_reactor_get(spdk_env_get_current_core()); 22 TAILQ_INSERT_TAIL(&reactor->threads, lw_thread, link); 23 } 24 在SPDK_REACTOR_STATE_RUNNING后,此时所有reactor就进入了轮询状态。_spdk_reactor_run函数为线程提供了轮询方法: 25 static int _spdk_reactor_run(void *arg) { 26 while (1) { 27 // 处理reactor上的event消息,消息会在之后讲到 28 _spdk_event_queue_run_batch(reactor); 29 // 每一个reactor上注册的thread进行遍历并且处理poller事件 30 TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) { 31 rc = spdk_thread_poll(thread, 0, now); 32 } 33 // 检查reactor的状态 34 if (g_reactor_state != SPDK_REACTOR_STATE_RUNNING) { 35 break; 36 } 37 } 38 }
1 static int _spdk_reactor_run(void *arg) { 2 ….. // 轮询 3 TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) { 4 thread = spdk_thread_get_from_ctx(lw_thread); 5 TAILQ_REMOVE(&reactor->threads, lw_thread, link); 6 spdk_set_thread(thread); 7 spdk_thread_exit(thread); 8 spdk_thread_destroy(thread); 9 } 10 }
1 Int spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) { 2 // 首先先处理ring传递过来的消息 3 msg_count = _spdk_msg_queue_run_batch(thread, max_msgs); 4 // 调用非定时poller中的方法 5 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers, 6 active_pollers_head, tailq, tmp) { 7 // 调用poller注册的方法之前,会对poller状态检测且转换 8 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 9 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 10 free(poller); 11 continue; 12 } 13 poller->state = SPDK_POLLER_STATE_RUNNING; 14 // 调用poller注册的方法 15 poller_rc = poller->fn(poller->arg); 16 // poller转换状态 17 poller->state = SPDK_POLLER_STATE_WAITING; 18 } 19 // 调用定时poller中的方法 20 TAILQ_FOREACH_SAFE(poller, &thread->timer_pollers, tailq, tmp) { 21 // 类似非定时poller过程,不过会检查是否到了预定的时间 22 if (now < poller->next_run_tick) break; 23 } 24 // 最后统计时间 25 }
1 struct io_device { 2 void *io_device; // 抽象的device指针 3 char name[SPDK_MAX_DEVICE_NAME_LEN + 1]; // 名字 4 spdk_io_channel_create_cb create_cb; // io_channel创建的回调函数 5 spdk_io_channel_destroy_cb destroy_cb; // io_channel销毁的回调函数 6 spdk_io_device_unregister_cb unregister_cb; // io_device解绑的回调函数 7 struct spdk_thread *unregister_thread; // 不使用该device线程 8 uint32_t ctx_size; // ctx的大小,将会传给io_channel处理 9 uint32_t for_each_count; // io_channel的数量 10 TAILQ_ENTRY(io_device) tailq; // device队列头 11 uint32_t refcnt; // 计数器 12 bool unregistered; // 是否该device被注册 13 };
1 Io_channel的结构 2 struct spdk_io_channel { 3 struct spdk_thread *thread; // 绑定的线程 4 struct io_device *dev; // 绑定的io_device 5 uint32_t ref; // io_channel引用计数 6 uint32_t destroy_ref; // destroy前被引用的次数 7 TAILQ_ENTRY(spdk_io_channel) tailq; // io_channel 队列头 8 spdk_io_channel_destroy_cb destroy_cb; // io_channel销毁的回调函数 9 };
1 enum nvmf_tgt_state { 2 NVMF_TGT_INIT_NONE = 0, // 最初的状态 3 NVMF_TGT_INIT_PARSE_CONFIG, // 解析配置文件 4 NVMF_TGT_INIT_CREATE_POLL_GROUPS, // 创建poll groups 5 NVMF_TGT_INIT_START_SUBSYSTEMS, // 启动subsystem 6 NVMF_TGT_INIT_START_ACCEPTOR, // 开始接收 7 NVMF_TGT_RUNNING, // running 8 NVMF_TGT_FINI_STOP_SUBSYSTEMS, 9 NVMF_TGT_FINI_DESTROY_POLL_GROUPS, 10 NVMF_TGT_FINI_STOP_ACCEPTOR, 11 NVMF_TGT_FINI_FREE_RESOURCES, 12 NVMF_TGT_STOPPED, 13 NVMF_TGT_ERROR, 14 };
1 spdk_io_device_register(tgt, 2 spdk_nvmf_tgt_create_poll_group, 3 spdk_nvmf_tgt_destroy_poll_group, 4 sizeof(struct spdk_nvmf_poll_group), 5 "nvmf_tgt");
1 spdk_for_each_thread(nvmf_tgt_create_poll_group, 2 NULL, 3 nvmf_tgt_create_poll_group_done); 4 static void nvmf_tgt_create_poll_group(void *ctx) 5 { 6 struct nvmf_tgt_poll_group *pg; 7 …. 8 pg->thread = spdk_get_thread(); 9 pg->group = spdk_nvmf_poll_group_create(g_spdk_nvmf_tgt); 10 …. 11 }
1 struct spdk_nvmf_poll_group * spdk_nvmf_poll_group_create(struct spdk_nvmf_tgt *tgt) 2 { 3 struct spdk_io_channel *ch; 4 ch = spdk_get_io_channel(tgt); 5 …. 6 return spdk_io_channel_get_ctx(ch); 7 }
1 static int spdk_nvmf_tgt_create_poll_group(void *io_device, void *ctx_buf) 2 { 3 ….. // 初始化transport 、nvmf subsystem等 4 // 注册一个poller 5 group->poller = spdk_poller_register(spdk_nvmf_poll_group_poll, group, 0); 6 group->thread = spdk_get_thread(); 7 return 0; 8 }
1 g_acceptor_poller = spdk_poller_register(acceptor_poll, g_spdk_nvmf_tgt, 2 g_spdk_nvmf_tgt_conf->acceptor_poll_rate);
标签:rdm 线程创建 如何 void指针 _for 关系图 并且 device 原创
原文地址:https://www.cnblogs.com/yi-mu-xi/p/12821103.html