标签:
1 Status DB::Open(const Options& options, const std::string& dbname,
2 DB** dbptr) {
3 *dbptr = NULL;
4
5 DBImpl* impl = new DBImpl(options, dbname);
6 impl->mutex_.Lock();
7 VersionEdit edit;
8 Status s = impl->Recover(&edit); // Handles create_if_missing, error_if_exists
9 if (s.ok()) {
10 uint64_t new_log_number = impl->versions_->NewFileNumber();
11 WritableFile* lfile;
12 s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
13 &lfile);
14 if (s.ok()) {
15 edit.SetLogNumber(new_log_number);
16 impl->logfile_ = lfile;
17 impl->logfile_number_ = new_log_number;
18 impl->log_ = new log::Writer(lfile);
19 s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
20 }
21 if (s.ok()) {
22 impl->DeleteObsoleteFiles();
23 impl->MaybeScheduleCompaction();
24 }
25 }
26 impl->mutex_.Unlock();
27 if (s.ok()) {
28 *dbptr = impl;
29 } else {
30 delete impl;
31 }
32 return s;
33 }
1 Status DBImpl::Get(const ReadOptions& options,
2 const Slice& key,
3 std::string* value) {
4 Status s;
5 MutexLock l(&mutex_);
6 SequenceNumber snapshot;
7 if (options.snapshot != NULL) {
8 snapshot = reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_;
9 } else {
10 snapshot = versions_->LastSequence();
11 }
12
13 MemTable* mem = mem_;
14 MemTable* imm = imm_;
15 Version* current = versions_->current();
16 mem->Ref();
17 if (imm != NULL) imm->Ref();
18 current->Ref();
19
20 bool have_stat_update = false;
21 Version::GetStats stats;
22
23 // Unlock while reading from files and memtables
24 {
25 mutex_.Unlock();
26 // First look in the memtable, then in the immutable memtable (if any).
27 LookupKey lkey(key, snapshot);
28 if (mem->Get(lkey, value, &s)) {
29 // Done
30 } else if (imm != NULL && imm->Get(lkey, value, &s)) {
31 // Done
32 } else {
33 s = current->Get(options, lkey, value, &stats);
34 have_stat_update = true;
35 }
36 mutex_.Lock();
37 }
38
39 if (have_stat_update && current->UpdateStats(stats)) {
40 MaybeScheduleCompaction();
41 }
42 mem->Unref();
43 if (imm != NULL) imm->Unref();
44 current->Unref();
45 return s;
46 }
1 void DBImpl::RecordReadSample(Slice key) {
2 MutexLock l(&mutex_);
3 if (versions_->current()->RecordReadSample(key)) {
4 MaybeScheduleCompaction();
5 }
6 }
1 Status DBImpl::MakeRoomForWrite(bool force) {
2 mutex_.AssertHeld();
3 assert(!writers_.empty());
4 bool allow_delay = !force;
5 Status s;
6 while (true) {
7 if (!bg_error_.ok()) {
8 // Yield previous error
9 s = bg_error_;
10 break;
11 } else if (
12 allow_delay &&
13 versions_->NumLevelFiles(0) >= config::kL0_SlowdownWritesTrigger) {
14 // We are getting close to hitting a hard limit on the number of
15 // L0 files. Rather than delaying a single write by several
16 // seconds when we hit the hard limit, start delaying each
17 // individual write by 1ms to reduce latency variance. Also,
18 // this delay hands over some CPU to the compaction thread in
19 // case it is sharing the same core as the writer.
20 mutex_.Unlock();
21 env_->SleepForMicroseconds(1000);
22 allow_delay = false; // Do not delay a single write more than once
23 mutex_.Lock();
24 } else if (!force &&
25 (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
26 // There is room in current memtable
27 break;
28 } else if (imm_ != NULL) {
29 // We have filled up the current memtable, but the previous
30 // one is still being compacted, so we wait.
31 Log(options_.info_log, "Current memtable full; waiting...\n");
32 bg_cv_.Wait();
33 } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
34 // There are too many level-0 files.
35 Log(options_.info_log, "Too many L0 files; waiting...\n");
36 bg_cv_.Wait();
37 } else {
38 // Attempt to switch to a new memtable and trigger compaction of old
39 assert(versions_->PrevLogNumber() == 0);
40 uint64_t new_log_number = versions_->NewFileNumber();
41 WritableFile* lfile = NULL;
42 s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
43 if (!s.ok()) {
44 // Avoid chewing through file number space in a tight loop.
45 versions_->ReuseFileNumber(new_log_number);
46 break;
47 }
48 delete log_;
49 delete logfile_;
50 logfile_ = lfile;
51 logfile_number_ = new_log_number;
52 log_ = new log::Writer(lfile);
53 imm_ = mem_;
54 has_imm_.Release_Store(imm_);
55 mem_ = new MemTable(internal_comparator_);
56 mem_->Ref();
57 force = false; // Do not force another compaction if have room
58 MaybeScheduleCompaction();
59 }
60 }
61 return s;
62 }
1 void DBImpl::BackgroundCompaction() {
2 mutex_.AssertHeld();
3
4 if (imm_ != NULL) {
5 CompactMemTable();
6 return;
7 }
8
9 Compaction* c;
10 bool is_manual = (manual_compaction_ != NULL); // 正常情况下为false,因为初始化时为空
11 InternalKey manual_end;
12 if (is_manual) {
13 ManualCompaction* m = manual_compaction_;
14 c = versions_->CompactRange(m->level, m->begin, m->end);
15 m->done = (c == NULL);
16 if (c != NULL) {
17 manual_end = c->input(0, c->num_input_files(0) - 1)->largest;
18 }
19 Log(options_.info_log,
20 "Manual compaction at level-%d from %s .. %s; will stop at %s\n",
21 m->level,
22 (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
23 (m->end ? m->end->DebugString().c_str() : "(end)"),
24 (m->done ? "(end)" : manual_end.DebugString().c_str()));
25 } else {
26 c = versions_->PickCompaction(); // 找出应该合并的 level 及 level + 1层的FileMetaData*
27 }
28
29 Status status;
30 if (c == NULL) {
31 // Nothing to do
32 } else if (!is_manual && c->IsTrivialMove()) {
33 // Move file to next level
34 assert(c->num_input_files(0) == 1);
35 FileMetaData* f = c->input(0, 0);
36 c->edit()->DeleteFile(c->level(), f->number);
37 c->edit()->AddFile(c->level() + 1, f->number, f->file_size,
38 f->smallest, f->largest);
39 status = versions_->LogAndApply(c->edit(), &mutex_);
40 if (!status.ok()) {
41 RecordBackgroundError(status);
42 }
43 VersionSet::LevelSummaryStorage tmp;
44 Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
45 static_cast<unsigned long long>(f->number),
46 c->level() + 1,
47 static_cast<unsigned long long>(f->file_size),
48 status.ToString().c_str(),
49 versions_->LevelSummary(&tmp));
50 } else {
51 CompactionState* compact = new CompactionState(c);
52 status = DoCompactionWork(compact); // 核心Compact
53 if (!status.ok()) {
54 RecordBackgroundError(status);
55 }
56 CleanupCompaction(compact);
57 c->ReleaseInputs();
58 DeleteObsoleteFiles();
59 }
60 delete c;
61
62 if (status.ok()) {
63 // Done
64 } else if (shutting_down_.Acquire_Load()) {
65 // Ignore compaction errors found during shutting down
66 } else {
67 Log(options_.info_log,
68 "Compaction error: %s", status.ToString().c_str());
69 }
70
71 if (is_manual) {
72 ManualCompaction* m = manual_compaction_;
73 if (!status.ok()) {
74 m->done = true;
75 }
76 if (!m->done) {
77 // We only compacted part of the requested range. Update *m
78 // to the range that is left to be compacted.
79 m->tmp_storage = manual_end;
80 m->begin = &m->tmp_storage;
81 }
82 manual_compaction_ = NULL;
83 }
84 }
LevelDB场景分析4--BackgroundCompaction
标签:
原文地址:http://www.cnblogs.com/onlyforcloud/p/4494608.html