@@ -66,7 +66,7 @@ public class PostgresNativeDriver(private var conn: CPointer<PGconn>) : SqlDrive
66
66
paramValues = preparedStatement?.values(this ),
67
67
paramFormats = preparedStatement?.formats?.refTo(0 ),
68
68
paramLengths = preparedStatement?.lengths?.refTo(0 ),
69
- resultFormat = BINARY_RESULT_FORMAT
69
+ resultFormat = TEXT_RESULT_FORMAT
70
70
)
71
71
}
72
72
} else {
@@ -78,7 +78,7 @@ public class PostgresNativeDriver(private var conn: CPointer<PGconn>) : SqlDrive
78
78
paramValues = preparedStatement?.values(this ),
79
79
paramFormats = preparedStatement?.formats?.refTo(0 ),
80
80
paramLengths = preparedStatement?.lengths?.refTo(0 ),
81
- resultFormat = BINARY_RESULT_FORMAT ,
81
+ resultFormat = TEXT_RESULT_FORMAT ,
82
82
paramTypes = preparedStatement?.types?.refTo(0 )
83
83
)
84
84
}
@@ -87,11 +87,12 @@ public class PostgresNativeDriver(private var conn: CPointer<PGconn>) : SqlDrive
87
87
return QueryResult .Value (value = result.rows)
88
88
}
89
89
90
- private val CPointer <PGresult >.rows: Long get() {
91
- val rows = PQcmdTuples (this )!! .toKString()
92
- clear()
93
- return rows.toLongOrNull() ? : 0
94
- }
90
+ private val CPointer <PGresult >.rows: Long
91
+ get() {
92
+ val rows = PQcmdTuples (this )!! .toKString()
93
+ clear()
94
+ return rows.toLongOrNull() ? : 0
95
+ }
95
96
96
97
private fun preparedStatementExists (identifier : Int ): Boolean {
97
98
val result =
@@ -104,35 +105,92 @@ public class PostgresNativeDriver(private var conn: CPointer<PGconn>) : SqlDrive
104
105
return result.value != null
105
106
}
106
107
107
- private fun Int.escapeNegative (): String = if (this < 0 ) " _${toString().substring(1 )} " else toString()
108
+ private fun Int.escapeNegative (): String = if (this < 0 ) " _${toString().substring(1 )} " else toString()
108
109
109
- override fun <R > executeQuery (
110
+ private fun preparedStatement (
111
+ parameters : Int ,
112
+ binders : (SqlPreparedStatement .() -> Unit )?
113
+ ): PostgresPreparedStatement ? = if (parameters != 0 ) {
114
+ PostgresPreparedStatement (parameters).apply {
115
+ if (binders != null ) {
116
+ binders()
117
+ }
118
+ }
119
+ } else null
120
+
121
+ public fun <R > executeQueryWithNativeCursor (
110
122
identifier : Int? ,
111
123
sql : String ,
112
124
mapper : (SqlCursor ) -> R ,
113
125
parameters : Int ,
126
+ fetchSize : Int = 1,
114
127
binders : (SqlPreparedStatement .() -> Unit )?
115
128
): QueryResult .Value <R > {
116
129
val cursorName = if (identifier == null ) " myCursor" else " cursor${identifier.escapeNegative()} "
117
130
val cursor = " DECLARE $cursorName CURSOR FOR"
118
- val preparedStatement = if (parameters != 0 ) {
119
- PostgresPreparedStatement (parameters).apply {
120
- if (binders != null ) {
121
- binders()
122
- }
123
- }
124
- } else null
131
+
132
+ val preparedStatement = preparedStatement(parameters, binders)
125
133
val result = if (identifier != null ) {
126
- if (! preparedStatementExists(identifier)) {
127
- PQprepare (
134
+ checkPreparedStatement(identifier, " $cursor $sql " , parameters, preparedStatement)
135
+ conn.exec(" BEGIN" )
136
+ memScoped {
137
+ PQexecPrepared (
128
138
conn,
129
139
stmtName = identifier.toString(),
130
- query = " $cursor $sql " ,
131
140
nParams = parameters,
132
- paramTypes = preparedStatement?.types?.refTo(0 )
133
- ).check(conn).clear()
141
+ paramValues = preparedStatement?.values(this ),
142
+ paramLengths = preparedStatement?.lengths?.refTo(0 ),
143
+ paramFormats = preparedStatement?.formats?.refTo(0 ),
144
+ resultFormat = TEXT_RESULT_FORMAT
145
+ )
134
146
}
147
+ } else {
135
148
conn.exec(" BEGIN" )
149
+ memScoped {
150
+ PQexecParams (
151
+ conn,
152
+ command = " $cursor $sql " ,
153
+ nParams = parameters,
154
+ paramValues = preparedStatement?.values(this ),
155
+ paramLengths = preparedStatement?.lengths?.refTo(0 ),
156
+ paramFormats = preparedStatement?.formats?.refTo(0 ),
157
+ paramTypes = preparedStatement?.types?.refTo(0 ),
158
+ resultFormat = TEXT_RESULT_FORMAT
159
+ )
160
+ }
161
+ }.check(conn)
162
+
163
+ val value = PostgresCursor .RealCursor (result, cursorName, conn, fetchSize).use(mapper)
164
+ return QueryResult .Value (value = value)
165
+ }
166
+
167
+ private fun checkPreparedStatement (
168
+ identifier : Int ,
169
+ sql : String ,
170
+ parameters : Int ,
171
+ preparedStatement : PostgresPreparedStatement ?
172
+ ) {
173
+ if (! preparedStatementExists(identifier)) {
174
+ PQprepare (
175
+ conn,
176
+ stmtName = identifier.toString(),
177
+ query = sql,
178
+ nParams = parameters,
179
+ paramTypes = preparedStatement?.types?.refTo(0 )
180
+ ).check(conn).clear()
181
+ }
182
+ }
183
+
184
+ override fun <R > executeQuery (
185
+ identifier : Int? ,
186
+ sql : String ,
187
+ mapper : (SqlCursor ) -> R ,
188
+ parameters : Int ,
189
+ binders : (SqlPreparedStatement .() -> Unit )?
190
+ ): QueryResult .Value <R > {
191
+ val preparedStatement = preparedStatement(parameters, binders)
192
+ val result = if (identifier != null ) {
193
+ checkPreparedStatement(identifier, sql, parameters, preparedStatement)
136
194
memScoped {
137
195
PQexecPrepared (
138
196
conn,
@@ -141,26 +199,25 @@ public class PostgresNativeDriver(private var conn: CPointer<PGconn>) : SqlDrive
141
199
paramValues = preparedStatement?.values(this ),
142
200
paramLengths = preparedStatement?.lengths?.refTo(0 ),
143
201
paramFormats = preparedStatement?.formats?.refTo(0 ),
144
- resultFormat = BINARY_RESULT_FORMAT
202
+ resultFormat = TEXT_RESULT_FORMAT
145
203
)
146
204
}
147
205
} else {
148
- conn.exec(" BEGIN" )
149
206
memScoped {
150
207
PQexecParams (
151
208
conn,
152
- command = " $cursor $ sql" ,
209
+ command = sql,
153
210
nParams = parameters,
154
211
paramValues = preparedStatement?.values(this ),
155
212
paramLengths = preparedStatement?.lengths?.refTo(0 ),
156
213
paramFormats = preparedStatement?.formats?.refTo(0 ),
157
214
paramTypes = preparedStatement?.types?.refTo(0 ),
158
- resultFormat = BINARY_RESULT_FORMAT
215
+ resultFormat = TEXT_RESULT_FORMAT
159
216
)
160
217
}
161
218
}.check(conn)
162
219
163
- val value = PostgresCursor (result, cursorName, conn ).use(mapper)
220
+ val value = PostgresCursor . NoCursor (result).use(mapper)
164
221
return QueryResult .Value (value = value)
165
222
}
166
223
@@ -232,29 +289,73 @@ private fun CPointer<PGresult>?.check(conn: CPointer<PGconn>): CPointer<PGresult
232
289
return this !!
233
290
}
234
291
235
- /* *
236
- * Must be inside a transaction!
237
- */
238
- public class PostgresCursor (
239
- private var result : CPointer <PGresult >,
240
- private val name : String ,
241
- private val conn : CPointer <PGconn >
292
+ public sealed class PostgresCursor (
293
+ internal var result : CPointer <PGresult >
242
294
) : SqlCursor, Closeable {
243
- override fun close () {
244
- result.clear()
245
- conn.exec(" CLOSE $name " )
246
- conn.exec(" END" )
295
+ internal abstract val currentRowIndex: Int
296
+
297
+ /* *
298
+ * Must be inside a transaction!
299
+ */
300
+ internal class RealCursor (
301
+ result : CPointer <PGresult >,
302
+ private val name : String ,
303
+ private val conn : CPointer <PGconn >,
304
+ private val fetchSize : Int
305
+ ) : PostgresCursor(result) {
306
+ override fun close () {
307
+ result.clear()
308
+ conn.exec(" CLOSE $name " )
309
+ conn.exec(" END" )
310
+ }
311
+
312
+ override var currentRowIndex = - 1
313
+ private var maxRowIndex = - 1
314
+
315
+ override fun next (): Boolean {
316
+ if (currentRowIndex == maxRowIndex) {
317
+ currentRowIndex = - 1
318
+ }
319
+ if (currentRowIndex == - 1 ) {
320
+ result = PQexec (conn, " FETCH $fetchSize IN $name " ).check(conn)
321
+ maxRowIndex = PQntuples (result) - 1
322
+ }
323
+ return if (currentRowIndex < maxRowIndex) {
324
+ currentRowIndex + = 1
325
+ true
326
+ } else false
327
+ }
328
+ }
329
+
330
+ internal class NoCursor (
331
+ result : CPointer <PGresult >
332
+ ) : PostgresCursor(result) {
333
+ override fun close () {
334
+ result.clear()
335
+ }
336
+
337
+ private val maxRowIndex = PQntuples (result) - 1
338
+ override var currentRowIndex = - 1
339
+
340
+ override fun next (): Boolean {
341
+ return if (currentRowIndex < maxRowIndex) {
342
+ currentRowIndex + = 1
343
+ true
344
+ } else {
345
+ false
346
+ }
347
+ }
247
348
}
248
349
249
350
override fun getBoolean (index : Int ): Boolean? = getString(index)?.toBoolean()
250
351
251
352
override fun getBytes (index : Int ): ByteArray? {
252
- val isNull = PQgetisnull (result, tup_num = 0 , field_num = index) == 1
353
+ val isNull = PQgetisnull (result, tup_num = currentRowIndex , field_num = index) == 1
253
354
return if (isNull) {
254
355
null
255
356
} else {
256
- val bytes = PQgetvalue (result, tup_num = 0 , field_num = index)!!
257
- val length = PQgetlength (result, tup_num = 0 , field_num = index)
357
+ val bytes = PQgetvalue (result, tup_num = currentRowIndex , field_num = index)!!
358
+ val length = PQgetlength (result, tup_num = currentRowIndex , field_num = index)
258
359
bytes.fromHex(length)
259
360
}
260
361
}
@@ -284,11 +385,11 @@ public class PostgresCursor(
284
385
override fun getLong (index : Int ): Long? = getString(index)?.toLong()
285
386
286
387
override fun getString (index : Int ): String? {
287
- val isNull = PQgetisnull (result, tup_num = 0 , field_num = index) == 1
388
+ val isNull = PQgetisnull (result, tup_num = currentRowIndex , field_num = index) == 1
288
389
return if (isNull) {
289
390
null
290
391
} else {
291
- val value = PQgetvalue (result, tup_num = 0 , field_num = index)
392
+ val value = PQgetvalue (result, tup_num = currentRowIndex , field_num = index)
292
393
value!! .toKString()
293
394
}
294
395
}
@@ -302,11 +403,6 @@ public class PostgresCursor(
302
403
303
404
public fun getInterval (index : Int ): Duration ? = getString(index)?.let { Duration .parseIsoString(it) }
304
405
public fun getUUID (index : Int ): UUID ? = getString(index)?.toUUID()
305
-
306
- override fun next (): Boolean {
307
- result = PQexec (conn, " FETCH NEXT IN $name " ).check(conn)
308
- return PQcmdTuples (result)!! .toKString().toInt() == 1
309
- }
310
406
}
311
407
312
408
public class PostgresPreparedStatement (private val parameters : Int ) : SqlPreparedStatement {
0 commit comments