Skip to content

Commit e2ebf25

Browse files
committed
1 parent 474a166 commit e2ebf25

7 files changed

Lines changed: 56 additions & 50 deletions

File tree

src/main/kotlin/com/github/mgramin/sqlboot/model/resourcetype/impl/sql/SqlResourceType.kt

Lines changed: 23 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,14 @@ import com.github.mgramin.sqlboot.model.resource.impl.DbResourceImpl
3030
import com.github.mgramin.sqlboot.model.resourcetype.ResourceType
3131
import com.github.mgramin.sqlboot.model.uri.Uri
3232
import com.github.mgramin.sqlboot.model.uri.impl.DbUri
33+
import com.github.mgramin.sqlboot.sql.select.SelectQuery
3334
import com.github.mgramin.sqlboot.sql.select.impl.SimpleSelectQuery
3435
import com.github.mgramin.sqlboot.sql.select.wrappers.JdbcSelectQuery
3536
import com.github.mgramin.sqlboot.sql.select.wrappers.OrderedSelectQuery
3637
import com.github.mgramin.sqlboot.sql.select.wrappers.PaginatedSelectQuery
3738
import com.github.mgramin.sqlboot.template.generator.impl.GroovyTemplateGenerator
3839
import org.apache.commons.lang3.StringUtils.strip
3940
import reactor.core.publisher.Flux
40-
import reactor.core.publisher.Mono
41-
import reactor.core.publisher.toFlux
4241
import reactor.core.scheduler.Schedulers
4342

4443
/**
@@ -63,35 +62,28 @@ class SqlResourceType(
6362
}
6463

6564
override fun read(uri: Uri): Sequence<DbResource> {
66-
val result: ArrayList<Mono<Sequence<DbResourceImpl>>> = arrayListOf()
67-
for (connection in connections) {
65+
val mergeSequential: Flux<Map<String, Any>> =
66+
Flux.mergeSequential(
67+
connections
68+
.map { connection -> createQuery(uri, connection).execute(hashMapOf("uri" to uri)) }
69+
.map { it.parallel().runOn(Schedulers.parallel()) }
70+
.map { connection -> connection.map { it } })
6871

69-
val fromCallable = Mono.fromCallable {
70-
val map: Sequence<DbResourceImpl> =
71-
createQuery(uri, connection)
72-
.execute(hashMapOf("uri" to uri))
73-
.map { o ->
74-
val path = o.entries
75-
.filter { v -> v.key.startsWith("@") }
76-
.map { it.value.toString() }
77-
val name = path[path.size - 1]
78-
val headers = o.entries
79-
.map { strip(it.key, "@") to it.value }
80-
.toMap()
81-
val newHeaders = headers.toMutableMap()
82-
newHeaders["database"] = connection.name()
83-
DbResourceImpl(name, this, DbUri(this.name(), path), newHeaders)
84-
}
85-
return@fromCallable map
86-
}.publishOn(Schedulers.parallel())
87-
88-
result.add(fromCallable)
89-
}
90-
91-
val mergeSequential: Flux<Sequence<DbResourceImpl>> = Flux.mergeSequential(result)
92-
val map = mergeSequential.map { it.toFlux() }.flatMap { it }.collectList().block()
93-
94-
return map.asSequence()
72+
return mergeSequential
73+
.parallel()
74+
.runOn(Schedulers.parallel())
75+
.map { o ->
76+
val path = o.entries
77+
.filter { v -> v.key.startsWith("@") }
78+
.map { it.value.toString() }
79+
val name = path[path.size - 1]
80+
val headers = o.entries
81+
.map { strip(it.key, "@") to it.value }
82+
.toMap()
83+
// val newHeaders = headers.toMutableMap()
84+
// newHeaders["database"] = connection.name()
85+
DbResourceImpl(name, this, DbUri(this.name(), path), headers) as DbResource
86+
}.sequential().collectList().block().asSequence()
9587
}
9688

9789
override fun metaData(): Map<String, String> {
@@ -101,7 +93,7 @@ class SqlResourceType(
10193
return newColumns
10294
}
10395

104-
private fun createQuery(uri: Uri, connection: DbConnection): JdbcSelectQuery {
96+
private fun createQuery(uri: Uri, connection: DbConnection): SelectQuery {
10597
return JdbcSelectQuery(
10698
PaginatedSelectQuery(
10799
OrderedSelectQuery(

src/main/kotlin/com/github/mgramin/sqlboot/sql/select/SelectQuery.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424

2525
package com.github.mgramin.sqlboot.sql.select
2626

27+
import reactor.core.publisher.Flux
28+
2729
/**
2830
* Simple select SQL-query
2931
*
@@ -51,7 +53,7 @@ interface SelectQuery {
5153
*
5254
* @return query result
5355
*/
54-
fun execute(variables: Map<String, Any>): Sequence<Map<String, Any>>
56+
fun execute(variables: Map<String, Any>): Flux<Map<String, Any>>
5557

5658

5759
class Column(private val name: String, private val comment: String, private val properties: Map<String, String>) {

src/main/kotlin/com/github/mgramin/sqlboot/sql/select/impl/FakeSelectQuery.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
package com.github.mgramin.sqlboot.sql.select.impl
2626

2727
import com.github.mgramin.sqlboot.sql.select.SelectQuery
28+
import reactor.core.publisher.Flux
2829

2930
class FakeSelectQuery : SelectQuery {
3031

@@ -40,7 +41,7 @@ class FakeSelectQuery : SelectQuery {
4041
return mapOf("n" to "First name", "mail" to "Personal email")
4142
}
4243

43-
override fun execute(variables: Map<String, Any>): Sequence<Map<String, Any>> {
44+
override fun execute(variables: Map<String, Any>): Flux<Map<String, Any>> {
4445
TODO("not implemented")
4546
}
4647

src/main/kotlin/com/github/mgramin/sqlboot/sql/select/impl/SimpleSelectQuery.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import com.github.mgramin.sqlboot.sql.select.impl.parser.SELECTParser
3131
import com.github.mgramin.sqlboot.template.generator.TemplateGenerator
3232
import org.antlr.v4.runtime.ANTLRInputStream
3333
import org.antlr.v4.runtime.CommonTokenStream
34+
import reactor.core.publisher.Flux
3435

3536
class SimpleSelectQuery(private val templateGenerator: TemplateGenerator) : SelectQuery {
3637

@@ -45,7 +46,7 @@ class SimpleSelectQuery(private val templateGenerator: TemplateGenerator) : Sele
4546
.toMap()
4647
}
4748

48-
override fun execute(variables: Map<String, Any>): Sequence<Map<String, Any>> {
49+
override fun execute(variables: Map<String, Any>): Flux<Map<String, Any>> {
4950
throw RuntimeException("Not allow here")
5051
}
5152

src/main/kotlin/com/github/mgramin/sqlboot/sql/select/wrappers/JdbcSelectQuery.kt

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ import com.github.mgramin.sqlboot.sql.select.SelectQuery
2828
import com.github.mgramin.sqlboot.template.generator.impl.GroovyTemplateGenerator
2929
import org.slf4j.LoggerFactory
3030
import org.springframework.jdbc.core.JdbcTemplate
31+
import reactor.core.publisher.Flux
32+
import reactor.core.publisher.Mono
33+
import reactor.core.publisher.toFlux
3134
import javax.sql.DataSource
3235

3336
/**
@@ -39,7 +42,6 @@ import javax.sql.DataSource
3942
*/
4043
class JdbcSelectQuery(
4144
private val origin: SelectQuery,
42-
@Deprecated("")
4345
private val dataSource: DataSource,
4446
private val nullAlias: String
4547
) : SelectQuery {
@@ -56,23 +58,31 @@ class JdbcSelectQuery(
5658
return origin.columns()
5759
}
5860

59-
override fun execute(variables: Map<String, Any>): Sequence<Map<String, Any>> {
61+
override fun execute(variables: Map<String, Any>): Flux<Map<String, Any>> {
6062
val sqlText = GroovyTemplateGenerator(origin.query()).generate(variables)
6163
logger.info(sqlText)
62-
val rowSet = JdbcTemplate(dataSource).queryForRowSet(sqlText)
63-
return object : Iterator<Map<String, Any>> {
64-
override fun hasNext(): Boolean {
65-
return rowSet.next()
66-
}
6764

68-
override fun next(): Map<String, Any> {
69-
return rowSet
70-
.metaData
71-
.columnNames
72-
.map { it.toLowerCase() to (rowSet.getObject(it) ?: nullAlias) }
73-
.toMap()
65+
return Mono.fromCallable {
66+
val rowSet = JdbcTemplate(dataSource).queryForRowSet(sqlText)
67+
return@fromCallable object : Iterator<Map<String, Any>> {
68+
override fun hasNext(): Boolean {
69+
return rowSet.next()
70+
}
71+
72+
override fun next(): Map<String, Any> {
73+
return rowSet
74+
.metaData
75+
.columnNames
76+
.map { it.toLowerCase() to (rowSet.getObject(it) ?: nullAlias) }
77+
.toMap()
78+
}
7479
}
75-
}.asSequence()
80+
}
81+
.map { it.toFlux() }
82+
.toFlux()
83+
.flatMap { it }
84+
85+
7686
}
7787

7888
}

src/test/kotlin/com/github/mgramin/sqlboot/sql/select/wrappers/JdbcSelectQueryTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ class JdbcSelectQueryTest {
4747

4848
@Test
4949
fun execute() {
50-
val rows = JdbcSelectQuery(FakeSelectQuery(), this.dataSource!!).execute(hashMapOf()).toList()
50+
val rows = JdbcSelectQuery(FakeSelectQuery(), this.dataSource!!).execute(hashMapOf()).collectList().block()
5151
assertEquals(arrayListOf(linkedMapOf("n" to "mkyong", "mail" to "mkyong@gmail.com"),
5252
linkedMapOf("n" to "alex", "mail" to "alex@yahoo.com"),
5353
linkedMapOf("n" to "joel", "mail" to "joel@gmail.com")),

src/test/kotlin/com/github/mgramin/sqlboot/sql/select/wrappers/OrderedSelectQueryTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ internal class OrderedSelectQueryTest {
6060
FakeSelectQuery(),
6161
mapOf("n" to "asc", "mail" to "desc")),
6262
this.dataSource!!)
63-
val execute: Sequence<Map<String, Any>> = selectQuery.execute(emptyMap())
63+
val execute: Sequence<Map<String, Any>> = selectQuery.execute(emptyMap()).collectList().block().asSequence()
6464
println("result = ${execute.toList()}")
6565
}
6666

0 commit comments

Comments
 (0)