Skip to content

feat(core): Add Supabase Queues support #15921

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import * as Sentry from '@sentry/browser';
import { createClient } from '@supabase/supabase-js';

window.Sentry = Sentry;

const supabaseClient = createClient('https://test.supabase.co', 'test-key', {
db: {
schema: 'pgmq_public',
},
});

Sentry.init({
dsn: 'https://[email protected]/1337',
integrations: [Sentry.browserTracingIntegration(), Sentry.supabaseIntegration({ supabaseClient })],
tracesSampleRate: 1.0,
});

// Simulate queue operations
async function performQueueOperations() {
try {
await supabaseClient.rpc('send', {
queue_name: 'todos',
msg: { title: 'Test Todo' },
});

await supabaseClient.rpc('pop', {
queue_name: 'todos',
});
} catch (error) {
Sentry.captureException(error);
}
}

performQueueOperations();
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import type { Page } from '@playwright/test';
import { expect } from '@playwright/test';
import type { Event } from '@sentry/core';
import { sentryTest } from '../../../../utils/fixtures';
import { getFirstSentryEnvelopeRequest, shouldSkipTracingTest } from '../../../../utils/helpers';

async function mockSupabaseRoute(page: Page) {
await page.route('**/rpc/**/send', route => {
return route.fulfill({
status: 200,
body: JSON.stringify([0]),
headers: {
'Content-Type': 'application/json',
},
});
});

await page.route('**/rpc/**/pop', route => {
return route.fulfill({
status: 200,
body: JSON.stringify([
{
msg_id: 0,
},
]),
headers: {
'Content-Type': 'application/json',
},
});
});
}

const bundle = process.env.PW_BUNDLE || '';
// We only want to run this in non-CDN bundle mode
if (bundle.startsWith('bundle')) {
sentryTest.skip();
}

sentryTest('should capture Supabase queue spans from client.rpc', async ({ getLocalTestUrl, page }) => {
if (shouldSkipTracingTest()) {
return;
}

await mockSupabaseRoute(page);

const url = await getLocalTestUrl({ testDir: __dirname });

const event = await getFirstSentryEnvelopeRequest<Event>(page, url);
const queueSpans = event.spans?.filter(({ op }) => op?.startsWith('queue.'));

expect(queueSpans).toHaveLength(2);

expect(queueSpans![0]).toMatchObject({
description: 'supabase.db.rpc',
parent_span_id: event.contexts?.trace?.span_id,
span_id: expect.any(String),
start_timestamp: expect.any(Number),
timestamp: expect.any(Number),
trace_id: event.contexts?.trace?.trace_id,
data: expect.objectContaining({
'sentry.op': 'queue.publish',
'sentry.origin': 'auto.db.supabase',
'messaging.destination.name': 'todos',
'messaging.message.id': '0',
}),
});

expect(queueSpans![1]).toMatchObject({
description: 'supabase.db.rpc',
parent_span_id: event.contexts?.trace?.span_id,
span_id: expect.any(String),
start_timestamp: expect.any(Number),
timestamp: expect.any(Number),
trace_id: event.contexts?.trace?.trace_id,
data: expect.objectContaining({
'sentry.op': 'queue.process',
'sentry.origin': 'auto.db.supabase',
'messaging.destination.name': 'todos',
'messaging.message.id': '0',
}),
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import * as Sentry from '@sentry/browser';
import { createClient } from '@supabase/supabase-js';

window.Sentry = Sentry;

const supabaseClient = createClient('https://test.supabase.co', 'test-key', {
db: {
schema: 'pgmq_public',
},
});

Sentry.init({
dsn: 'https://[email protected]/1337',
integrations: [Sentry.browserTracingIntegration(), Sentry.supabaseIntegration({ supabaseClient })],
tracesSampleRate: 1.0,
});

// Simulate queue operations
async function performQueueOperations() {
try {
await supabaseClient.schema('pgmq_public').rpc('send', {
queue_name: 'todos',
msg: { title: 'Test Todo' },
});

await supabaseClient.schema('pgmq_public').rpc('pop', {
queue_name: 'todos',
});
} catch (error) {
Sentry.captureException(error);
}
}

performQueueOperations();
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import { type Page, expect } from '@playwright/test';
import type { Event } from '@sentry/core';
import { sentryTest } from '../../../../utils/fixtures';
import { getFirstSentryEnvelopeRequest, shouldSkipTracingTest } from '../../../../utils/helpers';

async function mockSupabaseRoute(page: Page) {
await page.route('**/rpc/**/send', route => {
return route.fulfill({
status: 200,
body: JSON.stringify([0]),
headers: {
'Content-Type': 'application/json',
},
});
});

await page.route('**/rpc/**/pop', route => {
return route.fulfill({
status: 200,
body: JSON.stringify([
{
msg_id: 0,
},
]),
headers: {
'Content-Type': 'application/json',
},
});
});
}

const bundle = process.env.PW_BUNDLE || '';
// We only want to run this in non-CDN bundle mode
if (bundle.startsWith('bundle')) {
sentryTest.skip();
}

sentryTest('should capture Supabase queue spans from client.schema(...).rpc', async ({ getLocalTestUrl, page }) => {
if (shouldSkipTracingTest()) {
return;
}

await mockSupabaseRoute(page);

const url = await getLocalTestUrl({ testDir: __dirname });

const event = await getFirstSentryEnvelopeRequest<Event>(page, url);

const queueSpans = event.spans?.filter(({ op }) => op?.startsWith('queue.'));

expect(queueSpans).toHaveLength(2);

expect(queueSpans![0]).toMatchObject({
description: 'supabase.db.rpc',
parent_span_id: event.contexts?.trace?.span_id,
span_id: expect.any(String),
start_timestamp: expect.any(Number),
timestamp: expect.any(Number),
trace_id: event.contexts?.trace?.trace_id,
data: expect.objectContaining({
'sentry.op': 'queue.publish',
'sentry.origin': 'auto.db.supabase',
'messaging.destination.name': 'todos',
'messaging.message.id': '0',
}),
});

expect(queueSpans![1]).toMatchObject({
description: 'supabase.db.rpc',
parent_span_id: event.contexts?.trace?.span_id,
span_id: expect.any(String),
start_timestamp: expect.any(Number),
timestamp: expect.any(Number),
trace_id: event.contexts?.trace?.trace_id,
data: expect.objectContaining({
'sentry.op': 'queue.process',
'sentry.origin': 'auto.db.supabase',
'messaging.destination.name': 'todos',
'messaging.message.id': '0',
}),
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"build": "next build",
"start": "next start",
"clean": "npx rimraf node_modules pnpm-lock.yaml .next",
"start-local-supabase": "supabase init --force --workdir . && supabase start -o env && supabase db reset",
"start-local-supabase": "supabase start -o env && supabase db reset",
"test:prod": "TEST_ENV=production playwright test",
"test:build": "pnpm install && pnpm start-local-supabase && pnpm build",
"test:assert": "pnpm test:prod"
Expand All @@ -25,7 +25,7 @@
"next": "14.2.25",
"react": "18.2.0",
"react-dom": "18.2.0",
"supabase": "2.19.7",
"supabase": "2.23.4",
"typescript": "4.9.5"
},
"devDependencies": {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Enqueue a job to the queue

import { NextApiRequest, NextApiResponse } from 'next';
import { createClient } from '@supabase/supabase-js';
import * as Sentry from '@sentry/nextjs';

// These are the default development keys for a local Supabase instance
const NEXT_PUBLIC_SUPABASE_URL = 'http://localhost:54321';
const SUPABASE_SERVICE_ROLE_KEY =
'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6InNlcnZpY2Vfcm9sZSIsImV4cCI6MTk4MzgxMjk5Nn0.EGIM96RAZx35lJzdJsyH-qQwv8Hdp7fsn3W0YpN81IU';

const supabaseClient = createClient(NEXT_PUBLIC_SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY);

Sentry.instrumentSupabaseClient(supabaseClient);

export default async function handler(req: NextApiRequest, res: NextApiResponse) {
// Enqueue a job to the queue
const { data, error } = await supabaseClient.schema('pgmq_public').rpc('pop', {
queue_name: 'non-existing-queue',
});

if (error) {
return res.status(500).json({ error: error.message });
}

return res.status(200).json({ data });
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Enqueue a job to the queue

import { NextApiRequest, NextApiResponse } from 'next';
import { createClient } from '@supabase/supabase-js';
import * as Sentry from '@sentry/nextjs';

// These are the default development keys for a local Supabase instance
const NEXT_PUBLIC_SUPABASE_URL = 'http://localhost:54321';
const SUPABASE_SERVICE_ROLE_KEY =
'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6InNlcnZpY2Vfcm9sZSIsImV4cCI6MTk4MzgxMjk5Nn0.EGIM96RAZx35lJzdJsyH-qQwv8Hdp7fsn3W0YpN81IU';

const supabaseClient = createClient(NEXT_PUBLIC_SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY, {
db: {
schema: 'pgmq_public',
},
});

Sentry.instrumentSupabaseClient(supabaseClient);

export default async function handler(req: NextApiRequest, res: NextApiResponse) {
// Enqueue a job to the queue
const { data, error } = await supabaseClient.rpc('pop', {
queue_name: 'todos',
});

if (error) {
return res.status(500).json({ error: error.message });
}

return res.status(200).json({ data });
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { NextApiRequest, NextApiResponse } from 'next';
import { createClient } from '@supabase/supabase-js';
import * as Sentry from '@sentry/nextjs';

// These are the default development keys for a local Supabase instance
const NEXT_PUBLIC_SUPABASE_URL = 'http://localhost:54321';
const SUPABASE_SERVICE_ROLE_KEY =
'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6InNlcnZpY2Vfcm9sZSIsImV4cCI6MTk4MzgxMjk5Nn0.EGIM96RAZx35lJzdJsyH-qQwv8Hdp7fsn3W0YpN81IU';

const supabaseClient = createClient(NEXT_PUBLIC_SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY);

Sentry.instrumentSupabaseClient(supabaseClient);

export default async function handler(req: NextApiRequest, res: NextApiResponse) {
// Process a job from the queue
const { data, error } = await supabaseClient.schema('pgmq_public').rpc('pop', {
queue_name: 'todos',
});

if (error) {
return res.status(500).json({ error: error.message });
}

return res.status(200).json({ data });
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import { NextApiRequest, NextApiResponse } from 'next';
import { createClient } from '@supabase/supabase-js';
import * as Sentry from '@sentry/nextjs';

// These are the default development keys for a local Supabase instance
const NEXT_PUBLIC_SUPABASE_URL = 'http://localhost:54321';
const SUPABASE_SERVICE_ROLE_KEY =
'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6InNlcnZpY2Vfcm9sZSIsImV4cCI6MTk4MzgxMjk5Nn0.EGIM96RAZx35lJzdJsyH-qQwv8Hdp7fsn3W0YpN81IU';

const supabaseClient = createClient(NEXT_PUBLIC_SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY, {
db: {
schema: 'pgmq_public',
},
});

Sentry.instrumentSupabaseClient(supabaseClient);

export default async function handler(req: NextApiRequest, res: NextApiResponse) {
// Enqueue a job to the queue
const { data, error } = await supabaseClient.rpc('send_batch', {
queue_name: 'todos',
messages: [
{
title: 'Test Todo 1',
},
{
title: 'Test Todo 2',
},
],
});

if (error) {
return res.status(500).json({ error: error.message });
}

return res.status(200).json({ data });
}
Loading