Thread: Can portals interleave with other portals
Greetings!
A PostgreSQL backend can operate multiple portals within a transaction and switch back and forth between them. For instance, the following code produces the expected result:
```
// The table users_Fetch contains users with ids between 1 and 20
val fetchedIds = Flux.defer {
    Flux.usingWhen(
        // 1. Establish cursors
        databaseClient.sql(
            "DECLARE fetch_test1 SCROLL CURSOR FOR SELECT * FROM users_Fetch;" +
                "DECLARE fetch_test2 SCROLL CURSOR FOR SELECT * FROM users_Fetch;")
            .flatMap { _ -> Mono.just(databaseClient) },
        // 2. Iterate cursors
        {
            // concat subscribes its internals sequentially
            Flux.concat(
                databaseClient.sql("MOVE FORWARD 3 FROM fetch_test1; FETCH FORWARD 5 FROM fetch_test1;")
                    .flatMap { r -> r.map { row, _ -> row.get("userId") as Int } },
                databaseClient.sql("FETCH FORWARD 5 FROM fetch_test2;")
                    .flatMap { r -> r.map { row, _ -> row.get("userId") as Int } },
                // An extended query select in between
                databaseClient.sql("select userId from users_Fetch;")
                    .flatMap { r -> r.map { row, _ -> row.get("userId") as Int } },
                databaseClient.sql("FETCH BACKWARD 5 FROM fetch_test1;")
                    .flatMap { r -> r.map { row, _ -> row.get("userId") as Int } },
                databaseClient.sql("FETCH FORWARD 5 FROM fetch_test2;")
                    .flatMap { r -> r.map { row, _ -> row.get("userId") as Int } }
            )
        },
        // 3. Close cursors
        {
            databaseClient.sql(
                "CLOSE fetch_test1;" +
                    "CLOSE fetch_test2;").then()
        }
    )
}.`as`(transactionalOperatorFetch::transactional)

assertEquals(
    listOf(
        4, 5, 6, 7, 8, // MOVE FORWARD 3 FROM fetch_test1; FETCH FORWARD 5 FROM fetch_test1;
        1, 2, 3, 4, 5, // FETCH FORWARD 5 FROM fetch_test2;
        1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, // select userId from users_Fetch;
        7, 6, 5, 4, 3, // FETCH BACKWARD 5 FROM fetch_test1;
        6, 7, 8, 9, 10), // FETCH FORWARD 5 FROM fetch_test2;
    fetchedIds.collectList().block()
)
```
Is the same possible for conventional selects issued with the extended query protocol? From the protocol's perspective, it would result in the following traffic:
231 53111 5432 PGSQL 109 >Q ———> BEGIN
232 5432 53111 TCP 56 5432 → 53111 [ACK] Seq=1 Ack=54 Win=6371 Len=0 TSval=2819351776 TSecr=589492423
237 5432 53111 PGSQL 73 <C/Z
238 53111 5432 TCP 56 53111 → 5432 [ACK] Seq=54 Ack=18 Win=6366 Len=0 TSval=589492435 TSecr=2819351788
// A client issues a select
239 53111 5432 PGSQL 276 >P/B/D/E/H ———> select * from …; bind B_1; execute B_1, fetch 2 rows; flush
240 5432 53111 TCP 56 5432 → 53111 [ACK] Seq=18 Ack=274 Win=6368 Len=0 TSval=2819351793 TSecr=589492440
245 5432 53111 PGSQL 552 <1/2/T/D/D/s ———> Data, Data, Portal suspended
…
// Then the same sequence for another prepared statement and portal (let's say B_2), but without a row limit in the Execute command and with a Sync at the end.
…
// Then the client proceeds with B_1 till the completion
270 53111 5432 PGSQL 69 > E ———> execute B_1, fetch 2 rows,
271 5432 53111 TCP 56 5432 → 53111 [ACK] Seq=925 Ack=323 Win=6367 Len=0 TSval=2819351846 TSecr=589492493
272 53111 5432 PGSQL 61 >H ———> Flush
274 5432 53111 TCP 56 5432 → 53111 [ACK] Seq=925 Ack=328 Win=6367 Len=0 TSval=2819351846 TSecr=589492493
282 5432 53111 PGSQL 144 <D/C ———> Command completion
283 53111 5432 TCP 56 53111 → 5432 [ACK] Seq=328 Ack=1013 Win=6351 Len=0 TSval=589492496 TSecr=2819351849
284 53111 5432 PGSQL 66 >C ———> Close B_1
285 5432 53111 TCP 56 5432 → 53111 [ACK] Seq=1013 Ack=338 Win=6367 Len=0 TSval=2819351849 TSecr=589492496
286 53111 5432 PGSQL 61 >S ———> Sync
287 5432 53111 TCP 56 5432 → 53111 [ACK] Seq=1013 Ack=343 Win=6366 Len=0 TSval=2819351849 TSecr=589492496
293 5432 53111 PGSQL 67 <3/Z
294 53111 5432 TCP 56 53111 → 5432 [ACK] Seq=343 Ack=1024 Win=6351 Len=0 TSval=589492498 TSecr=2819351851
295 53111 5432 PGSQL 68 >Q ———> COMMIT
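To make the intended exchange concrete, here is a minimal sketch in plain Kotlin of the behaviour the trace assumes: each portal keeps its own position, so a row-limited Execute on B_1 can be suspended, B_2 can run to completion, and B_1 can then be resumed. This is a toy model, not the backend's implementation; `Portal`, `Session`, `ExecuteResult`, and the row values are all hypothetical names and data chosen for illustration.

```kotlin
// Toy model of extended-protocol portals. NOT how the PostgreSQL backend works
// internally; every name and value here is hypothetical.

/** Outcome of one Execute: the rows returned and whether the portal ran to completion. */
data class ExecuteResult(val rows: List<Int>, val completed: Boolean)

/** A portal remembers how far its result set has been consumed. */
class Portal(private val rows: List<Int>) {
    private var position = 0

    /** maxRows == 0 means "no limit", mirroring the row-count field of Execute. */
    fun execute(maxRows: Int = 0): ExecuteResult {
        val end = if (maxRows == 0) rows.size else minOf(rows.size, position + maxRows)
        val batch = rows.subList(position, end).toList()
        position = end
        // Fewer rows than requested means the result set ran out (CommandComplete);
        // otherwise the portal is only suspended (PortalSuspended).
        val completed = maxRows == 0 || batch.size < maxRows
        return ExecuteResult(batch, completed)
    }
}

/** A session holds named portals side by side, each with independent state. */
class Session {
    private val portals = mutableMapOf<String, Portal>()

    fun bind(name: String, rows: List<Int>) { portals[name] = Portal(rows) }

    fun execute(name: String, maxRows: Int = 0): ExecuteResult =
        portals.getValue(name).execute(maxRows)

    fun close(name: String) { portals.remove(name) }
}

fun main() {
    val session = Session()
    session.bind("B_1", listOf(1, 2, 3))
    println(session.execute("B_1", maxRows = 2)) // two rows, portal suspended
    session.bind("B_2", listOf(10, 20))
    println(session.execute("B_2"))              // all rows, command complete
    println(session.execute("B_1", maxRows = 2)) // remaining row, command complete
    session.close("B_1")
}
```

The model only captures the bookkeeping; whether a real backend and driver permit this ordering is exactly the question being asked.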
I'm asking because this kind of communication is intrinsic to R2DBC scenarios like this one:
```
val usersWithAccounts = Flux.defer {
    // Select all users
    databaseClient.sql("select * from users where userId >= $1 and userId <= $2")
        .bind("$1", 1)
        .bind("$2", 255)
        .flatMap { r -> r.map { row, meta -> … } }
        .flatMap { user ->
            // For each user select all its accounts
            databaseClient.sql("select login from accounts where userId=$1 limit 1")
                .bind("$1", user.id)
                .flatMap { r -> r.map { row, meta -> … } }
                .reduce …
        }
}.`as`(transactionalOperator::transactional)
```
which fails because the inner requests queue up inside the driver.
Thanks!
Evgeny Smirnov <evgeny.v.smirnov@gmail.com> writes:
> A Postgresql backend is capable of operating multiple portals within a
> transaction and switching between them on and off. For instance such a code
> results in an expected outcome:
> ...
> Is the same possible for conventional selects issued with extended query
> protocol?

AFAIR, a portal is a portal. Have you tried it? If it didn't do what you expected, please provide a less handwavy example.

regards, tom lane