Skip to content

Add remote and local listener support #83

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Nov 4, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions postgres-native-sqldelight-driver/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ kotlin {
sourceSets {
commonMain {
dependencies {
api("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.4")
api("app.cash.sqldelight:runtime:2.0.0-alpha04")
api("org.jetbrains.kotlinx:kotlinx-datetime:0.4.0")
api("app.softwork:kotlinx-uuid-core:0.0.17")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package app.softwork.sqldelight.postgresdriver

import kotlinx.cinterop.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import libpq.*
import platform.posix.*
import kotlin.coroutines.*
import kotlin.time.*

public sealed interface ListenerSupport {
public sealed interface ScopedListenerSupport : ListenerSupport {
public val notificationScope: CoroutineScope
}

public companion object {
public fun Local(notificationScope: CoroutineScope): Local {
val notifications = MutableSharedFlow<String>()
return Local(notificationScope, notifications) { notifications.emit(it) }
}
}

public class Local(
override val notificationScope: CoroutineScope,
public val notifications: Flow<String>,
public val notify: suspend (String) -> Unit
) : ScopedListenerSupport

public class Remote(
override val notificationScope: CoroutineScope,
public val notificationName: (String) -> String = { it }
) : ScopedListenerSupport {
internal fun remoteListener(conn: CPointer<PGconn>): Flow<String> = channelFlow {
@OptIn(ExperimentalCoroutinesApi::class)
val selectorContext = newSingleThreadContext("WorkerSelectorManager")
val job = Job()
val coroutineContext = selectorContext + job

try {
val socket = PQsocket(conn)
check(socket >= 0) {
"Error while connecting to the PostgreSql socket"
}
val pollfd: pollfd = memScoped { alloc() }
pollfd.fd = socket
val timeoutMs = 500

while (true) {
withContext(coroutineContext) {
@OptIn(UnsafeNumber::class)
poll(pollfd.ptr, 1, timeoutMs)
}
PQconsumeInput(conn)
var notification: PGnotify? = null
while (PQnotifies(conn)?.pointed?.also { notification = it } != null) {
notification?.let {
val tableName = it.relname!!.toKString()
PQfreemem(it.ptr)
send(tableName)
}
}
}
} finally {
job.cancel()
selectorContext.close()
}
}.shareIn(notificationScope, SharingStarted.WhileSubscribed(replayExpiration = Duration.ZERO))
}

public object None : ListenerSupport
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,102 @@ package app.softwork.sqldelight.postgresdriver
import app.cash.sqldelight.*
import app.cash.sqldelight.db.*
import kotlinx.cinterop.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import libpq.*

public class PostgresNativeDriver(private var conn: CPointer<PGconn>) : SqlDriver {
public class PostgresNativeDriver(
private val conn: CPointer<PGconn>,
private val listenerSupport: ListenerSupport
) : SqlDriver {
private var transaction: Transacter.Transaction? = null

private val notifications: Flow<String>

init {
require(PQstatus(conn) == ConnStatusType.CONNECTION_OK) {
conn.error()
}
setDateOutputs()

notifications = when (listenerSupport) {
is ListenerSupport.Local -> listenerSupport.notifications
is ListenerSupport.Remote -> listenerSupport.remoteListener(conn)
is ListenerSupport.None -> emptyFlow()
}
}

private fun setDateOutputs() {
execute(null, "SET intervalstyle = 'iso_8601';", 0)
execute(null, "SET datestyle = 'ISO';", 0)
}

private val listeners = mutableMapOf<Query.Listener, Job>()

private fun CoroutineScope.listen(queryKeysEscaped: List<String>, action: suspend (String) -> Unit) = launch {
notifications.filter {
it in queryKeysEscaped
}.collect {
action(it)
}
}

override fun addListener(listener: Query.Listener, queryKeys: Array<String>) {
when (listenerSupport) {
ListenerSupport.None -> return
is ListenerSupport.Local -> {
listeners[listener] = listenerSupport.notificationScope.listen(queryKeys.toList()) {
listener.queryResultsChanged()
}
}

is ListenerSupport.Remote -> {
val queryKeysRenamed = queryKeys.map {
listenerSupport.notificationName(it)
}
listeners[listener] = listenerSupport.notificationScope.listen(queryKeysRenamed) {
listener.queryResultsChanged()
}
for (queryKey in queryKeysRenamed) {
execute(null, "LISTEN ${conn.escaped(queryKey)}", parameters = 0)
}
}
}
}

override fun notifyListeners(queryKeys: Array<String>) {
when (listenerSupport) {
is ListenerSupport.Local -> {
listenerSupport.notificationScope.launch {
for (queryKey in queryKeys) {
listenerSupport.notify(queryKey)
}
}
}

is ListenerSupport.Remote -> {
for (queryKey in queryKeys) {
val name = listenerSupport.notificationName(queryKey)
execute(null, "NOTIFY ${conn.escaped(name)}", parameters = 0)
}
}

ListenerSupport.None -> return
}
}

override fun removeListener(listener: Query.Listener, queryKeys: Array<String>) {

val queryListeners = listeners[listener]
if (queryListeners != null) {
if (listenerSupport is ListenerSupport.Remote) {
for (queryKey in queryKeys) {
val name = listenerSupport.notificationName(queryKey)
execute(null, "UNLISTEN ${conn.escaped(name)}", parameters = 0)
}
}
queryListeners.cancel()
listeners.remove(listener)
}
}

override fun currentTransaction(): Transacter.Transaction? = transaction
Expand Down Expand Up @@ -225,6 +294,9 @@ public class PostgresNativeDriver(private var conn: CPointer<PGconn>) : SqlDrive

override fun close() {
PQfinish(conn)
for ((_, job) in listeners) {
job.cancel()
}
}

override fun newTransaction(): QueryResult.Value<Transacter.Transaction> {
Expand Down Expand Up @@ -286,8 +358,16 @@ internal fun CPointer<PGresult>?.check(conn: CPointer<PGconn>): CPointer<PGresul
return this!!
}

private fun CPointer<PGconn>.escaped(value: String): String {
val cString = PQescapeIdentifier(this, value, value.length.convert())
val escaped = cString!!.toKString()
PQfreemem(cString)
return escaped
}

public fun PostgresNativeDriver(
host: String, database: String, user: String, password: String, port: Int = 5432, options: String? = null
host: String, database: String, user: String, password: String, port: Int = 5432, options: String? = null,
listenerSupport: ListenerSupport = ListenerSupport.None
): PostgresNativeDriver {
val conn = PQsetdbLogin(
pghost = host,
Expand All @@ -301,5 +381,5 @@ public fun PostgresNativeDriver(
require(PQstatus(conn) == ConnStatusType.CONNECTION_OK) {
conn.error()
}
return PostgresNativeDriver(conn!!)
return PostgresNativeDriver(conn!!, listenerSupport = listenerSupport)
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
package app.softwork.sqldelight.postgresdriver

import app.cash.sqldelight.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.test.*
import kotlin.test.*
import kotlin.time.Duration.Companion.seconds

@ExperimentalCoroutinesApi
class PostgresNativeDriverTest {
@Test
fun simpleTest() {
Expand Down Expand Up @@ -208,4 +214,107 @@ class PostgresNativeDriverTest {
val results = driver.copy("1\n2\n")
assertEquals(2, results)
}

@Test
fun remoteListenerTest() = runTest {
val other = PostgresNativeDriver(
host = "localhost",
port = 5432,
user = "postgres",
database = "postgres",
password = "password",
listenerSupport = ListenerSupport.Remote(backgroundScope)
)

val driver = PostgresNativeDriver(
host = "localhost",
port = 5432,
user = "postgres",
database = "postgres",
password = "password",
listenerSupport = ListenerSupport.Remote(backgroundScope)
)

val results = MutableStateFlow(0)
val listener = object : Query.Listener {
override fun queryResultsChanged() {
results.update { it + 1 }
}
}
driver.addListener(listener, arrayOf("foo", "bar"))
withContext(Dispatchers.Default) {
val setupSocket = 2.seconds
delay(setupSocket)
}
other.notifyListeners(arrayOf("foo"))

other.notifyListeners(arrayOf("foo", "bar"))
other.notifyListeners(arrayOf("bar"))
withContext(Dispatchers.Default) {
val waitForRemoteNotifications = 2.seconds
delay(waitForRemoteNotifications)
}

driver.removeListener(listener, arrayOf("foo", "bar"))
driver.notifyListeners(arrayOf("foo"))
driver.notifyListeners(arrayOf("bar"))

withContext(Dispatchers.Default) {
val waitForRemoteNotifications = 2.seconds
delay(waitForRemoteNotifications)
}
assertEquals(4, results.value)

other.close()
driver.close()
}

@Test
fun localListenerTest() = runTest {
val notifications = MutableSharedFlow<String>()
val notificationList = async {
notifications.take(4).toList()
}

val driver = PostgresNativeDriver(
host = "localhost",
port = 5432,
user = "postgres",
database = "postgres",
password = "password",
listenerSupport = ListenerSupport.Local(
this,
notifications,
) {
notifications.emit(it)
}
)

val results = MutableStateFlow(0)
val listener = object : Query.Listener {
override fun queryResultsChanged() {
results.update { it + 1 }
}
}
driver.addListener(listener, arrayOf("foo", "bar"))
runCurrent()
driver.notifyListeners(arrayOf("foo"))
runCurrent()
driver.notifyListeners(arrayOf("foo", "bar"))
runCurrent()
driver.notifyListeners(arrayOf("bar"))
runCurrent()

driver.removeListener(listener, arrayOf("foo", "bar"))
runCurrent()
driver.notifyListeners(arrayOf("foo"))
runCurrent()
driver.notifyListeners(arrayOf("bar"))
runCurrent()

assertEquals(4, results.value)
assertEquals(listOf("foo", "foo", "bar", "bar"), notificationList.await())

driver.close()
}
}
2 changes: 2 additions & 0 deletions testing/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ kotlin {
commonMain {
dependencies {
implementation(projects.postgresNativeSqldelightDriver)
implementation("app.cash.sqldelight:coroutines-extensions:2.0.0-alpha04")
}
}
commonTest {
dependencies {
implementation(kotlin("test"))
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.6.4")
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@ CREATE TABLE IF NOT EXISTS users(
email VARCHAR(200) NOT NULL UNIQUE,
username VARCHAR(100) NOT NULL UNIQUE,
bio VARCHAR(1000) NOT NULL DEFAULT '',
image VARCHAR(255) NOT NULL DEFAULT ''
image VARCHAR(255) NOT NULL DEFAULT '',
fooID INT NULL
);
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
insertAndGet:
INSERT INTO users(email, username, bio, image)
VALUES (?, ?, ?, ?)
INSERT INTO users(email, username, bio, image, fooID)
VALUES (?, ?, ?, ?, ?)
RETURNING id;

selectByUsername:
SELECT email, username, bio, image
FROM users
WHERE username = :username;

selectByFoo:
SELECT * FROM users WHERE fooID = :fooID;

updateWhereFoo:
UPDATE users
SET email = :newEmail
WHERE fooID = :fooID;
Loading