2525package com.github.mgramin.sqlboot.model.resourcetype.impl.sql
2626
2727import com.github.mgramin.sqlboot.model.connection.DbConnection
28- import com.github.mgramin.sqlboot.model.connection.SimpleDbConnection
2928import com.github.mgramin.sqlboot.model.resource.DbResource
3029import com.github.mgramin.sqlboot.model.resource.impl.DbResourceImpl
3130import com.github.mgramin.sqlboot.model.resourcetype.ResourceType
3231import com.github.mgramin.sqlboot.model.uri.Uri
3332import com.github.mgramin.sqlboot.model.uri.impl.DbUri
3433import com.github.mgramin.sqlboot.sql.select.impl.SimpleSelectQuery
3534import com.github.mgramin.sqlboot.sql.select.wrappers.JdbcSelectQuery
36- import com.github.mgramin.sqlboot.sql.select.wrappers.TemplatedSelectQuery
35+ import com.github.mgramin.sqlboot.sql.select.wrappers.OrderedSelectQuery
36+ import com.github.mgramin.sqlboot.sql.select.wrappers.PaginatedSelectQuery
3737import com.github.mgramin.sqlboot.template.generator.impl.GroovyTemplateGenerator
3838import org.apache.commons.lang3.StringUtils.strip
39+ import reactor.core.publisher.Flux
40+ import reactor.core.publisher.Mono
41+ import reactor.core.publisher.toFlux
42+ import reactor.core.scheduler.Schedulers
3943
4044/* *
4145 * Created by MGramin on 12.07.2017.
@@ -59,60 +63,35 @@ class SqlResourceType(
5963 }
6064
6165 override fun read (uri : Uri ): Sequence <DbResource > {
62-
63- /* val map: Sequence<DbResourceImpl> = connections
64- .asSequence()
65- .map {
66- JdbcSelectQuery(
67- TemplatedSelectQuery(simpleSelectQuery, mapOf("uri" to uri), it as SimpleDbConnection),
68- dataSource = it.getDataSource())
69- }
70- .map { it.execute(hashMapOf("uri" to uri)) }
71- .flatMap { it.asSequence() }
72- .map { o ->
73- val path = o.entries
74- .filter { v -> v.key.startsWith("@") }
75- .map { it.value.toString() }
76- val name = path[path.size - 1]
77- val headers = o.entries
78- .map { strip(it.key, "@") to it.value }
79- .toMap()
80-
81- return@map DbResourceImpl(name, this, DbUri(this.name(), path), headers)
82- }.asSequence()
83- return map*/
84-
85- var result: ArrayList <DbResourceImpl > = arrayListOf ()
86-
66+ val result: ArrayList <Mono <Sequence <DbResourceImpl >>> = arrayListOf ()
8767 for (connection in connections) {
88- val selectQuery =
89- JdbcSelectQuery (
90- TemplatedSelectQuery (
91- simpleSelectQuery,
92- variables = mapOf (" uri" to uri),
93- dbConnection = connection as SimpleDbConnection ),
94- dataSource = connection.getDataSource())
9568
96- val map: Sequence <DbResourceImpl > = selectQuery.execute(hashMapOf(" uri" to uri))
97- .map { o ->
98- val path = o.entries
99- .filter { v -> v.key.startsWith(" @" ) }
100- .map { it.value.toString() }
101- val name = path[path.size - 1 ]
102- val headers = o.entries
103- .map { strip(it.key, " @" ) to it.value }
104- .toMap()
105- val newHeaders = headers.toMutableMap()
106- newHeaders[" database" ] = connection.name()
107-
108- DbResourceImpl (name, this , DbUri (this .name(), path), newHeaders)
109- }
110-
111- result.addAll(map.toList())
69+ val fromCallable = Mono .fromCallable {
70+ val map: Sequence <DbResourceImpl > =
71+ createQuery(uri, connection)
72+ .execute(hashMapOf(" uri" to uri))
73+ .map { o ->
74+ val path = o.entries
75+ .filter { v -> v.key.startsWith(" @" ) }
76+ .map { it.value.toString() }
77+ val name = path[path.size - 1 ]
78+ val headers = o.entries
79+ .map { strip(it.key, " @" ) to it.value }
80+ .toMap()
81+ val newHeaders = headers.toMutableMap()
82+ newHeaders[" database" ] = connection.name()
83+ DbResourceImpl (name, this , DbUri (this .name(), path), newHeaders)
84+ }
85+ return @fromCallable map
86+ }.publishOn(Schedulers .parallel())
87+
88+ result.add(fromCallable)
11289 }
11390
91+ val mergeSequential: Flux <Sequence <DbResourceImpl >> = Flux .mergeSequential(result)
92+ val map = mergeSequential.map { it.toFlux() }.flatMap { it }.collectList().block()
11493
115- return result .asSequence()
94+ return map .asSequence()
11695 }
11796
11897 override fun metaData (): Map <String , String > {
@@ -122,4 +101,15 @@ class SqlResourceType(
122101 return newColumns
123102 }
124103
104+ private fun createQuery (uri : Uri , connection : DbConnection ): JdbcSelectQuery {
105+ return JdbcSelectQuery (
106+ PaginatedSelectQuery (
107+ OrderedSelectQuery (
108+ simpleSelectQuery,
109+ uri.orderedColumns()),
110+ uri,
111+ connection.paginationQueryTemplate()),
112+ dataSource = connection.getDataSource())
113+ }
114+
125115}
0 commit comments