Skip to content

Commit 79fefa7

Browse files
committed
1 parent 0c096fc commit 79fefa7

3 files changed

Lines changed: 29 additions & 19 deletions

File tree

src/main/kotlin/com/github/mgramin/sqlboot/model/resourcetype/impl/sql/SqlResourceType.kt

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ import com.github.mgramin.sqlboot.sql.select.wrappers.PaginatedSelectQuery
3838
import com.github.mgramin.sqlboot.template.generator.impl.GroovyTemplateGenerator
3939
import org.apache.commons.lang3.StringUtils.strip
4040
import reactor.core.publisher.Flux
41-
import reactor.core.scheduler.Schedulers
4241

4342
/**
4443
* Created by MGramin on 12.07.2017.
@@ -63,15 +62,22 @@ class SqlResourceType(
6362

6463
override fun read(uri: Uri): Sequence<DbResource> {
6564
val mergeSequential: Flux<Map<String, Any>> =
66-
Flux.mergeSequential(
65+
Flux.merge(
6766
connections
68-
.map { connection -> createQuery(uri, connection).execute(hashMapOf("uri" to uri)) }
69-
.map { it.parallel().runOn(Schedulers.parallel()) }
70-
.map { connection -> connection.map { it } })
67+
.asSequence()
68+
.map { connection ->
69+
return@map createQuery(uri, connection).execute(hashMapOf("uri" to uri))
70+
.map<Map<String, Any>?> {
71+
val mutableMap = it.toMutableMap()
72+
mutableMap["database"] = connection.name()
73+
mutableMap
74+
}
75+
}
76+
.toList()
77+
78+
)
7179

7280
return mergeSequential
73-
.parallel()
74-
.runOn(Schedulers.parallel())
7581
.map { o ->
7682
val path = o.entries
7783
.filter { v -> v.key.startsWith("@") }
@@ -80,10 +86,11 @@ class SqlResourceType(
8086
val headers = o.entries
8187
.map { strip(it.key, "@") to it.value }
8288
.toMap()
83-
// val newHeaders = headers.toMutableMap()
84-
// newHeaders["database"] = connection.name()
8589
DbResourceImpl(name, this, DbUri(this.name(), path), headers) as DbResource
86-
}.sequential().collectList().block().asSequence()
90+
}
91+
.collectList()
92+
.block()
93+
.asSequence()
8794
}
8895

8996
override fun metaData(): Map<String, String> {

src/main/kotlin/com/github/mgramin/sqlboot/sql/select/SelectQuery.kt

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,6 @@ interface SelectQuery {
4040
*/
4141
fun query(): String
4242

43-
/**
44-
* Select columns
45-
*
46-
* @return Map of column names and column comments
47-
*/
48-
fun columns(): Map<String, String>
49-
// fun columns(): List<Column>
5043

5144
/**
5245
* Execute select query with parameters
@@ -56,6 +49,15 @@ interface SelectQuery {
5649
fun execute(variables: Map<String, Any>): Flux<Map<String, Any>>
5750

5851

52+
/**
53+
* Select columns
54+
*
55+
* @return Map of column names and column comments
56+
*/
57+
fun columns(): Map<String, String>
58+
// fun columns(): List<Column>
59+
60+
5961
class Column(private val name: String, private val comment: String, private val properties: Map<String, String>) {
6062

6163
fun name(): String {

src/main/kotlin/com/github/mgramin/sqlboot/sql/select/wrappers/JdbcSelectQuery.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import org.springframework.jdbc.core.JdbcTemplate
3131
import reactor.core.publisher.Flux
3232
import reactor.core.publisher.Mono
3333
import reactor.core.publisher.toFlux
34+
import reactor.core.scheduler.Schedulers
3435
import javax.sql.DataSource
3536

3637
/**
@@ -63,6 +64,7 @@ class JdbcSelectQuery(
6364
logger.info(sqlText)
6465

6566
return Mono.fromCallable {
67+
logger.info(Thread.currentThread().toString())
6668
val rowSet = JdbcTemplate(dataSource).queryForRowSet(sqlText)
6769
return@fromCallable object : Iterator<Map<String, Any>> {
6870
override fun hasNext(): Boolean {
@@ -78,11 +80,10 @@ class JdbcSelectQuery(
7880
}
7981
}
8082
}
83+
.publishOn(Schedulers.parallel())
8184
.map { it.toFlux() }
8285
.toFlux()
8386
.flatMap { it }
84-
85-
8687
}
8788

8889
}

0 commit comments

Comments
 (0)