44// Created by: Denis Krjuchkov
55// Created: 2009.08.20
66
7- using System ;
87using System . Collections . Generic ;
98using System . Linq ;
109using System . Threading ;
@@ -17,7 +16,7 @@ namespace Xtensive.Orm.Providers
1716 internal sealed class BatchingCommandProcessor : CommandProcessor , ISqlTaskProcessor
1817 {
1918 private readonly int batchSize ;
20- private readonly Queue < SqlTask > tasks ;
19+ private Queue < SqlTask > tasks ;
2120
2221 void ISqlTaskProcessor . ProcessTask ( SqlLoadTask task , CommandProcessorContext context )
2322 {
@@ -61,8 +60,17 @@ public override void ExecuteTasks(CommandProcessorContext context)
6160 _ = ExecuteBatch ( batchSize , null , context ) ;
6261 }
6362
64- while ( ! context . AllowPartialExecution && context . ProcessingTasks . Count > 0 ) {
65- _ = ExecuteBatch ( context . ProcessingTasks . Count , null , context ) ;
63+ var allowPartialExecution = context . AllowPartialExecution ;
64+ while ( context . ProcessingTasks . Count > 0 ) {
65+ if ( allowPartialExecution ) {
66+ //re-register task
67+ RegisterTask ( context . ProcessingTasks . Dequeue ( ) ) ;
68+ }
69+ else {
70+ _ = context . ProcessingTasks . Count > batchSize
71+ ? ExecuteBatch ( batchSize , null , context )
72+ : ExecuteBatch ( context . ProcessingTasks . Count , null , context ) ;
73+ }
6674 }
6775 }
6876
@@ -74,8 +82,17 @@ public override async Task ExecuteTasksAsync(CommandProcessorContext context, Ca
7482 _ = await ExecuteBatchAsync ( batchSize , null , context , token ) . ConfigureAwait ( false ) ;
7583 }
7684
77- while ( ! context . AllowPartialExecution && context . ProcessingTasks . Count > 0 ) {
78- _ = await ExecuteBatchAsync ( context . ProcessingTasks . Count , null , context , token ) . ConfigureAwait ( false ) ;
85+ var allowPartialExecution = context . AllowPartialExecution ;
86+ while ( context . ProcessingTasks . Count > 0 ) {
87+ if ( allowPartialExecution ) {
88+ //re-register task
89+ RegisterTask ( context . ProcessingTasks . Dequeue ( ) ) ;
90+ }
91+ else {
92+ _ = await ( ( context . ProcessingTasks . Count > batchSize )
93+ ? ExecuteBatchAsync ( batchSize , null , context , token )
94+ : ExecuteBatchAsync ( context . ProcessingTasks . Count , null , context , token ) ) ;
95+ }
7996 }
8097 }
8198
@@ -88,8 +105,9 @@ public override IEnumerator<Tuple> ExecuteTasksWithReader(QueryRequest request,
88105 _ = ExecuteBatch ( batchSize , null , context ) ;
89106 }
90107
91- for ( ; ; ) {
92- var result = ExecuteBatch ( context . ProcessingTasks . Count , request , context ) ;
108+ for ( ; ; ) {
109+ var currentBatchSize = ( context . ProcessingTasks . Count > batchSize ) ? batchSize : context . ProcessingTasks . Count ;
110+ var result = ExecuteBatch ( currentBatchSize , request , context ) ;
93111 if ( result != null && context . ProcessingTasks . Count == 0 ) {
94112 return result . AsReaderOf ( request ) ;
95113 }
@@ -106,7 +124,8 @@ public override async Task<IEnumerator<Tuple>> ExecuteTasksWithReaderAsync(Query
106124 }
107125
108126 for ( ; ; ) {
109- var result = await ExecuteBatchAsync ( context . ProcessingTasks . Count , request , context , token ) . ConfigureAwait ( false ) ;
127+ var currentBatchSize = ( context . ProcessingTasks . Count > batchSize ) ? batchSize : context . ProcessingTasks . Count ;
128+ var result = await ExecuteBatchAsync ( currentBatchSize , request , context , token ) . ConfigureAwait ( false ) ;
110129 if ( result != null && context . ProcessingTasks . Count == 0 ) {
111130 return result . AsReaderOf ( request ) ;
112131 }
@@ -117,21 +136,20 @@ public override async Task<IEnumerator<Tuple>> ExecuteTasksWithReaderAsync(Query
117136
118137 private Command ExecuteBatch ( int numberOfTasks , QueryRequest lastRequest , CommandProcessorContext context )
119138 {
120- if ( numberOfTasks == 0 && lastRequest == null ) {
139+ if ( numberOfTasks == 0 && lastRequest == null ) {
121140 return null ;
122141 }
123142
124- var tasksToProcess = context . ProcessingTasks ;
125-
126143 AllocateCommand ( context ) ;
127144
128145 var shouldReturnReader = false ;
146+ var tasksToProcess = context . ProcessingTasks ;
129147 try {
130148 while ( numberOfTasks > 0 && tasksToProcess . Count > 0 ) {
131149 var task = tasksToProcess . Peek ( ) ;
132150 context . CurrentTask = task ;
133151 task . ProcessWith ( this , context ) ;
134- if ( context . CurrentTask == null ) {
152+ if ( context . CurrentTask == null ) {
135153 numberOfTasks -- ;
136154 _ = tasksToProcess . Dequeue ( ) ;
137155 }
@@ -148,15 +166,18 @@ private Command ExecuteBatch(int numberOfTasks, QueryRequest lastRequest, Comman
148166 }
149167 }
150168
151- if ( context . ActiveCommand . Count == 0 ) {
169+ if ( context . ActiveCommand . Count == 0 ) {
152170 return null ;
153171 }
172+
154173 var hasQueryTasks = context . ActiveTasks . Count > 0 ;
174+
155175 if ( ! hasQueryTasks && ! shouldReturnReader ) {
156176 _ = context . ActiveCommand . ExecuteNonQuery ( ) ;
157177 return null ;
158178 }
159179 context . ActiveCommand . ExecuteReader ( ) ;
180+
160181 if ( hasQueryTasks ) {
161182 var currentQueryTask = 0 ;
162183 while ( currentQueryTask < context . ActiveTasks . Count ) {
@@ -182,15 +203,14 @@ private Command ExecuteBatch(int numberOfTasks, QueryRequest lastRequest, Comman
182203
183204 private async Task < Command > ExecuteBatchAsync ( int numberOfTasks , QueryRequest lastRequest , CommandProcessorContext context , CancellationToken token )
184205 {
185- if ( numberOfTasks == 0 && lastRequest == null ) {
206+ if ( numberOfTasks == 0 && lastRequest == null ) {
186207 return null ;
187208 }
188209
189- var tasksToProcess = context . ProcessingTasks ;
190-
191210 AllocateCommand ( context ) ;
192211
193212 var shouldReturnReader = false ;
213+ var tasksToProcess = context . ProcessingTasks ;
194214 try {
195215 while ( numberOfTasks > 0 && tasksToProcess . Count > 0 ) {
196216 var task = tasksToProcess . Peek ( ) ;
@@ -212,9 +232,10 @@ private async Task<Command> ExecuteBatchAsync(int numberOfTasks, QueryRequest la
212232 }
213233 }
214234
215- if ( context . ActiveCommand . Count == 0 ) {
235+ if ( context . ActiveCommand . Count == 0 ) {
216236 return null ;
217237 }
238+
218239 var hasQueryTasks = context . ActiveTasks . Count > 0 ;
219240 if ( ! hasQueryTasks && ! shouldReturnReader ) {
220241 _ = await context . ActiveCommand . ExecuteNonQueryAsync ( token ) . ConfigureAwait ( false ) ;
@@ -259,7 +280,7 @@ private void ExecuteUnbatchedTask(SqlPersistTask task)
259280 var sequence = Factory . CreatePersistParts ( task ) ;
260281 foreach ( var part in sequence ) {
261282 using ( var command = Factory . CreateCommand ( ) ) {
262- ValidateCommandParameters ( part ) ;
283+ ValidateCommandPartParameters ( part ) ;
263284 command . AddPart ( part ) ;
264285 var affectedRowsCount = command . ExecuteNonQuery ( ) ;
265286 if ( affectedRowsCount == 0 ) {
@@ -273,19 +294,15 @@ private void ExecuteUnbatchedTask(SqlPersistTask task)
273294 private void PutTasksForExecution ( CommandProcessorContext context )
274295 {
275296 if ( context . AllowPartialExecution ) {
276- context . ProcessingTasks = new Queue < SqlTask > ( ) ;
277- var batchesCount = ( int ) tasks . Count / batchSize ;
278- if ( batchesCount == 0 ) {
279- return ;
280- }
281- context . ProcessingTasks = new Queue < SqlTask > ( ) ;
282- while ( context . ProcessingTasks . Count < batchesCount * batchSize ) {
297+ var processingTasksCount = tasks . Count / batchSize * batchSize ;
298+ context . ProcessingTasks = new Queue < SqlTask > ( processingTasksCount ) ;
299+ while ( context . ProcessingTasks . Count < processingTasksCount ) {
283300 context . ProcessingTasks . Enqueue ( tasks . Dequeue ( ) ) ;
284301 }
285302 }
286303 else {
287- context . ProcessingTasks = new Queue < SqlTask > ( tasks ) ;
288- tasks . Clear ( ) ;
304+ context . ProcessingTasks = tasks ;
305+ tasks = new Queue < SqlTask > ( batchSize ) ;
289306 }
290307 }
291308
@@ -304,13 +321,6 @@ private bool PendCommandParts(Command currentCommand, ICollection<CommandPart> p
304321 : true ;
305322 }
306323
307- private void ValidateCommandParameters ( CommandPart commandPart )
308- {
309- if ( GetCommandExecutionBehavior ( new [ ] { commandPart } , 0 ) == ExecutionBehavior . TooLargeForAnyCommand ) {
310- throw new ParametersLimitExceededException ( commandPart . Parameters . Count , MaxQueryParameterCount ) ;
311- }
312- }
313-
314324 private static string GetParameterPrefix ( CommandProcessorContext context ) =>
315325 string . Format ( "p{0}_" , context . ActiveCommand . Count + 1 ) ;
316326
0 commit comments