from __future__ import annotations

from contextlib import closing

import pytest
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
from fixtures.common_types import Lsn, TenantShardId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
    NeonEnvBuilder,
    tenant_get_shards,
    wait_for_last_flush_lsn,
)


@pytest.mark.timeout(600)
@pytest.mark.parametrize("shard_count", [1, 8, 32])
def test_sharded_ingest(
    neon_env_builder: NeonEnvBuilder,
    zenbenchmark: NeonBenchmarker,
    shard_count: int,
):
    """
    Benchmarks sharded ingestion throughput by ingesting a large amount of WAL into a Safekeeper
    and fanning it out to a large number of shards on dedicated Pageservers. Comparing the base
    case (shard_count=1) with the sharded cases indicates the overhead of sharding.
    """

    ROW_COUNT = 100_000_000  # about 7 GB of WAL

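    # Provision one dedicated pageserver per shard.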
    neon_env_builder.num_pageservers = shard_count
    env = neon_env_builder.init_start()

    # Create a sharded tenant and timeline, and migrate its shards to their respective
    # pageservers. Ensure the storage controller doesn't move the shard placements afterwards.
    #
    # TODO: there should be a way to disable storage controller background reconciliations.
    # Currently, disabling reconciliation also disables foreground operations.
    tenant_id, timeline_id = env.create_tenant(shard_count=shard_count)

    for shard_number in range(shard_count):
        tenant_shard_id = TenantShardId(tenant_id, shard_number, shard_count)
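        # Pin shard N to pageserver N+1 (assumes the test environment's pageserver IDs are 1-based).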
        pageserver_id = shard_number + 1
        env.storage_controller.tenant_shard_migrate(tenant_shard_id, pageserver_id)

    shards = tenant_get_shards(env, tenant_id)
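    # Let pending reconciliations settle, then verify that the placements above are stable.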
    env.storage_controller.reconcile_until_idle()
    assert tenant_get_shards(env, tenant_id) == shards, "shards moved"

    # Start the endpoint.
    endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
    start_lsn = Lsn(endpoint.safe_psql("select pg_current_wal_lsn()")[0][0])

    # Ingest data and measure WAL volume and duration.
    with closing(endpoint.connect()) as conn:
        with conn.cursor() as cur:
            log.info("Ingesting data")
            cur.execute("set statement_timeout = 0")
            cur.execute("create table huge (i int, j int)")

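            # "wal_ingest" times only the INSERT itself; "pageserver_ingest" additionally
            # includes waiting for the pageservers to catch up to the last flushed LSN.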
            with zenbenchmark.record_duration("pageserver_ingest"):
                with zenbenchmark.record_duration("wal_ingest"):
                    cur.execute(f"insert into huge values (generate_series(1, {ROW_COUNT}), 0)")

                wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)

    end_lsn = Lsn(endpoint.safe_psql("select pg_current_wal_lsn()")[0][0])
    wal_written_mb = round((end_lsn - start_lsn) / (1024 * 1024))
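    # Report the total WAL volume ingested alongside the duration metrics.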
    zenbenchmark.record("wal_written", wal_written_mb, "MB", MetricReport.TEST_PARAM)

    assert tenant_get_shards(env, tenant_id) == shards, "shards moved"