Skip to content

Commit 5b53a48

Browse files
authored
Cleaner should not put deletion marker for blocks with no-compact marker (#6576)
Signed-off-by: Alex Le <[email protected]>
1 parent caaddfb commit 5b53a48

File tree

2 files changed

+8
-2
lines changed

2 files changed

+8
-2
lines changed

Diff for: pkg/compactor/blocks_cleaner_test.go

+7-1
Original file line numberDiff line numberDiff line change
@@ -832,6 +832,8 @@ func TestBlocksCleaner_CleanPartitionedGroupInfo(t *testing.T) {
832832
startTime := ts(-10)
833833
endTime := ts(-8)
834834
block1 := createTSDBBlock(t, bucketClient, userID, startTime, endTime, nil)
835+
block2 := createTSDBBlock(t, bucketClient, userID, startTime, endTime, nil)
836+
createNoCompactionMark(t, bucketClient, userID, block2)
835837

836838
cfg := BlocksCleanerConfig{
837839
DeletionDelay: time.Hour,
@@ -862,7 +864,7 @@ func TestBlocksCleaner_CleanPartitionedGroupInfo(t *testing.T) {
862864
Partitions: []Partition{
863865
{
864866
PartitionID: 0,
865-
Blocks: []ulid.ULID{block1},
867+
Blocks: []ulid.ULID{block1, block2},
866868
},
867869
},
868870
RangeStart: startTime,
@@ -893,6 +895,10 @@ func TestBlocksCleaner_CleanPartitionedGroupInfo(t *testing.T) {
893895
require.NoError(t, err)
894896
require.True(t, block1DeletionMarkerExists)
895897

898+
block2DeletionMarkerExists, err := userBucket.Exists(ctx, path.Join(block2.String(), metadata.DeletionMarkFilename))
899+
require.NoError(t, err)
900+
require.False(t, block2DeletionMarkerExists)
901+
896902
}
897903

898904
type mockConfigProvider struct {

Diff for: pkg/compactor/partitioned_group_info.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ func (p *PartitionedGroupInfo) markAllBlocksForDeletion(ctx context.Context, use
241241
level.Info(userLogger).Log("msg", "total number of blocks marked for deletion during partitioned group info clean up", "count", deleteBlocksCount)
242242
}()
243243
for _, blockID := range blocks {
244-
if p.doesBlockExist(ctx, userBucket, userLogger, blockID) && !p.isBlockDeleted(ctx, userBucket, userLogger, blockID) {
244+
if p.doesBlockExist(ctx, userBucket, userLogger, blockID) && !p.isBlockDeleted(ctx, userBucket, userLogger, blockID) && !p.isBlockNoCompact(ctx, userBucket, userLogger, blockID) {
245245
if err := block.MarkForDeletion(ctx, userLogger, userBucket, blockID, "delete block during partitioned group completion check", blocksMarkedForDeletion.WithLabelValues(userID, reasonValueRetention)); err != nil {
246246
level.Warn(userLogger).Log("msg", "unable to mark block for deletion", "partitioned_group_id", p.PartitionedGroupID, "block", blockID.String())
247247
return err

0 commit comments

Comments
 (0)