萌约 MoeYork

一个普通的博客站点

0%

ADOC 实现分析

本文基于仓库 FEAT_7.11

base(FEAT_7.11): 5fbcc8c54d4a8704405b568d53faf23cd0722eeb

base(rocksdb): 5fbcc8c54d4a8704405b568d53faf23cd0722eeb

tools/db_bench_tool.cc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 相关选项
// @db_bench_tool.cc:783
DEFINE_bool(DOTA_enabled, false, "Whether trigger the DOTA framework");
DEFINE_bool(FEA_enable, false, "Trigger FEAT tuner's FEA component");
DEFINE_bool(TEA_enable, false, "Trigger FEAT tuner's TEA component");
DEFINE_int32(SILK_bandwidth_limitation, 200, "MBPS of disk limitation");
DEFINE_bool(SILK_triggered, false, "Whether the SILK tuner is triggered");
DEFINE_double(idle_rate, 1.25,
"TEA will decide this as the idle rate of the threads");
DEFINE_double(FEA_gap_threshold, 1.5,
"The negative feedback loop's threshold");
DEFINE_double(TEA_slow_flush, 0.5, "The negative feedback loop's threshold");
DEFINE_double(DOTA_tuning_gap, 1.0, "Tuning gap of the DOTA agent, in secs ");
DEFINE_int64(random_fill_average, 150,
"average inputs rate of background write operations");
DEFINE_bool(detailed_running_stats, false,
"Whether record more detailed information in report agent");
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// 设置相关 flag 后, 初始化局部变量 reporter_agent 为自定义的实现
// rocksdb::Benchmark::RunBenchmark
// @db_bench_tool.cc:3854
std::unique_ptr<ReporterAgent> reporter_agent;
if (FLAGS_report_interval_seconds > 0) {
if ( FLAGS_DOTA_enabled || FLAGS_TEA_enable ||
FLAGS_FEA_enable) {
// need to use another Report Agent
if (FLAGS_DOTA_tuning_gap == 0) {
reporter_agent.reset(new ReporterAgentWithTuning(
reinterpret_cast<DBImpl*>(db_.db), FLAGS_env, FLAGS_report_file,
FLAGS_report_interval_seconds, FLAGS_report_interval_seconds));
} else {
reporter_agent.reset(new ReporterAgentWithTuning(
reinterpret_cast<DBImpl*>(db_.db), FLAGS_env, FLAGS_report_file,
FLAGS_report_interval_seconds, FLAGS_DOTA_tuning_gap));
}
auto tuner_agent =
reinterpret_cast<ReporterAgentWithTuning*>(reporter_agent.get());
tuner_agent->UseFEATTuner(FLAGS_TEA_enable, FLAGS_FEA_enable);
tuner_agent->GetTuner()->set_idle_ratio(FLAGS_idle_rate);
tuner_agent->GetTuner()->set_gap_threshold(FLAGS_FEA_gap_threshold);
tuner_agent->GetTuner()->set_slow_flush_threshold(FLAGS_TEA_slow_flush);
} else if (FLAGS_detailed_running_stats) {
reporter_agent.reset(new ReporterWithMoreDetails(
reinterpret_cast<DBImpl*>(db_.db), FLAGS_env, FLAGS_report_file,
FLAGS_report_interval_seconds));
} else if (FLAGS_SILK_triggered) {
reporter_agent.reset(new ReporterAgentWithSILK(
reinterpret_cast<DBImpl*>(db_.db), FLAGS_env, FLAGS_report_file,
FLAGS_report_interval_seconds, FLAGS_value_size,
FLAGS_SILK_bandwidth_limitation));
} else { /* 注: 这里是原始的 ReporterAgent */
reporter_agent.reset(new ReporterAgent(FLAGS_env, FLAGS_report_file,
FLAGS_report_interval_seconds));
}
}

utilities/DOTA/report_agent.cc

启动路径

作者将 ReporterAgent 的定义 从 tools/db_bench_tool.cc 移动到了 include/rocksdb/utilities/report_agent.h

该实现在 ReporterAgent 构造函数中, 启动了一个新的线程, 主要逻辑为周期性的执行 DetectAndTuning 函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// rocksdb::ReporterAgent::ReporterAgent
// @report_agent.h:67
reporting_thread_ = port::Thread([&]() { SleepAndReport(); });

// @report_agent.h:92
void SleepAndReport() {
...
while (true) {
// sleep report_interval_secs_
...
DetectAndTuning(secs_elapsed);
...
}
}

其中 ReporterAgent::DetectAndTuning 为虚函数, 该类本身 将其实现为空函数; 该函数在 ReporterAgent 的三个子类中被 override, 主要逻辑在 ReporterAgentWithTuning::DetectAndTuning

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// @report_agent.cc:40
void ReporterAgentWithTuning::DetectAndTuning(int secs_elapsed) {
if (secs_elapsed % tuning_gap_secs_ == 0) {
DetectChangesPoints(secs_elapsed);
// this->running_db_->immutable_db_options().job_stats->clear();
last_metrics_collect_secs = secs_elapsed;
}
if (tuning_points.empty() ||
tuning_points.front().change_timing < secs_elapsed) {
return;
} else {
PopChangePoints(secs_elapsed);
}
}

// @report_agent.cc:182
void ReporterWithMoreDetails::DetectAndTuning(int secs_elapsed) {
// report_file_->Append(",");
// ReportLine(secs_elapsed, total_ops_done_);
secs_elapsed++;
}

// 没有 ReporterAgentWithSILK::DetectAndTuning

作者在仓库 README 中提到:

The SILK implementation in this repo is a placeholder, since we failed to reproduce the read performance as mentioned in its paper.

调节过程

DetectChangesPointsPopChangePoints 中, 该实现 通过 tuner->DetectTuningOperations 函数动态获得新的 rocksdb 的配置选项 change_points; 然后通过 ApplyChangePointsInstantly 更新数据库和列族设置选项

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// @report_agent.cc:31
void ReporterAgentWithTuning::DetectChangesPoints(int sec_elapsed) {
std::vector<ChangePoint> change_points;
if (applying_changes) {
return;
}
tuner->DetectTuningOperations(sec_elapsed, &change_points);
ApplyChangePointsInstantly(&change_points);
}

// @report_agent.cc:169
void ReporterAgentWithTuning::PopChangePoints(int secs_elapsed) {
std::vector<ChangePoint> valid_point;
for (auto it = tuning_points.begin(); it != tuning_points.end(); it++) {
if (it->change_timing <= secs_elapsed) {
if (running_db_ != nullptr) {
valid_point.push_back(*it);
}
tuning_points.erase(it--);
}
}
ApplyChangePointsInstantly(&valid_point);
}

utilities/DOTA/DOTA_tuner.cc

本文件实现了两个 Tuner: FEAT_TunerDOTA_Tuner, 需要注意 FEAT_TunerDOTA_Tuner 的子类, override 了 DetectTuningOperations

新的设置列表生成 (std::vector<ChangePoint>)

新的设置参数计算操作主要在 DetectTuningOperations 中, DOTA_Tuner 在打分和获取统计信息后调用 AdjustmentTuning 获得新的设置列表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// @DOTA_tuner.cc:12
void DOTA_Tuner::DetectTuningOperations(
int secs_elapsed, std::vector<ChangePoint> *change_list_ptr) {
current_sec = secs_elapsed;
// UpdateSystemStats();
SystemScores current_score = ScoreTheSystem();
UpdateMaxScore(current_score);
scores.push_back(current_score);
gradients.push_back(current_score - scores.front());

auto thread_stat = LocateThreadStates(current_score);
auto batch_stat = LocateBatchStates(current_score);

AdjustmentTuning(change_list_ptr, current_score, thread_stat, batch_stat);
// decide the operation based on the best behavior and last behavior
// update the histories
last_thread_states = thread_stat;
last_batch_stat = batch_stat;
tuning_rounds++;
}

// @DOTA_tuner.cc:195
void DOTA_Tuner::AdjustmentTuning(std::vector<ChangePoint> *change_list,
SystemScores &score,
ThreadStallLevels thread_levels,
BatchSizeStallLevels batch_levels) {
// tune for thread number
/* 注: 这里得到状态 */
auto tuning_op = VoteForOP(score, thread_levels, batch_levels);
// tune for memtable
/* 注: 这里根据状态(tuning_op)更新数据库设置列表(change_list) */
FillUpChangeList(change_list, tuning_op);
}

FEAT_Tuner 计算状态 (TuningOP) 的方式与 DOTA_Tuner 不同:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
void FEAT_Tuner::DetectTuningOperations(int /*secs_elapsed*/,
std::vector<ChangePoint> *change_list) {
// first, we tune only when the flushing speed is slower than before
auto current_score = this->ScoreTheSystem();
if (current_score.flush_speed_avg == 0) return ;
scores.push_back(current_score);
if (scores.size() == 1) {
return;
}
this->UpdateMaxScore(current_score);
if (scores.size() >= (size_t)this->score_array_len) {
// remove the first record
scores.pop_front();
}
CalculateAvgScore();

current_score_ = current_score;

if (current_score_.flush_speed_avg >0 ){
TuningOP result{kKeep, kKeep};
if (TEA_enable) {
result = TuneByTEA();
}
if (FEA_enable) {
TuningOP fea_result = TuneByFEA();
result.BatchOp = fea_result.BatchOp;
}
FillUpChangeList(change_list, result);

}
}

设置参数细节

FillUpChangeList 会根据计算好的状态 更新参数, 然后通过 SetBatchSize SetThreadNum 追加到 std::vector<ChangePoint> *change_list

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// @DOTA_tuner.cc:251
inline void DOTA_Tuner::SetBatchSize(std::vector<ChangePoint> *change_list,
uint64_t target_value) {
ChangePoint memtable_size_cp;
ChangePoint L1_total_size;
ChangePoint sst_size_cp;
// ChangePoint write_buffer_number;

sst_size_cp.opt = sst_size;
L1_total_size.opt = total_l1_size;
// adjust the memtable size
memtable_size_cp.db_width = false;
memtable_size_cp.opt = memtable_size;

target_value = std::max(target_value, min_memtable_size);
target_value = std::min(target_value, max_memtable_size);

// SST sizes should be controlled to be the same as memtable size
memtable_size_cp.value = std::to_string(target_value);
sst_size_cp.value = std::to_string(target_value);

// calculate the total size of L1
uint64_t l1_size = current_opt.level0_file_num_compaction_trigger *
current_opt.min_write_buffer_number_to_merge *
target_value;

L1_total_size.value = std::to_string(l1_size);
sst_size_cp.db_width = false;
L1_total_size.db_width = false;

// change_list->push_back(write_buffer_number);
change_list->push_back(memtable_size_cp);
change_list->push_back(L1_total_size);
change_list->push_back(sst_size_cp);
}

// @DOTA_tuner.cc:240
inline void DOTA_Tuner::SetThreadNum(std::vector<ChangePoint> *change_list,
int target_value) {
ChangePoint thread_num_cp;
thread_num_cp.opt = max_bg_jobs;
thread_num_cp.db_width = true;
target_value = std::max(target_value, min_thread);
target_value = std::min(target_value, max_thread);
thread_num_cp.value = std::to_string(target_value);
change_list->push_back(thread_num_cp);
}