标签:when ica contexts ogr 成功 dash ogre ++i mes
本文主要介绍on_applied、on_commit、on_applied_sync、on_all_commit、on_all_applied在数据IO处理流程中的回调代码梳理。写的比较简单,不过关键点都已经整理。以filestore为例:OSD端详细程分析:https://blog.51cto.com/wendashuai/2497104
主端:
PrimaryLogPG::execute_ctx()->issue_repop(repop, ctx)->pgbackend->submit_transaction()->issue_op(); parent->queue_transactions()
1.
void PrimaryLogPG::issue_repop(RepGather *repop, OpContext *ctx)
{
Context *onapplied_sync = new C_OSD_OndiskWriteUnlock()
Context *on_all_applied = new C_OSD_RepopApplied(this, repop);
Context *on_all_commit = new C_OSD_RepopCommit(this, repop);
pgbackend->submit_transaction(//-> ReplicatedBackend::submit_transaction(...,on_local_applied_sync,on_all_acked,on_all_commit,...)
soid,
ctx->delta_stats,
ctx->at_version,
std::move(ctx->op_t),
pg_trim_to,
min_last_complete_ondisk,
ctx->log,
ctx->updated_hset_history,
onapplied_sync,
on_all_applied,
on_all_commit,
repop->rep_tid,
ctx->reqid,
ctx->op);
}
2.
void ReplicatedBackend::submit_transaction(
const hobject_t &soid,
const object_stat_sum_t &delta_stats,
const eversion_t &at_version,
PGTransactionUPtr &&_t,
const eversion_t &trim_to,
const eversion_t &roll_forward_to,
const vector<pg_log_entry_t> &_log_entries,
boost::optional<pg_hit_set_history_t> &hset_history,
Context *on_local_applied_sync,
Context *on_all_acked,
Context *on_all_commit,
ceph_tid_t tid,
osd_reqid_t reqid,
OpRequestRef orig_op)
{
InProgressOp &op = in_progress_ops.insert(
make_pair(
tid,
InProgressOp(
tid, on_all_commit, on_all_acked,
orig_op, at_version)
)
).first->second;
op.waiting_for_applied.insert(
parent->get_actingbackfill_shards().begin(),
parent->get_actingbackfill_shards().end());
op.waiting_for_commit.insert(
parent->get_actingbackfill_shards().begin(),
parent->get_actingbackfill_shards().end());
//issue_op将ops的信息封装成message发送给replica osd副本的。这个操作就是在封装message,这里就不再多说了
issue_op(
soid,
at_version,
tid,
reqid,
trim_to,
at_version,
added.size() ? *(added.begin()) : hobject_t(),
removed.size() ? *(removed.begin()) : hobject_t(),
log_entries,
hset_history,
&op,
op_t);
op_t.register_on_applied_sync(on_local_applied_sync); --->on_applied_sync
op_t.register_on_applied( --->on_applied
parent->bless_context(
new C_OSD_OnOpApplied(this, &op)));
op_t.register_on_commit( --->on_commit
parent->bless_context(
new C_OSD_OnOpCommit(this, &op)));
parent->queue_transactions(tls, op.op);//int FileStore::queue_transactions()
}
3.
void ReplicatedBackend::issue_op(
const hobject_t &soid,
const eversion_t &at_version,
ceph_tid_t tid,
osd_reqid_t reqid,
eversion_t pg_trim_to,
eversion_t pg_roll_forward_to,
hobject_t new_temp_oid,
hobject_t discard_temp_oid,
const vector<pg_log_entry_t> &log_entries,
boost::optional<pg_hit_set_history_t> &hset_hist,
InProgressOp *op,
ObjectStore::Transaction &op_t)
{
get_parent()->send_message_osd_cluster(peer.osd, wr, get_osdmap()->get_epoch());//go
}
4.
副本端:
ReplicatedBackend::handle_message()--->sub_op_modify(op)--->queue_transactions()
// sub op modify 当pg的从副本接收到MSG_OSD_REPOP,调用该函数,完成本地对象的数据写入
void ReplicatedBackend::sub_op_modify(OpRequestRef op)
{
rm->opt.register_on_commit(
parent->bless_context(
new C_OSD_RepModifyCommit(this, rm)));
rm->localt.register_on_applied(
parent->bless_context(
new C_OSD_RepModifyApply(this, rm)));
parent->queue_transactions(tls, op);// ->int FileStore::queue_transactions()
}
5.
主回调:
class C_OSD_OnOpCommit : public Context {
ReplicatedBackend *pg;
ReplicatedBackend::InProgressOp *op;
public:
C_OSD_OnOpCommit(ReplicatedBackend *pg, ReplicatedBackend::InProgressOp *op)
: pg(pg), op(op) {}
void finish(int) override {
pg->op_commit(op);
}
};
class C_OSD_OnOpApplied : public Context {
ReplicatedBackend *pg;
ReplicatedBackend::InProgressOp *op;
public:
C_OSD_OnOpApplied(ReplicatedBackend *pg, ReplicatedBackend::InProgressOp *op)
: pg(pg), op(op) {}
void finish(int) override {
pg->op_applied(op);
}
};
6.
副本回调:
struct ReplicatedBackend::C_OSD_RepModifyApply : public Context {
ReplicatedBackend *pg;
RepModifyRef rm;
C_OSD_RepModifyApply(ReplicatedBackend *pg, RepModifyRef r)
: pg(pg), rm(r) {}
void finish(int r) override {
pg->repop_applied(rm);
}
};
struct ReplicatedBackend::C_OSD_RepModifyCommit : public Context {
ReplicatedBackend *pg;
RepModifyRef rm;
C_OSD_RepModifyCommit(ReplicatedBackend *pg, RepModifyRef r)
: pg(pg), rm(r) {}
void finish(int r) override {
pg->repop_commit(rm);
}
};
7.filestore端
int FileStore::queue_transactions(Sequencer *posr, vector<Transaction>& tls,
TrackedOpRef osd_op,
ThreadPool::TPHandle *handle)
{
Context *onreadable;
Context *ondisk;
Context *onreadable_sync;
ObjectStore::Transaction::collect_contexts(
tls, &onreadable, &ondisk, &onreadable_sync);
}
static void collect_contexts(
vector<Transaction>& t,
Context **out_on_applied,
Context **out_on_commit,
Context **out_on_applied_sync)
{
list<Context *> on_applied, on_commit, on_applied_sync;
for (vector<Transaction>::iterator i = t.begin();i != t.end();++i)
{
on_applied.splice(on_applied.end(), (*i).on_applied);
on_commit.splice(on_commit.end(), (*i).on_commit);
on_applied_sync.splice(on_applied_sync.end(), (*i).on_applied_sync);
}
*out_on_applied = C_Contexts::list_to_context(on_applied);
*out_on_commit = C_Contexts::list_to_context(on_commit);
*out_on_applied_sync = C_Contexts::list_to_context(on_applied_sync);
}
8.写journal和data回调关系
_op_journal_transactions(tbl, orig_len, o->op,new C_JournaledAhead(this, osr, o, ondisk),osd_op); ->onjournal ->oncommit
写完journal,回调ondisk,Finisher线程ondisk_finishers ---> *Finisher::finisher_thread_entry() --->complete() ---> finish()
if (ondisk) {
dout(10) << " queueing ondisk " << ondisk << dendl;
ondisk_finishers[osr->id % m_ondisk_finisher_num]->queue(ondisk);
}
写完data,回调onreadable,Finisher线程apply_finishers ---> *Finisher::finisher_thread_entry() --->complete() ---> finish()
if (o->onreadable) {//写完filestore后,数据开始可读。(此时可能写到page cache了)
apply_finishers[osr->id % m_apply_finisher_num]->queue(o->onreadable);
}
onreadable ---> out_on_applied ---on_applied
ondisk ---> out_on_commit --->on_commit
onreadable_sync --->out_on_applied_sync --->on_applied_sync
主完成了journal的写入: C_OSD_OnOpCommit pg->op_commit(op); ondisk ondisk_finishers; 此时可继续写?
主完成data写入: C_OSD_OnOpApplied pg->op_applied(op) onreadable apply_finishers; 此时写入的数据可读?
副本完成了journal的写入: C_OSD_RepModifyCommit pg->repop_commit(rm) send_message_osd_cluster发送到主
副本完成data写入: C_OSD_RepModifyApply pg->repop_applied(rm) send_message_osd_cluster发送到主
完成所有副本journal写入:all_committed on_all_commit C_OSD_RepopCommit repop_all_committed waiting_for_ondisk //called when all commit
完成所有副本data写入: all_applied on_all_applied C_OSD_RepopApplied repop_all_applied waiting_for_ack //called when all acked
Context *on_all_commit = new C_OSD_RepopCommit(this, repop);//on_all_commit
Context *on_all_applied = new C_OSD_RepopApplied(this, repop);//on_all_acked
Context *onapplied_sync = new C_OSD_OndiskWriteUnlock(xxx); //on_local_applied_sync 本地端sync完成
所有端完成了journal的写入后,此时数据已经写到journal盘,会处理waiting_for_ondisk list,ondisk状态。环境一旦崩溃,可以journal replay方式回放恢复;
所有端完成了data的写入后,即写入到了filestore层,此时代表apply成功,会处理waiting_for_ack list,向client端发送ack通知写完成,此时数据处于可读状态onreadable;
标签:when ica contexts ogr 成功 dash ogre ++i mes
原文地址:https://blog.51cto.com/wendashuai/2511231