Skip to content

Commit bf66741

Browse files
committed
parallelism: streamline builder interfaces
1 parent f1e6ee8 commit bf66741

19 files changed

Lines changed: 364 additions & 144 deletions

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
99
### Added
1010

1111
* LearnLib now supports JPMS modules. All artifacts now provide a `module-info` descriptor except of the distribution artifacts (for Maven-less environments) which only provide an `Automatic-Module-Name` due to non-modular dependencies. Note that while this is a Java 9+ feature, LearnLib still supports Java 8 byte code for the remaining class files.
12-
* Added an `InterningMembershipOracle` (including refinements) to the `learnlib-cache` artifact that interns query responses to reduce memory consumption of large data structures. This exports the internal concepts of the DHC learner (which no longer interns query responses automatically).
1312
* The `ADTLearner` has been refactored to no longer use the (now-removed) `SymbolQueryOracle` but a new `AdaptiveMembershipOracle` instead which supports answering queries in parallel (thanks to [Leon Vitorovic](https://github.com/leonthalee)).
13+
* Added an `InterningMembershipOracle` (including refinements) to the `learnlib-cache` artifact that interns query responses to reduce memory consumption of large data structures. This exports the internal concepts of the DHC learner (which no longer interns query responses automatically).
14+
* `StaticParallelOracleBuilder` now supports custom executor services.
1415

1516
### Changed
1617

filters/cache/src/test/java/de/learnlib/filter/cache/CacheConfig.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
import de.learnlib.filter.cache.LearningCacheOracle.MooreLearningCacheOracle;
3333
import de.learnlib.oracle.MembershipOracle;
3434
import de.learnlib.oracle.ParallelOracle;
35-
import de.learnlib.oracle.parallelism.AbstractStaticBatchProcessor;
35+
import de.learnlib.oracle.parallelism.BatchProcessorDefaults;
3636
import de.learnlib.oracle.parallelism.ParallelOracleBuilders;
3737
import de.learnlib.sul.SUL;
3838
import de.learnlib.sul.StateLocalInputSUL;
@@ -188,9 +188,9 @@ public CacheConfig<I, D, C> apply(Alphabet<I> alphabet, M oracle) {
188188
final List<C> oracles;
189189

190190
{
191-
List<M> suls = new ArrayList<>(AbstractStaticBatchProcessor.NUM_INSTANCES);
191+
List<M> suls = new ArrayList<>(BatchProcessorDefaults.POOL_SIZE);
192192

193-
for (int i = 0; i < AbstractStaticBatchProcessor.NUM_INSTANCES; i++) {
193+
for (int i = 0; i < BatchProcessorDefaults.POOL_SIZE; i++) {
194194
suls.add(oracle);
195195
}
196196
this.oracles = new ArrayList<>(provider.apply(alphabet, suls));

oracles/parallelism/src/main/java/de/learnlib/oracle/parallelism/AbstractDynamicBatchProcessor.java

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,14 @@
2626
import de.learnlib.exception.BatchInterruptedException;
2727
import de.learnlib.oracle.BatchProcessor;
2828
import de.learnlib.oracle.ThreadPool;
29-
import de.learnlib.setting.LearnLibProperty;
30-
import de.learnlib.setting.LearnLibSettings;
3129
import net.automatalib.common.util.exception.ExceptionUtil;
3230
import org.checkerframework.checker.index.qual.NonNegative;
3331

3432
/**
3533
* A batch processor that dynamically distributes queries to worker threads.
34+
* <p>
35+
* An incoming set of queries is split into batches of the given size. The number of batches may exceed the available
36+
* threads so that they are dynamically scheduled once a job finishes.
3637
*
3738
* @param <Q>
3839
* query type
@@ -42,20 +43,6 @@
4243
public abstract class AbstractDynamicBatchProcessor<Q, P extends BatchProcessor<Q>>
4344
implements ThreadPool, BatchProcessor<Q> {
4445

45-
public static final int BATCH_SIZE;
46-
public static final int POOL_SIZE;
47-
public static final PoolPolicy POOL_POLICY;
48-
49-
static {
50-
LearnLibSettings settings = LearnLibSettings.getInstance();
51-
52-
int numProcessors = Runtime.getRuntime().availableProcessors();
53-
54-
BATCH_SIZE = settings.getInt(LearnLibProperty.PARALLEL_BATCH_SIZE_DYNAMIC, 1);
55-
POOL_SIZE = settings.getInt(LearnLibProperty.PARALLEL_POOL_SIZE, numProcessors);
56-
POOL_POLICY = settings.getEnumValue(LearnLibProperty.PARALLEL_POOL_POLICY, PoolPolicy.class, PoolPolicy.CACHED);
57-
}
58-
5946
private final ThreadLocal<P> threadLocalOracle;
6047
private final ExecutorService executor;
6148
private final @NonNegative int batchSize;

oracles/parallelism/src/main/java/de/learnlib/oracle/parallelism/AbstractDynamicBatchProcessorBuilder.java

Lines changed: 77 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package de.learnlib.oracle.parallelism;
1717

18+
import java.util.Arrays;
1819
import java.util.Collection;
1920
import java.util.concurrent.ExecutorService;
2021
import java.util.concurrent.Executors;
@@ -25,6 +26,7 @@
2526

2627
import de.learnlib.oracle.BatchProcessor;
2728
import de.learnlib.oracle.ThreadPool.PoolPolicy;
29+
import net.automatalib.common.util.array.ArrayStorage;
2830
import net.automatalib.common.util.concurrent.ScalingThreadPoolExecutor;
2931
import org.checkerframework.checker.index.qual.NonNegative;
3032
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -45,77 +47,113 @@ public abstract class AbstractDynamicBatchProcessorBuilder<Q, P extends BatchPro
4547

4648
private final @Nullable Supplier<? extends P> oracleSupplier;
4749
private final @Nullable Collection<? extends P> oracles;
50+
4851
private ExecutorService customExecutor;
49-
private @NonNegative int batchSize = AbstractDynamicBatchProcessor.BATCH_SIZE;
50-
private @NonNegative int poolSize = AbstractDynamicBatchProcessor.POOL_SIZE;
51-
private PoolPolicy poolPolicy = AbstractDynamicBatchProcessor.POOL_POLICY;
52+
private @NonNegative int batchSize = BatchProcessorDefaults.BATCH_SIZE;
53+
private @NonNegative int poolSize = BatchProcessorDefaults.POOL_SIZE;
54+
private PoolPolicy poolPolicy = BatchProcessorDefaults.POOL_POLICY;
5255

5356
public AbstractDynamicBatchProcessorBuilder(Supplier<? extends P> oracleSupplier) {
5457
this.oracleSupplier = oracleSupplier;
5558
this.oracles = null;
5659
}
5760

5861
public AbstractDynamicBatchProcessorBuilder(Collection<? extends P> oracles) {
59-
this(validateInputs(oracles), oracles);
60-
}
61-
62-
// utility constructor to prevent finalizer attacks, see SEI CERT Rule OBJ-11
63-
@SuppressWarnings("PMD.UnusedFormalParameter")
64-
private AbstractDynamicBatchProcessorBuilder(boolean valid, Collection<? extends P> oracles) {
6562
this.oracles = oracles;
6663
this.oracleSupplier = null;
6764
}
6865

69-
private static boolean validateInputs(Collection<?> oracles) {
70-
if (oracles.isEmpty()) {
71-
throw new IllegalArgumentException("No oracles specified");
72-
}
73-
return true;
66+
/**
67+
* Sets the size of batches that are submitted.
68+
*
69+
* @param batchSize
70+
* the minimal size of batches
71+
*
72+
* @return {@code this}
73+
*/
74+
public AbstractDynamicBatchProcessorBuilder<Q, P, OR> withBatchSize(int batchSize) {
75+
this.batchSize = batchSize;
76+
return this;
7477
}
7578

79+
/**
80+
* Sets the executor service to use for submitting batches. Note that if the builder is initialized with a
81+
* collection of processors, an exception may be thrown if the thread pool tries to spawn more threads than oracles
82+
* are available.
83+
*
84+
* @param executor
85+
* the executor to use
86+
*
87+
* @return {@code this}
88+
*/
7689
public AbstractDynamicBatchProcessorBuilder<Q, P, OR> withCustomExecutor(ExecutorService executor) {
7790
this.customExecutor = executor;
7891
return this;
7992
}
8093

81-
public AbstractDynamicBatchProcessorBuilder<Q, P, OR> withBatchSize(int batchSize) {
82-
this.batchSize = batchSize;
94+
/**
95+
* Sets the pool policy in case the builder creates its own executor for processing batches.
96+
*
97+
* @param policy
98+
* the policy
99+
*
100+
* @return {@code this}
101+
*/
102+
public AbstractDynamicBatchProcessorBuilder<Q, P, OR> withPoolPolicy(PoolPolicy policy) {
103+
this.poolPolicy = policy;
83104
return this;
84105
}
85106

107+
/**
108+
* Sets the number of instances that should process batches. Note that this value is ignored if the builder has been
109+
* initialized with a collection of processors in order to guarantee that no unavailable resources are accessed.
110+
*
111+
* @param poolSize
112+
* the number of instances to delegate batches to
113+
*
114+
* @return {@code this}
115+
*/
86116
public AbstractDynamicBatchProcessorBuilder<Q, P, OR> withPoolSize(@NonNegative int poolSize) {
87117
this.poolSize = poolSize;
88118
return this;
89119
}
90120

91-
public AbstractDynamicBatchProcessorBuilder<Q, P, OR> withPoolPolicy(PoolPolicy policy) {
92-
this.poolPolicy = policy;
93-
return this;
94-
}
95-
121+
/**
122+
* Create the batch processor.
123+
*
124+
* @return the batch processor
125+
*/
96126
public OR create() {
97-
98127
final Supplier<? extends P> supplier;
99-
final ExecutorService executor;
128+
final int size;
129+
130+
if (oracleSupplier == null) {
131+
if (oracles == null || oracles.isEmpty()) {
132+
throw new IllegalArgumentException("No oracles specified");
133+
}
100134

101-
if (oracles != null) {
102-
executor = Executors.newFixedThreadPool(oracles.size());
135+
size = oracles.size();
103136
supplier = new StaticOracleProvider<>(oracles);
104-
} else if (customExecutor != null) {
105-
executor = customExecutor;
137+
} else {
138+
size = poolSize;
106139
supplier = oracleSupplier;
140+
}
141+
142+
final ExecutorService executor;
143+
144+
if (customExecutor != null) {
145+
executor = customExecutor;
107146
} else {
108147
switch (poolPolicy) {
109148
case FIXED:
110-
executor = Executors.newFixedThreadPool(poolSize);
149+
executor = Executors.newFixedThreadPool(size);
111150
break;
112151
case CACHED:
113-
executor = new ScalingThreadPoolExecutor(0, poolSize, DEFAULT_KEEP_ALIVE_TIME, TimeUnit.SECONDS);
152+
executor = new ScalingThreadPoolExecutor(0, size, DEFAULT_KEEP_ALIVE_TIME, TimeUnit.SECONDS);
114153
break;
115154
default:
116155
throw new IllegalStateException("Unknown pool policy: " + poolPolicy);
117156
}
118-
supplier = oracleSupplier;
119157
}
120158

121159
return buildOracle(supplier, batchSize, executor);
@@ -125,33 +163,33 @@ public OR create() {
125163

126164
static class StaticOracleProvider<P extends BatchProcessor<?>> implements Supplier<P> {
127165

128-
private final P[] oracles;
166+
private final ArrayStorage<P> oracles;
129167
private final Lock lock;
130168
private int idx;
131169

132-
@SuppressWarnings("unchecked")
133-
StaticOracleProvider(Collection<? extends P> oracles) {
134-
this(oracles.toArray((P[]) new BatchProcessor[oracles.size()]));
170+
StaticOracleProvider(P[] oracles) {
171+
this(Arrays.asList(oracles));
135172
}
136173

137-
StaticOracleProvider(P[] oracles) {
138-
this.oracles = oracles;
174+
StaticOracleProvider(Collection<? extends P> oracles) {
175+
this.oracles = new ArrayStorage<>(oracles);
139176
this.lock = new ReentrantLock();
140177
}
141178

142179
@Override
143180
public P get() {
144181
try {
145182
lock.lock();
146-
if (idx < oracles.length) {
147-
return oracles[idx++];
183+
if (idx < oracles.size()) {
184+
return oracles.get(idx++);
148185
}
149186
} finally {
150187
lock.unlock();
151188
}
152189

153190
throw new IllegalStateException(
154-
"The supplier should not have been called more than " + oracles.length + " times");
191+
"The executor service tried to spawn more threads than there are oracles available (" +
192+
oracles.size() + ')');
155193
}
156194
}
157195
}

oracles/parallelism/src/main/java/de/learnlib/oracle/parallelism/AbstractStaticBatchProcessor.java

Lines changed: 5 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,12 @@
2121
import java.util.List;
2222
import java.util.concurrent.ExecutionException;
2323
import java.util.concurrent.ExecutorService;
24-
import java.util.concurrent.Executors;
2524
import java.util.concurrent.Future;
2625

2726
import de.learnlib.exception.BatchInterruptedException;
2827
import de.learnlib.oracle.BatchProcessor;
2928
import de.learnlib.oracle.ThreadPool;
30-
import de.learnlib.setting.LearnLibProperty;
31-
import de.learnlib.setting.LearnLibSettings;
32-
import net.automatalib.common.util.array.ArrayStorage;
29+
import net.automatalib.common.util.collection.CollectionUtil;
3330
import net.automatalib.common.util.exception.ExceptionUtil;
3431
import org.checkerframework.checker.index.qual.NonNegative;
3532

@@ -48,37 +45,16 @@
4845
public abstract class AbstractStaticBatchProcessor<Q, P extends BatchProcessor<Q>>
4946
implements ThreadPool, BatchProcessor<Q> {
5047

51-
private static final int DEFAULT_MIN_BATCH_SIZE = 10;
52-
public static final int MIN_BATCH_SIZE;
53-
public static final int NUM_INSTANCES;
54-
public static final PoolPolicy POOL_POLICY;
55-
56-
static {
57-
LearnLibSettings settings = LearnLibSettings.getInstance();
58-
59-
int numCores = Runtime.getRuntime().availableProcessors();
60-
61-
MIN_BATCH_SIZE = settings.getInt(LearnLibProperty.PARALLEL_BATCH_SIZE_STATIC, DEFAULT_MIN_BATCH_SIZE);
62-
NUM_INSTANCES = settings.getInt(LearnLibProperty.PARALLEL_POOL_SIZE, numCores);
63-
POOL_POLICY = settings.getEnumValue(LearnLibProperty.PARALLEL_POOL_POLICY, PoolPolicy.class, PoolPolicy.CACHED);
64-
}
65-
6648
private final @NonNegative int minBatchSize;
67-
private final ArrayStorage<P> oracles;
49+
private final List<? extends P> oracles;
6850
private final ExecutorService executor;
6951

7052
public AbstractStaticBatchProcessor(Collection<? extends P> oracles,
7153
@NonNegative int minBatchSize,
72-
PoolPolicy policy) {
73-
74-
this.oracles = new ArrayStorage<>(oracles);
75-
76-
if (policy == PoolPolicy.FIXED) {
77-
this.executor = Executors.newFixedThreadPool(this.oracles.size() - 1);
78-
} else { // default is cached
79-
this.executor = Executors.newCachedThreadPool();
80-
}
54+
ExecutorService executor) {
55+
this.oracles = CollectionUtil.randomAccessList(oracles);
8156
this.minBatchSize = minBatchSize;
57+
this.executor = executor;
8258
}
8359

8460
@Override

0 commit comments

Comments
 (0)