
Commit d99df7a

Merge 'Respect per-shard tablet goal and 10x default per-shard tablet count' from Tomasz Grabiec
This series achieves two things:

1) changes default number of tablet replicas per shard to be 10 in order to reduce load imbalance between shards

This will result in new tables having at least 10 tablet replicas per shard by default.

We want this to reduce tablet load imbalance due to differences in tablet count per shard, where some shards have 1 tablet and some shards have 2 tablets. With higher tablet count per shard, this difference-by-one is less relevant.

Fixes scylladb#21967

2) introduces a global goal for tablet replica count per shard and adds logic to tablet scheduler to respect it by controlling per-table tablet count

The per-shard goal is enforced by controlling average per-shard tablet replica count in a given DC, which is controlled by per-table tablet count. This is effective in respecting the limit on individual shards as long as tablet replicas are distributed evenly between shards. There is no attempt to move tablets around in order to enforce limits on individual shards in case of imbalance between shards.

If the average per-shard tablet count exceeds the limit, all tables which contribute to it (have replicas in the DC) are scaled down by the same factor. Due to rounding up to the nearest power of 2, we may overshoot the per-shard goal by at most a factor of 2.

The scaling is applied after computing desired tablet count due to all other factors: per-table tablet count hints, defaults, average tablet size. If different DCs want different scale factors of a given table, the lowest scale factor is chosen for a given table.

When creating a new table, its tablet count is determined by tablet scheduler using the scheduler logic, as if the table was already created. So any scaling due to per-shard tablet count goal is reflected immediately when creating a table. It may however still take some time for the system to shrink existing tables. We don't reject requests to create new tables.

Fixes scylladb#21458

Closes scylladb#22522

* github.com:scylladb/scylladb:
  config, tablets: Allow tablets_initial_scale_factor to be a fraction
  test: tablets_test: Test scaling when creating lots of tables
  test: tablets_test: Test tablet count changes on per-table option and config changes
  test: tablets_test: Add support for auto-split mode
  test: cql_test_env: Expose db config
  config: Make tablets_initial_scale_factor live-updateable
  tablets: load_balancer: Pick initial_scale_factor from config
  tablets, load_balancer: Fix and improve logging of resize decisions
  tablets, load_balancer: Log reason for target tablet count
  tablets: load_balancer: Move hints processing to tablet scheduler
  tablets: load_balancer: Scale down tablet count to respect per-shard tablet count goal
  tablets: Use scheduler's make_sizing_plan() to decide about tablet count of a new table
  tablets: load_balancer: Determine desired count from size separately from count from options
  tablets: load_balancer: Determine resize decision from target tablet count
  tablets: load_balancer: Allow splits even if table stats not available
  tablets: load_balancer: Extract make_sizing_plan()
  tablets: Add formatter for resize_decision::way_type
  tablets: load_balancer: Simplify resize_urgency_cmp()
  tablets: load_balancer: Keep config items as instance members
  locator: network_topology_strategy: Simplify calculate_initial_tablets_from_topology()
  tablets: Change the meaning of initial_scale to mean min-avg-tablets-per-shard
  tablets: Set default initial tablet count scale to 10
  tablets: network_topology_stragy: Coroutinize calculate_initial_tablets_from_topology()
  tablets: load_balancer: Extract get_schema_and_rs()
  tablets: load_balancer: Drop test_mode
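
The scale-down rule from the merge message can be illustrated with a small standalone model. This is only a sketch: `table_plan`, `avg_replicas_per_shard`, `dc_scale_factor`, `scaled_tablet_count` and `round_up_pow2` are hypothetical names, not part of the ScyllaDB sources, and the choice to take the ceiling before rounding to a power of two is an assumption. The real scheduler works on its own sizing plan and load statistics.

```cpp
#include <algorithm>
#include <cassert>
#include <cmath>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Hypothetical, simplified model of one table; not a ScyllaDB type.
struct table_plan {
    std::size_t desired_tablets;                   // count after hints, defaults and size-based sizing
    std::map<std::string, std::size_t> rf_per_dc;  // replication factor per DC (absent or 0: no replicas there)
};

// Average number of tablet replicas per shard in one DC.
double avg_replicas_per_shard(const std::vector<table_plan>& tables,
                              const std::string& dc, std::size_t shards_in_dc) {
    assert(shards_in_dc > 0);
    double replicas = 0;
    for (const auto& t : tables) {
        auto it = t.rf_per_dc.find(dc);
        if (it != t.rf_per_dc.end()) {
            replicas += double(t.desired_tablets) * double(it->second);
        }
    }
    return replicas / double(shards_in_dc);
}

// Factor a DC asks for: 1.0 while under the goal, goal/avg once over it.
double dc_scale_factor(double avg, double goal) {
    return avg > goal ? goal / avg : 1.0;
}

// Round up to the next power of two.
std::size_t round_up_pow2(std::size_t n) {
    std::size_t p = 1;
    while (p < n) p <<= 1;
    return p;
}

// Apply the lowest factor among the DCs where the table has replicas,
// then round up to a power of two (which may overshoot the goal by < 2x).
std::size_t scaled_tablet_count(const table_plan& t,
                                const std::map<std::string, double>& factor_per_dc) {
    double factor = 1.0;
    for (const auto& [dc, rf] : t.rf_per_dc) {
        if (rf == 0) continue;
        auto it = factor_per_dc.find(dc);
        if (it != factor_per_dc.end()) factor = std::min(factor, it->second);
    }
    auto scaled = static_cast<std::size_t>(std::ceil(double(t.desired_tablets) * factor));
    return round_up_pow2(std::max<std::size_t>(1, scaled));
}
```

For example, two tables of 1024 tablets each at RF 3 in a single 24-shard DC average 256 replicas per shard. With a goal of 100 the factor is 100/256, so each table is scaled to 400 tablets and rounded up to 512, which leaves 128 replicas per shard: above the goal, but by less than a factor of 2.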
2 parents 9ec1a45 + 1a7023c commit d99df7a

20 files changed, +658 -204 lines changed

db/config.cc

Lines changed: 4 additions & 1 deletion
@@ -1334,7 +1334,10 @@ db::config::config(std::shared_ptr<db::extensions> exts)
     , minimum_replication_factor_warn_threshold(this, "minimum_replication_factor_warn_threshold", liveness::LiveUpdate, value_status::Used, 3, "")
     , maximum_replication_factor_warn_threshold(this, "maximum_replication_factor_warn_threshold", liveness::LiveUpdate, value_status::Used, -1, "")
     , maximum_replication_factor_fail_threshold(this, "maximum_replication_factor_fail_threshold", liveness::LiveUpdate, value_status::Used, -1, "")
-    , tablets_initial_scale_factor(this, "tablets_initial_scale_factor", value_status::Used, 1, "Calculated initial tablets are multiplied by this number")
+    , tablets_initial_scale_factor(this, "tablets_initial_scale_factor", liveness::LiveUpdate, value_status::Used, 10,
+        "Minimum average number of tablet replicas per shard per table. Suppressed by tablet options in table's schema: min_per_shard_tablet_count and min_tablet_count")
+    , tablets_per_shard_goal(this, "tablets_per_shard_goal", liveness::LiveUpdate, value_status::Used, 100,
+        "The goal for the maximum number of tablet replicas per shard. Tablet allocator tries to keep this goal.")
     , target_tablet_size_in_bytes(this, "target_tablet_size_in_bytes", liveness::LiveUpdate, value_status::Used, service::default_target_tablet_size,
         "Allows target tablet size to be configured. Defaults to 5G (in bytes). Maintaining tablets at reasonable sizes is important to be able to " \
         "redistribute load. A higher value means tablet migration throughput can be reduced. A lower value may cause number of tablets to increase significantly, " \

db/config.hh

Lines changed: 2 additions & 1 deletion
@@ -502,7 +502,8 @@ public:
     named_value<int> maximum_replication_factor_warn_threshold;
     named_value<int> maximum_replication_factor_fail_threshold;

-    named_value<int> tablets_initial_scale_factor;
+    named_value<double> tablets_initial_scale_factor;
+    named_value<unsigned> tablets_per_shard_goal;
     named_value<uint64_t> target_tablet_size_in_bytes;

     named_value<std::vector<enum_option<replication_strategy_restriction_t>>> replication_strategy_warn_list;

locator/network_topology_strategy.cc

Lines changed: 2 additions & 53 deletions
@@ -296,64 +296,13 @@ effective_replication_map_ptr network_topology_strategy::make_replication_map(ta
     return do_make_replication_map(table, shared_from_this(), std::move(tm), _rep_factor);
 }

-//
-// Try to use as many tablets initially, so that all shards in the current topology
-// are covered with at least `min_per_shard_tablet_count` tablets. In other words, the value is
-//
-//    initial_tablets = max(nr_shards_in(dc) / RF_in(dc) for dc in datacenters)
-//
-
-static unsigned calculate_initial_tablets_from_topology(const schema& s, token_metadata_ptr tm, const std::unordered_map<sstring, size_t>& rf, double min_per_shard_tablet_count = 0) {
-    unsigned initial_tablets = std::numeric_limits<unsigned>::min();
-    std::unordered_map<sstring, unsigned> shards_per_dc_map;
-    tm->for_each_token_owner([&] (const node& node) {
-        if (node.is_normal()) {
-            shards_per_dc_map[node.dc_rack().dc] += node.get_shard_count();
-        }
-    });
-    for (const auto& [dc, rf_in_dc] : rf) {
-        if (!rf_in_dc) {
-            continue;
-        }
-        unsigned shards_in_dc = shards_per_dc_map[dc];
-        unsigned tablets_in_dc = (shards_in_dc + rf_in_dc - 1) / rf_in_dc;
-        if (min_per_shard_tablet_count) {
-            auto min_tablets_in_dc = std::ceil((double)(min_per_shard_tablet_count * shards_in_dc) / rf_in_dc);
-            tablets_in_dc = std::max<unsigned>(tablets_in_dc, min_tablets_in_dc);
-        }
-        initial_tablets = std::max(initial_tablets, tablets_in_dc);
-    }
-    rslogger.debug("Estimated {} initial tablets for table {}.{}", initial_tablets, s.ks_name(), s.cf_name());
-    return initial_tablets;
-}
-
-size_t network_topology_strategy::calculate_min_tablet_count(schema_ptr s, token_metadata_ptr tm, uint64_t target_tablet_size, std::optional<unsigned> initial_scale) const {
-    size_t tablet_count = get_initial_tablets();
-    const auto& tablet_options = s->tablet_options();
-    if (tablet_options.min_tablet_count) {
-        tablet_count = std::max<size_t>(tablet_count, tablet_options.min_tablet_count.value());
-    }
-    if (tablet_options.expected_data_size_in_gb) {
-        tablet_count = std::max<size_t>(tablet_count, (tablet_options.expected_data_size_in_gb.value() << 30) / target_tablet_size);
-    }
-    if (tablet_options.min_per_shard_tablet_count) {
-        tablet_count = std::max<size_t>(tablet_count, calculate_initial_tablets_from_topology(*s, tm, _dc_rep_factor, tablet_options.min_per_shard_tablet_count.value()));
-    }
-    if (tablet_count == 0) {
-        tablet_count = calculate_initial_tablets_from_topology(*s, tm, _dc_rep_factor) * initial_scale.value_or(1);
-    }
-    return tablet_count;
-}
-
-future<tablet_map> network_topology_strategy::allocate_tablets_for_new_table(schema_ptr s, token_metadata_ptr tm, uint64_t target_tablet_size, std::optional<unsigned> initial_scale) const {
-    size_t tablet_count = calculate_min_tablet_count(s, tm, target_tablet_size, initial_scale);
+future<tablet_map> network_topology_strategy::allocate_tablets_for_new_table(schema_ptr s, token_metadata_ptr tm, size_t tablet_count) const {
     auto aligned_tablet_count = 1ul << log2ceil(tablet_count);
     if (tablet_count != aligned_tablet_count) {
         rslogger.info("Rounding up tablet count from {} to {} for table {}.{}", tablet_count, aligned_tablet_count, s->ks_name(), s->cf_name());
         tablet_count = aligned_tablet_count;
     }
-
-    return reallocate_tablets(std::move(s), std::move(tm), tablet_map(tablet_count));
+    co_return co_await reallocate_tablets(std::move(s), std::move(tm), tablet_map(tablet_count));
 }

 future<tablet_map> network_topology_strategy::reallocate_tablets(schema_ptr s, token_metadata_ptr tm, tablet_map tablets) const {
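
To make the arithmetic in this hunk concrete, here is a minimal standalone sketch of the removed per-DC formula and of the power-of-two alignment that `allocate_tablets_for_new_table()` still performs. `dc_shape`, `tablets_from_topology` and `align_to_power_of_two` are hypothetical names used only for illustration. The loop-based rounding is a stand-in for `1ul << log2ceil(n)`, assuming `log2ceil` returns the ceiling of the base-2 logarithm.

```cpp
#include <algorithm>
#include <cassert>
#include <cmath>
#include <cstddef>
#include <vector>

// Hypothetical per-DC summary; not a ScyllaDB type.
struct dc_shape {
    unsigned shards;  // total shards on normal nodes in the DC
    unsigned rf;      // replication factor of the keyspace in the DC
};

// Smallest tablet count for which every DC averages at least one tablet
// replica per shard, or at least `min_per_shard` when that is non-zero.
unsigned tablets_from_topology(const std::vector<dc_shape>& dcs, double min_per_shard = 0) {
    unsigned result = 0;
    for (const auto& dc : dcs) {
        if (dc.rf == 0) continue;  // the table has no replicas in this DC
        unsigned in_dc = (dc.shards + dc.rf - 1) / dc.rf;  // ceil(shards / rf)
        if (min_per_shard > 0) {
            auto wanted = static_cast<unsigned>(std::ceil(min_per_shard * dc.shards / dc.rf));
            in_dc = std::max(in_dc, wanted);
        }
        result = std::max(result, in_dc);
    }
    return result;
}

// Round up to the next power of two.
std::size_t align_to_power_of_two(std::size_t n) {
    assert(n > 0);
    std::size_t p = 1;
    while (p < n) p <<= 1;
    return p;
}
```

With 24 shards and RF 3 in a single DC, a minimum of 10 replicas per shard asks for 80 tablets, which the alignment step turns into 128.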

locator/network_topology_strategy.hh

Lines changed: 2 additions & 3 deletions
@@ -29,7 +29,7 @@ public:
         return _rep_factor;
     }

-    size_t get_replication_factor(const sstring& dc) const {
+    size_t get_replication_factor(const sstring& dc) const override {
         auto dc_factor = _dc_rep_factor.find(dc);
         return (dc_factor == _dc_rep_factor.end()) ? 0 : dc_factor->second;
     }
@@ -46,8 +46,7 @@ public:

 public: // tablet_aware_replication_strategy
     virtual effective_replication_map_ptr make_replication_map(table_id, token_metadata_ptr) const override;
-    virtual size_t calculate_min_tablet_count(schema_ptr s, token_metadata_ptr tm, uint64_t target_tablet_size, std::optional<unsigned> initial_scale) const override;
-    virtual future<tablet_map> allocate_tablets_for_new_table(schema_ptr, token_metadata_ptr, uint64_t target_tablet_size, std::optional<unsigned> initial_scale = std::nullopt) const override;
+    virtual future<tablet_map> allocate_tablets_for_new_table(schema_ptr, token_metadata_ptr, size_t tablet_count) const override;
     virtual future<tablet_map> reallocate_tablets(schema_ptr, token_metadata_ptr, tablet_map cur_tablets) const override;
 protected:
     /**

locator/tablet_replication_strategy.hh

Lines changed: 8 additions & 5 deletions
@@ -29,26 +29,29 @@ private:
 protected:
     void validate_tablet_options(const abstract_replication_strategy&, const gms::feature_service&, const replication_strategy_config_options&) const;
     void process_tablet_options(abstract_replication_strategy&, replication_strategy_config_options&, replication_strategy_params);
-    size_t get_initial_tablets() const { return _initial_tablets; }
     effective_replication_map_ptr do_make_replication_map(table_id,
                                                           replication_strategy_ptr,
                                                           token_metadata_ptr,
                                                           size_t replication_factor) const;

 public:
-    /// Calculate the minimum tablet_count for a table, given the target_tablet_size, the per-table hints,
-    /// the network topology, and the configured replication factors.
-    virtual size_t calculate_min_tablet_count(schema_ptr s, token_metadata_ptr tm, uint64_t target_tablet_size, std::optional<unsigned> initial_scale) const = 0;
+    size_t get_initial_tablets() const { return _initial_tablets; }

     /// Generates tablet_map for a new table.
     /// Runs under group0 guard.
-    virtual future<tablet_map> allocate_tablets_for_new_table(schema_ptr, token_metadata_ptr, uint64_t target_tablet_size, std::optional<unsigned> initial_scale = std::nullopt) const = 0;
+    virtual future<tablet_map> allocate_tablets_for_new_table(schema_ptr, token_metadata_ptr, size_t tablet_count) const = 0;

     /// Generates tablet_map for a new table or when increasing replication factor.
     /// For a new table, cur_tablets is initialized with the tablet_count,
     /// otherwise, cur_tablets is a copy of the current tablet_map.
     /// Runs under group0 guard.
     virtual future<tablet_map> reallocate_tablets(schema_ptr, token_metadata_ptr, tablet_map cur_tablets) const = 0;
+
+    /// Returns replication factor in a given DC.
+    /// Note that individual tablets may lag behind desired replication factor in their
+    /// current replica list, as replication factor changes involve table rebuilding transitions
+    /// which are not instantaneous.
+    virtual size_t get_replication_factor(const sstring& dc) const = 0;
 };

 } // namespace locator

locator/tablets.cc

Lines changed: 12 additions & 7 deletions
@@ -638,13 +638,7 @@ resize_decision::resize_decision(sstring decision, uint64_t seq_number)
 }

 sstring resize_decision::type_name() const {
-    static const std::array<sstring, 3> index_to_string = {
-        "none",
-        "split",
-        "merge",
-    };
-    static_assert(std::variant_size_v<decltype(way)> == index_to_string.size());
-    return index_to_string[way.index()];
+    return fmt::format("{}", way);
 }

 resize_decision::seq_number_t resize_decision::next_sequence_number() const {
@@ -1036,6 +1030,17 @@ void tablet_metadata_guard::subscribe() {

 }

+auto fmt::formatter<locator::resize_decision_way>::format(const locator::resize_decision_way& way, fmt::format_context& ctx) const
+        -> decltype(ctx.out()) {
+    static const std::array<sstring, 3> index_to_string = {
+        "none",
+        "split",
+        "merge",
+    };
+    static_assert(std::variant_size_v<locator::resize_decision_way> == index_to_string.size());
+    return fmt::format_to(ctx.out(), "{}", index_to_string[way.index()]);
+}
+
 auto fmt::formatter<locator::global_tablet_id>::format(const locator::global_tablet_id& id, fmt::format_context& ctx) const
         -> decltype(ctx.out()) {
     return fmt::format_to(ctx.out(), "{}:{}", id.table, id.tablet);
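
The index-to-name lookup that moves from `type_name()` into the formatter can be tried in isolation. The sketch below uses only the standard library. The empty `none`, `split` and `merge` structs and the `way_name` function are stand-ins invented for illustration, and it returns a `std::string_view` instead of going through `fmt`.

```cpp
#include <array>
#include <cassert>
#include <string_view>
#include <variant>

// Stand-ins for the alternatives of resize_decision::way_type.
struct none {};
struct split {};
struct merge {};
using way_type = std::variant<none, split, merge>;

// Map the active alternative to its name by variant index.
std::string_view way_name(const way_type& way) {
    static constexpr std::array<std::string_view, 3> names = {"none", "split", "merge"};
    // Fails to compile if an alternative is added without naming it here.
    static_assert(std::variant_size_v<way_type> == names.size());
    return names[way.index()];
}
```

The `static_assert` is the useful part of the pattern: the table and the variant cannot drift apart in size, although the order of the names still has to be kept in sync by hand.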

locator/tablets.hh

Lines changed: 10 additions & 1 deletion
@@ -325,10 +325,12 @@ struct resize_decision {
     struct merge {
         auto operator<=>(const merge&) const = default;
     };
+    using way_type = std::variant<none, split, merge>;

     using seq_number_t = int64_t;

-    std::variant<none, split, merge> way;
+    way_type way;
+
     // The sequence number globally identifies a resize decision.
     // It's monotonically increasing, globally.
     // Needed to distinguish stale decision from latest one, in case coordinator
@@ -354,6 +356,8 @@ struct resize_decision {
     seq_number_t next_sequence_number() const;
 };

+using resize_decision_way = resize_decision::way_type;
+
 struct table_load_stats {
     uint64_t size_in_bytes = 0;
     // Stores the minimum seq number among all replicas, as coordinator wants to know if
@@ -673,6 +677,11 @@ struct tablet_metadata_change_hint {

 }

+template <>
+struct fmt::formatter<locator::resize_decision_way> : fmt::formatter<string_view> {
+    auto format(const locator::resize_decision_way&, fmt::format_context& ctx) const -> decltype(ctx.out());
+};
+
 template <>
 struct fmt::formatter<locator::tablet_transition_stage> : fmt::formatter<string_view> {
     auto format(const locator::tablet_transition_stage&, fmt::format_context& ctx) const -> decltype(ctx.out());

main.cc

Lines changed: 5 additions & 1 deletion
@@ -1658,7 +1658,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
                 gossiper.local(), feature_service.local(), sys_ks.local(), group0_client, dbcfg.gossip_scheduling_group};

             service::tablet_allocator::config tacfg;
-            tacfg.initial_tablets_scale = cfg->tablets_initial_scale_factor();
             distributed<service::tablet_allocator> tablet_allocator;
             tablet_allocator.start(tacfg, std::ref(mm_notifier), std::ref(db)).get();
             auto stop_tablet_allocator = defer_verbose_shutdown("tablet allocator", [&tablet_allocator] {
@@ -1693,6 +1692,11 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
             auto stop_tsm = defer_verbose_shutdown("topology_state_machine", [&tsm] {
                 tsm.stop().get();
             });
+            auto notify_topology = [&tsm] (auto) {
+                tsm.local().event.broadcast();
+            };
+            auto tablets_per_shard_goal_observer = cfg->tablets_per_shard_goal.observe(notify_topology);
+            auto tablets_initial_scale_factor_observer = cfg->tablets_initial_scale_factor.observe(notify_topology);

             auto compression_dict_updated_callback = [] () -> future<> {
                 auto dict = co_await sys_ks.local().query_dict();
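
The second hunk wires both live-updateable options to a single callback, so that a configuration change wakes up the topology state machine. The sketch below models only that fan-in with a hypothetical `live_value` class. It is not ScyllaDB's `named_value` API. In the diff the result of `observe()` is stored in a local variable, which suggests the registration is tied to that object's lifetime; the sketch leaves that out.

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical live-updateable value that notifies observers on change.
template <typename T>
class live_value {
    T _value;
    std::vector<std::function<void(const T&)>> _observers;
public:
    explicit live_value(T v) : _value(std::move(v)) {}

    // Read the current value, mirroring the `cfg->option()` call style.
    const T& operator()() const { return _value; }

    // Register a callback invoked after every update.
    void observe(std::function<void(const T&)> cb) {
        _observers.push_back(std::move(cb));
    }

    // Update the value and notify all observers.
    void set(T v) {
        _value = std::move(v);
        for (auto& cb : _observers) {
            cb(_value);
        }
    }
};
```

A generic lambda such as `[&](const auto&) { ++wakeups; }` can be registered on both a `live_value<unsigned>` and a `live_value<double>`, which is the same trick the diff uses with `[&tsm] (auto)` to share one callback between options of different types.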
