-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathProcessorAdapter.java
More file actions
511 lines (466 loc) · 20.3 KB
/
ProcessorAdapter.java
File metadata and controls
511 lines (466 loc) · 20.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
/*
* Copyright (C) 2012 Brockmann Consult GmbH (info@brockmann-consult.de)
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 3 of the License, or (at your option)
* any later version.
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
* more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, see http://www.gnu.org/licenses/
*/
package com.bc.calvalus.processing;
import com.bc.calvalus.commons.CalvalusLogger;
import com.bc.calvalus.processing.beam.CalvalusProductIO;
import com.bc.calvalus.processing.beam.GpfUtils;
import com.bc.calvalus.processing.hadoop.ParameterizedSplit;
import com.bc.calvalus.processing.hadoop.ProductSplit;
import com.bc.calvalus.processing.utils.GeometryUtils;
import com.bc.ceres.core.ProgressMonitor;
import com.vividsolutions.jts.geom.Geometry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.MapContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.esa.snap.core.dataio.ProductIO;
import org.esa.snap.core.datamodel.Product;
import org.esa.snap.core.datamodel.ProductData;
import org.esa.snap.runtime.Engine;
import java.awt.Rectangle;
import java.awt.geom.AffineTransform;
import java.io.File;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Paths;
import java.util.logging.Logger;
/**
* Adapts different processors (SNAP GPF, shell executable, ...) to Calvalus Map-Reduce processing.
* Usage, simple version:
* <pre>
* ProcessorAdapter processorAdapter = ProcessorFactory.create(context);
* try {
* Product target = processorAdapter.getProcessedProduct();
* ....
* } finally {
* processorAdapter.dispose();
* }
* </pre>
* <p/>
* If more control is required (e.g. to further adjust the processed region, ...):
* <pre>
* ProcessorAdapter processorAdapter = ProcessorFactory.create(context);
* try {
* // use points from reference data set to restrict roi even further
* Product inputProduct = processorAdapter.getInputProduct();
* Geometry roi; // from config
* Geometry referenceDataRoi; // from reference data
* roi = roi.intersection(referenceDataRoi);
*
* Rectangle srcProductRect = processorAdapter.computeIntersection(roi);
* if (!srcProductRect.isEmpty()) {
* processorAdapter.processSourceProduct(srcProductRect);
*
* // depending on the requirements:
* // save the result to HDFS
* processorAdapter.saveProcessedProducts();
*
* // or work with the resulting product
* Product processedProduct = processorAdapter.openProcessedProduct();
* }
* } finally {
* processorAdapter.dispose();
* }
* </pre>
*
* @author MarcoZ
*/
public abstract class ProcessorAdapter {
/**
 * Mode passed to {@code processSourceProduct(MODE, ProgressMonitor)};
 * {@code getProcessedProduct(ProgressMonitor)} uses {@code TARGET}.
 * NOTE(review): how TARGET and EXECUTE differ is decided by the concrete adapters - confirm there.
 */
public enum MODE {TARGET, EXECUTE}
// Logger obtained from CalvalusLogger; exposed to subclasses through getLogger().
private static final Logger LOG = CalvalusLogger.getLogger();
/** Shared empty result of {@code getInputParameters()} for splits that are not a ParameterizedSplit. */
public static final String[] EMPTY_PARAMETERS = new String[0];
// Map context handed to the constructor.
private final MapContext mapContext;
// Configuration taken from the map context in the constructor.
private final Configuration conf;
// Input split taken from the map context in the constructor.
private final InputSplit inputSplit;
// Input product, opened on first use by getInputProduct() and released by closeInputProduct().
private Product inputProduct;
// Result of getInputRectangle(); reset to null by setProcessingRectangle(Rectangle).
private Rectangle inputRectangle;
// Additional ROI in pixel coordinates, set by setProcessingRectangle(Rectangle).
private Rectangle roiRectangle;
// Local file of the input product; set via setInputFile(File), also by openInputProduct()
// when the opened product reports a file location.
private File inputFile;
// Transform stored and returned unchanged by the input2OutputTransform accessors.
// NOTE(review): presumably maps input to output pixel coordinates - confirm with the users of the accessors.
private AffineTransform input2OutputTransform;
public ProcessorAdapter(MapContext mapContext) {
this.mapContext = mapContext;
this.inputSplit = mapContext.getInputSplit();
this.conf = mapContext.getConfiguration();
if (conf.getBoolean("calvalus.snap.setSnapProperties", true)) {
String cwd = new File(".").getAbsolutePath();
System.setProperty("snap.userdir", cwd);
System.setProperty("snap.home", cwd);
System.setProperty("snap.pythonModuleDir", cwd);
LOG.info("Set 'snap.userdir', 'snap.home', 'snap.pythonModuleDir' to CWD: " + cwd);
}
GpfUtils.init(conf);
Engine.start();
CalvalusLogger.restoreCalvalusLogFormatter();
}
/**
 * Gives subclasses access to the map context this adapter was created with.
 *
 * @return the map context passed to the constructor
 */
protected MapContext getMapContext() {
    return this.mapContext;
}
/**
 * Gives subclasses access to the job configuration.
 *
 * @return the configuration obtained from the map context in the constructor
 */
protected Configuration getConfiguration() {
    return this.conf;
}
/**
 * Gives subclasses access to the logger used by this class.
 *
 * @return the shared Calvalus logger
 */
protected Logger getLogger() {
    return ProcessorAdapter.LOG;
}
/**
 * Prepares the processing.
 * <p>
 * The default implementation only creates, inside the current working directory, a shallow
 * copy tree with symlinks to the files of patch packages.
 *
 * @throws IOException if listing the working directory or creating a directory or link fails
 */
public void prepareProcessing() throws IOException {
    final String workingDir = new File(".").getAbsolutePath();
    shallowCopyPatches(workingDir);
}
/**
 * Mirrors every patch package found in the given directory into its target directory.
 * <p>
 * A patch package is a directory entry that is a symbolic link and whose name ends with
 * {@code -patch}. For each of them the directory named like the link without that suffix is
 * created next to it (an already existing one is reused) and filled with a shallow copy of
 * the patch tree: real directories are recreated, everything else is symlinked.
 *
 * @param wd the directory to scan for patch packages
 * @throws IOException if a directory cannot be listed or a directory or link cannot be created
 */
protected static void shallowCopyPatches(String wd) throws IOException {
    final String patchSuffix = "-patch";
    final java.nio.file.Path workingDir = Paths.get(wd);
    try (DirectoryStream<java.nio.file.Path> entries = Files.newDirectoryStream(workingDir)) {
        for (java.nio.file.Path patchLink : entries) {
            final String patchName = patchLink.getFileName().toString();
            if (!patchName.endsWith(patchSuffix) || !Files.isSymbolicLink(patchLink)) {
                continue;
            }
            final String targetName = patchName.substring(0, patchName.length() - patchSuffix.length());
            final java.nio.file.Path targetDir = workingDir.resolve(targetName);
            try {
                Files.createDirectory(targetDir);
            } catch (FileAlreadyExistsException ignore) {
                // an already existing target directory is simply reused
            }
            shallowCopyRecursive(patchLink, targetDir);
        }
    }
}
/**
 * Recreates the directory structure of {@code src} below {@code dest} and links all other entries.
 * <p>
 * Entries of {@code src} that are directories themselves (symbolic links are not followed for
 * this check) are created in {@code dest} and descended into; every other entry becomes a
 * symbolic link in {@code dest} pointing to the source entry. Entries that already exist in
 * {@code dest} are left as they are.
 *
 * @param src  the directory to copy from
 * @param dest the existing directory to copy into
 * @throws IOException if a directory cannot be listed or a directory or link cannot be created
 */
private static void shallowCopyRecursive(java.nio.file.Path src, java.nio.file.Path dest) throws IOException {
    try (DirectoryStream<java.nio.file.Path> children = Files.newDirectoryStream(src)) {
        for (java.nio.file.Path child : children) {
            final java.nio.file.Path target = dest.resolve(child.getFileName().toString());
            final boolean realDirectory = Files.isDirectory(child, LinkOption.NOFOLLOW_LINKS);
            try {
                if (realDirectory) {
                    Files.createDirectory(target);
                } else {
                    Files.createSymbolicLink(target, child);
                }
            } catch (FileAlreadyExistsException ignore) {
                // existing entries in the destination win
            }
            if (realDirectory) {
                shallowCopyRecursive(child, target);
            }
        }
    }
}
/**
 * Returns whether the adapter can skip processing the current input product.
 * The adapter's answer should be based on information about the input
 * product and whether the corresponding output product already exists.
 * <p>
 * {@link #prepareProcessing()} has to be invoked before this method is called.
 * <p>
 * This should enable fast (re-)processing of missing products.
 * The implementation in this class never skips and always returns {@code false}.
 *
 * @return {@code true}, if the input product should not be processed.
 * @throws IOException declared for overriding implementations; not thrown by this implementation
 */
public boolean canSkipInputProduct() throws IOException {
return false;
}
/**
 * Reads and processes a product from the given input split:
 * <ul>
 * <li>reads the product</li>
 * <li>creates a subset taking the geometries and processing lines into account (optional)</li>
 * <li>performs a "L2" operation (optional)</li>
 * </ul>
 * The resulting products are kept by the adapter and can be opened using {@link #openProcessedProduct()}.
 * <p>
 * {@link #prepareProcessing()} has to be invoked before this method is called.
 *
 * @param mode The processing mode.
 *             NOTE(review): how TARGET and EXECUTE differ is decided by the implementations - confirm there.
 * @param pm   A progress monitor
 * @return {@code false}, if the product has not been processed
 * @throws java.io.IOException If an I/O error occurs
 */
public abstract boolean processSourceProduct(MODE mode, ProgressMonitor pm) throws IOException;
/**
 * Returns the product resulting from the processing.
 * Before this method can be called the {@code processSourceProduct} method must be called.
 * Callers in this class treat a {@code null} result as "no processed product available (yet)".
 * <p>
 * TODO use index to get all processed products
 *
 * @return The processed product
 * @throws java.io.IOException If an I/O error occurs
 */
public abstract Product openProcessedProduct() throws IOException;
/**
 * Saves the processed products to HDFS.
 *
 * @param pm A progress monitor
 * @throws java.io.IOException If an I/O error occurs
 */
public abstract void saveProcessedProducts(ProgressMonitor pm) throws IOException;
/**
 * Returns the output path of the processed product.
 * Can return {@code null} if no output product exists (yet).
 *
 * @return The output path of the output product.
 * @throws java.io.IOException If an I/O error occurs
 */
public abstract Path getOutputProductPath() throws IOException;
/**
 * Returns the job's output directory, extended by the date part of the input path
 * if preserving the date tree is configured (see {@code appendDatePart(Path)}).
 *
 * @return the output directory for the current input
 * @throws IOException declared for callers and overriding implementations
 */
protected Path getOutputDirectoryPath() throws IOException {
    return appendDatePart(FileOutputFormat.getOutputPath(getMapContext()));
}
/**
 * Returns the task's work output directory, extended by the date part of the input path
 * if preserving the date tree is configured (see {@code appendDatePart(Path)}).
 * <p>
 * If the lookup of the work output path is interrupted, the interrupt status of the current
 * thread is restored before the interruption is reported as an {@link IOException}, so that
 * code further up the call stack can still observe the interruption.
 *
 * @return the work output directory for the current input
 * @throws IOException if the work output path cannot be determined or the lookup is interrupted
 */
protected Path getWorkOutputDirectoryPath() throws IOException {
    final Path workOutputPath;
    try {
        workOutputPath = FileOutputFormat.getWorkOutputPath(getMapContext());
    } catch (InterruptedException e) {
        // catching InterruptedException clears the flag; set it again so the interruption is not lost
        Thread.currentThread().interrupt();
        throw new IOException("Interrupted while determining the work output path", e);
    }
    return appendDatePart(workOutputPath);
}
/**
 * Appends the "year/month/day" part of the first input path to the given path, provided that
 * {@code JobConfigNames.CALVALUS_OUTPUT_PRESERVE_DATE_TREE} is enabled in the configuration
 * (default: disabled) and the input path has such a part.
 *
 * @param path the path to extend
 * @return the extended path, or {@code path} itself if nothing is appended
 */
Path appendDatePart(Path path) {
    final boolean preserveDateTree =
            getConfiguration().getBoolean(JobConfigNames.CALVALUS_OUTPUT_PRESERVE_DATE_TREE, false);
    if (!preserveDateTree) {
        return path;
    }
    final String datePart = getDatePart(getInputPaths()[0]);
    return datePart == null ? path : new Path(path, datePart);
}
/**
 * Extracts the names of the three directories directly above the input product.
 *
 * @param inputProductPath the path to the input product
 * @return the "year/month/day" part of the inputProductPath, or {@code null} if one of the
 *         three parent levels is missing or has an empty name
 */
static String getDatePart(Path inputProductPath) {
    final Path dayDir = inputProductPath.getParent();
    if (dayDir == null || dayDir.getName().isEmpty()) {
        return null;
    }
    final Path monthDir = dayDir.getParent();
    if (monthDir == null || monthDir.getName().isEmpty()) {
        return null;
    }
    final Path yearDir = monthDir.getParent();
    if (yearDir == null || yearDir.getName().isEmpty()) {
        return null;
    }
    return yearDir.getName() + "/" + monthDir.getName() + "/" + dayDir.getName();
}
/**
 * Returns {@code true}, if the processor adapter supports on-demand processing of distinct regions.
 *
 * @return {@code true}, if pull processing is supported.
 */
public abstract boolean supportsPullProcessing();
/**
 * Convenience method that returns the processed product and performs all steps needed to get it.
 * <p>
 * An already available processed product is returned directly. Otherwise, unless the input
 * rectangle is empty, the processing is prepared and the source product is processed in
 * {@link MODE#TARGET} mode, and the product is opened afterwards.
 *
 * @param pm A progress monitor
 * @return The processed product, or {@code null} if the input rectangle is empty or the
 *         source product has not been processed
 * @throws IOException If an I/O error occurs
 */
public Product getProcessedProduct(ProgressMonitor pm) throws IOException {
    final Product alreadyProcessed = openProcessedProduct();
    if (alreadyProcessed != null) {
        return alreadyProcessed;
    }
    final Rectangle sourceRectangle = getInputRectangle();
    // a null rectangle means "no restriction"; only an explicitly empty one stops the processing
    if (sourceRectangle != null && sourceRectangle.isEmpty()) {
        return null;
    }
    prepareProcessing();
    if (!processSourceProduct(MODE.TARGET, pm)) {
        return null;
    }
    return openProcessedProduct();
}
/**
 * Sets an additional rectangle in pixel coordinates that is intersected with the geometry
 * from the configuration (if any) to define the region to be processed.
 * <p>
 * The previously computed input rectangle is discarded, so the next call of
 * {@link #getInputRectangle()} computes it anew.
 *
 * @param roiRectangle the additional ROI rectangle
 */
public void setProcessingRectangle(Rectangle roiRectangle) {
    this.inputRectangle = null;
    this.roiRectangle = roiRectangle;
}
/**
 * Returns the path(s) of the input described by the input split.
 * <p>
 * A {@code ProductSplit} or {@code FileSplit} yields a single-element array,
 * a {@code CombineFileSplit} yields all of its paths.
 *
 * @return The paths of the input.
 * @throws IllegalArgumentException if the input split is of none of the supported types
 */
public Path[] getInputPaths() {
    if (inputSplit instanceof ProductSplit) {
        return new Path[]{((ProductSplit) inputSplit).getPath()};
    }
    if (inputSplit instanceof FileSplit) {
        return new Path[]{((FileSplit) inputSplit).getPath()};
    }
    if (inputSplit instanceof CombineFileSplit) {
        return ((CombineFileSplit) inputSplit).getPaths();
    }
    throw new IllegalArgumentException("input split is neither a FileSplit nor a ProductSplit");
}
/**
 * Returns the parameters carried by the input split.
 * <p>
 * Only a {@code ParameterizedSplit} (e.g. from TableInputFormat) carries parameters; for all
 * other input splits {@link #EMPTY_PARAMETERS} is returned.
 * The return value is encoded as key1, value1, key2, value2 ... in a string array.
 *
 * @return the parameters of the input split, or an empty array
 */
public String[] getInputParameters() {
    return inputSplit instanceof ParameterizedSplit
            ? ((ParameterizedSplit) inputSplit).getParameters()
            : EMPTY_PARAMETERS;
}
/**
* Return the region of the input product that is processed (in pixel coordinates).
* This takes the geometries given in the configuration the additionalGeometry and processing lines into account.
*
* @return The input region to process, or {@code null} if no restriction is given.
*/
public Rectangle getInputRectangle() throws IOException {
// Unclear why we had introduced this in November 2018.
// Removed January 2019 because it is harmful for graphs that do resamping and output subsetting
// because they will process all inputs including the non-overlapping ones.
//boolean processCompleteInputFile = !getMapContext().getConfiguration().getBoolean(JobConfigNames.CALVALUS_INPUT_SUBSETTING, true);
//if (processCompleteInputFile) {
// return null;
//}
if (inputRectangle == null) {
boolean fullSwath = getConfiguration().getBoolean(JobConfigNames.CALVALUS_INPUT_FULL_SWATH, false);
String geometryWkt = getConfiguration().get(JobConfigNames.CALVALUS_REGION_GEOMETRY);
// check table for table input format
for (int i=0; i<getInputParameters().length; i += 2) {
if ("regionGeometry".equals(getInputParameters()[i])) {
geometryWkt = getInputParameters()[i+1];
break;
}
}
Geometry regionGeometry = GeometryUtils.createGeometry(geometryWkt);
LOG.info("getInputRectangle: for geometryWkt = " + geometryWkt);
ProcessingRectangleCalculator calculator = new ProcessingRectangleCalculator(regionGeometry,
roiRectangle,
inputSplit,
fullSwath) {
@Override
Product getProduct() throws IOException {
return getInputProduct();
}
};
try {
inputRectangle = calculator.computeRect();
} catch (IOException _) {}
LOG.info("getInputRectangle: calculated inputRectangle = " + inputRectangle);
}
return inputRectangle;
}
/**
 * Returns the transform set via {@link #setInput2OutputTransform(AffineTransform)}.
 *
 * @return the stored transform, or {@code null} if none has been set
 */
public AffineTransform getInput2OutputTransform() {
    return this.input2OutputTransform;
}
/**
 * Stores the transform that {@link #getInput2OutputTransform()} returns.
 * NOTE(review): presumably maps input to output pixel coordinates - confirm with the callers.
 *
 * @param input2OutputTransform the transform to store
 */
public void setInput2OutputTransform(AffineTransform input2OutputTransform) {
this.input2OutputTransform = input2OutputTransform;
}
/**
 * Returns the input product, opening it on first use.
 * <p>
 * The opened product is kept until {@link #closeInputProduct()} is called.
 *
 * @return The input product
 * @throws java.io.IOException If an I/O error occurs
 */
public Product getInputProduct() throws IOException {
    if (inputProduct != null) {
        return inputProduct;
    }
    inputProduct = openInputProduct();
    return inputProduct;
}
/**
 * Opens the input product.
 * <p>
 * If a local input file is known it is read with {@code ProductIO}, using the format configured
 * under {@code JobConfigNames.CALVALUS_INPUT_FORMAT} if there is one. Otherwise the first input
 * path is read with {@code CalvalusProductIO}, and a file location reported by the opened
 * product is remembered as the input file. In both cases a counter of the group
 * "Direct File System Counters" is set to the length of the file or of the file split.
 *
 * @return the opened product
 * @throws IOException If an I/O error occurs
 */
private Product openInputProduct() throws IOException {
    final String inputFormat = getConfiguration().get(JobConfigNames.CALVALUS_INPUT_FORMAT, null);

    if (inputFile == null) {
        final Path inputPath = getInputPaths()[0];
        LOG.info(String.format("openInputProduct: inputPath = %s inputFormat = %s", inputPath, inputFormat));
        final Product remoteProduct = CalvalusProductIO.readProduct(inputPath, getConfiguration(), inputFormat);
        if (inputSplit instanceof FileSplit) {
            final long splitLength = ((FileSplit) inputSplit).getLength();
            getMapContext().getCounter("Direct File System Counters", "FILE_SPLIT_BYTES_READ").setValue(splitLength);
        }
        final File location = remoteProduct.getFileLocation();
        LOG.info(String.format("openInputProduct: fileLocation = %s", location));
        if (location != null) {
            setInputFile(location);
        }
        return remoteProduct;
    }

    getMapContext().getCounter("Direct File System Counters", "INPUT_FILE_BYTES_READ").setValue(inputFile.length());
    final Product localProduct;
    if (inputFormat == null) {
        LOG.info(String.format("openInputProduct: inputFile = %s", inputFile));
        localProduct = ProductIO.readProduct(inputFile);
    } else {
        LOG.info(String.format("openInputProduct: inputFile = %s inputFormat = %s", inputFile, inputFormat));
        localProduct = ProductIO.readProduct(inputFile, inputFormat);
    }
    CalvalusProductIO.printProductOnStdout(localProduct, "opened from local file");
    return localProduct;
}
/**
 * Sets the local file of the input product.
 * While it is non-null, the input product is opened from this file instead of the input path.
 *
 * @param inputFile the local input file
 */
public void setInputFile(File inputFile) {
this.inputFile = inputFile;
}
/**
 * Returns the local file of the input product.
 *
 * @return the local input file, or {@code null} if none is known
 */
public File getInputFile() {
    return this.inputFile;
}
/**
 * Disposes the resources allocated by this processor adapter.
 * The implementation in this class closes the input product only.
 * NOTE(review): products created by concrete adapters are presumably released by their
 * overriding implementations - confirm in the subclasses.
 */
public void dispose() {
closeInputProduct();
}
/**
 * Disposes the input product, if one is open, and forgets it, so that the next call of
 * {@link #getInputProduct()} opens it again.
 */
public void closeInputProduct() {
    if (inputProduct == null) {
        return;
    }
    inputProduct.dispose();
    inputProduct = null;
}
/**
 * Tells whether the product lacks a usable start or end time.
 *
 * @param product the product to check
 * @return {@code true} if the start or end time is missing or has an MJD value of {@code 0.0}
 */
public static boolean hasInvalidStartAndStopTime(Product product) {
    final ProductData.UTC start = product.getStartTime();
    final ProductData.UTC end = product.getEndTime();
    final boolean missing = start == null || end == null;
    return missing || end.getMJD() == 0.0 || start.getMJD() == 0.0;
}
/**
 * Sets start and end time of the target product from the source product.
 * <p>
 * If the source has both times, the two products differ in scene raster height and an input
 * rectangle is given, the times are interpolated linearly over the lines of the source
 * product for the first and last line of the rectangle. In all other cases the times of the
 * source product are copied unchanged.
 *
 * @param sourceProduct  the product providing the times
 * @param targetProduct  the product receiving the times
 * @param inputRectangle the processed region of the source product in pixel coordinates, may be {@code null}
 */
public static void copySceneRasterStartAndStopTime(Product sourceProduct, Product targetProduct,
                                                   Rectangle inputRectangle) {
    final ProductData.UTC sourceStart = sourceProduct.getStartTime();
    final ProductData.UTC sourceStop = sourceProduct.getEndTime();
    final boolean sameHeight = sourceProduct.getSceneRasterHeight() == targetProduct.getSceneRasterHeight();
    final boolean interpolate = sourceStart != null && sourceStop != null && !sameHeight && inputRectangle != null;

    if (!interpolate) {
        targetProduct.setStartTime(sourceStart);
        targetProduct.setEndTime(sourceStop);
        return;
    }

    final double sourceHeight = sourceProduct.getSceneRasterHeight();
    final double firstLine = inputRectangle.getY();
    final double lineCount = inputRectangle.getHeight();
    final double mjdStart = sourceStart.getMJD();
    final double mjdStop = sourceStop.getMJD();
    // time span covered by a single line of the source product
    final double mjdPerLine = (mjdStop - mjdStart) / (sourceHeight - 1);
    final double subsetStart = mjdPerLine * firstLine + mjdStart;
    final double subsetStop = mjdPerLine * (lineCount - 1) + subsetStart;
    targetProduct.setStartTime(new ProductData.UTC(subsetStart));
    targetProduct.setEndTime(new ProductData.UTC(subsetStop));
}
}