-
Notifications
You must be signed in to change notification settings - Fork 21
Expand file tree
/
Copy pathDataPassService.js
More file actions
295 lines (268 loc) · 12.5 KB
/
DataPassService.js
File metadata and controls
295 lines (268 loc) · 12.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
/**
* @license
* Copyright CERN and copyright holders of ALICE O2. This software is
* distributed under the terms of the GNU General Public License v3 (GPL
* Version 3), copied verbatim in the file "COPYING".
*
* See http://alice-o2.web.cern.ch/license for full licensing information.
*
* In applying this license CERN does not waive the privileges and immunities
* granted to it by virtue of its status as an Intergovernmental Organization
* or submit itself to any jurisdiction.
*/
const { Op } = require('sequelize');
const { repositories: { DataPassRepository, LhcPeriodRepository } } = require('../../../database');
const { dataSource } = require('../../../database/DataSource.js');
const { dataPassAdapter } = require('../../../database/adapters');
const { DataPassVersionRepository, DataPassRunRepository } = require('../../../database/repositories');
const { getOneDataPass } = require('./getOneDataPass.js');
const { getOneDataPassOrFail } = require('./getOneDataPassOrFail.js');
const { PdpBeamType } = require('../../../domain/enums/PdpBeamType');
const { BadParameterError } = require('../../errors/BadParameterError');
const { SkimmingStage } = require('../../../domain/enums/SkimmingStage');
const { LogManager } = require('@aliceo2/web-ui');
const { NonPhysicsProductionsNamesWords } = require('../../../domain/enums/NonPhysicsProductionsNamesWords.js');
/**
* @typedef DataPassIdentifier
* @property {number} [id]
* @property {string} [name]
*/
/**
* Data Pass Service
*/
class DataPassService {
    /**
     * Constructor
     *
     * Creates the logger used by the service to report skimming-related changes.
     */
    constructor() {
        this._logger = LogManager.getLogger('DATA_PASS_SERVICE');
    }

    /**
     * Return one data pass
     *
     * @param {DataPassIdentifier} identifier identifier of data pass
     * @return {Promise<DataPass|null>} the data pass converted to an entity, or null if none was found
     */
    async getByIdentifier(identifier) {
        // `prepareQueryBuilder` does not rely on `this`, so it can safely be handed over unbound
        const dataPass = await getOneDataPass(identifier, this.prepareQueryBuilder);
        return dataPass ? dataPassAdapter.toEntity(dataPass) : null;
    }

    /**
     * Find a Data Pass model by its name or id
     *
     * @param {DataPassIdentifier} identifier the criteria to find data pass
     * @throws {NotFoundError} in case there is no Data Pass with given identifier
     * @return {Promise<DataPass>} the data pass found
     */
    async getOneOrFail(identifier) {
        const dataPass = await getOneDataPassOrFail(identifier, this.prepareQueryBuilder);
        return dataPassAdapter.toEntity(dataPass);
    }

    /**
     * Get full name of data pass for given suffix and LHC period which given run belongs to
     *
     * @param {string} partialDataPassName data pass name without LHC period name, e.g. apass1, cpass0
     * @param {number} runNumber run number
     * @throws {Error} if no LHC period is associated with the given run
     * @return {Promise<string>} data pass name, built as `<LHC period name>_<partialDataPassName>`
     */
    async getFullDataPassNameUsingRunPeriod(partialDataPassName, runNumber) {
        const lhcPeriod = await LhcPeriodRepository.findOne({ include: [{ association: 'runs', where: { runNumber } }] });
        if (!lhcPeriod) {
            throw new Error(`Missing LHC Period information for run (${runNumber})`);
        }
        return `${lhcPeriod.name}_${partialDataPassName}`;
    }

    /**
     * Get all data passes
     *
     * @param {object} [options={}] define which records should be fetched
     * @param {object} [options.filter] definition of filtering
     * @param {number[]} [options.filter.ids] data passes identifiers to filter with
     * @param {string[]} [options.filter.names] data passes names to filter with
     * @param {number[]} [options.filter.lhcPeriodIds] LHC periods identifiers to filter with
     * @param {number[]} [options.filter.simulationPassIds] anchored simulation passes identifiers to filter with
     * @param {string[]} [options.filter.permittedNonPhysicsNames=[]] tokens from NonPhysicsProductionsNamesWords
     *     (TEST, DEBUG); data passes whose name contains `_<token>` are excluded unless the token is listed here
     * @param {number} [options.offset] parameter related to pagination - offset
     * @param {number} [options.limit] parameter related to pagination - limit
     * @param {Object<string, 'ASC'|'DESC'>} [options.sort] definition of sorting - every key is used as the property
     *     to order by and its value as the order type
     * @returns {Promise<CountedItems<DataPass>>} result
     */
    async getAll({
        filter = {},
        limit,
        offset,
        sort,
    } = {}) {
        const queryBuilder = this.prepareQueryBuilder();

        const { ids, names, permittedNonPhysicsNames = [], lhcPeriodIds, simulationPassIds } = filter;

        if (sort) {
            // Object.entries only visits own enumerable keys, inherited ones are never used as sorting properties
            for (const [property, order] of Object.entries(sort)) {
                queryBuilder.orderBy(property, order);
            }
        }

        if (limit) {
            queryBuilder.limit(limit);
        }
        if (offset) {
            queryBuilder.offset(offset);
        }

        if (lhcPeriodIds) {
            queryBuilder.where('lhcPeriodId').oneOf(...lhcPeriodIds);
        }
        if (simulationPassIds) {
            queryBuilder.whereAssociation('anchoredSimulationPasses', 'id').oneOf(...simulationPassIds);
        }
        if (ids) {
            queryBuilder.where('id').oneOf(...ids);
        }
        if (names) {
            queryBuilder.where('name').oneOf(...names);
        }

        // Non-physics productions (test, debug) are filtered out by default, the underscore is escaped in the pattern
        if (!permittedNonPhysicsNames.includes(NonPhysicsProductionsNamesWords.TEST)) {
            queryBuilder.where('name').not().substring(`\\_${NonPhysicsProductionsNamesWords.TEST}`);
        }
        if (!permittedNonPhysicsNames.includes(NonPhysicsProductionsNamesWords.DEBUG)) {
            queryBuilder.where('name').not().substring(`\\_${NonPhysicsProductionsNamesWords.DEBUG}`);
        }

        const { count, rows } = await DataPassRepository.findAndCountAll(queryBuilder);

        // Versions are fetched separately because the main query includes them without attributes (grouped query)
        const dataPassesVersions = await DataPassVersionRepository.findAll({
            include: 'statusHistory',
            where: { dataPassId: { [Op.in]: rows.map(({ id }) => id) } },
            order: [['statusHistory', 'createdAt', 'ASC']],
        });

        // Group versions per data pass, appending in place to keep the fetched order without re-copying arrays
        const dataPassIdToVersions = {};
        for (const version of dataPassesVersions) {
            const { dataPassId } = version;
            const versionsOfDataPass = dataPassIdToVersions[dataPassId] ?? [];
            versionsOfDataPass.push(version);
            dataPassIdToVersions[dataPassId] = versionsOfDataPass;
        }

        for (const dataPass of rows) {
            dataPass.versions = dataPassIdToVersions[dataPass.id];
        }

        return {
            count: count.length, // When using grouping sequelize returns from findAndCountAll counts per each group
            // Explicit call keeps the adapter as receiver and does not leak map's index/array arguments
            rows: rows.map((dataPass) => dataPassAdapter.toEntity(dataPass)),
        };
    }

    /**
     * Freeze/unfreeze a given data pass
     *
     * @param {DataPassIdentifier} dataPassIdentifier the identifier of the data pass to freeze/unfreeze
     * @param {boolean} isFrozen if true, data pass will be frozen, else it will be unfrozen (independently of the current state)
     * @return {Promise<void>} resolves once the data pass freeze state has been updated
     */
    async setFrozenState(dataPassIdentifier, isFrozen) {
        return dataSource.transaction(async () => {
            // Check that the data pass exists
            const dataPass = await getOneDataPassOrFail(dataPassIdentifier);
            await DataPassRepository.update(dataPass, { isFrozen });
        });
    }

    /**
     * Set given production as skimmable
     *
     * If another data pass of the same LHC period is currently skimmable, its skimming stage and the
     * ready-for-skimming flags of its runs are reset before the given one is marked.
     *
     * @param {DataPassIdentifier} identifier identifier of data pass
     * @throws {BadParameterError} if the data pass has no PROTON_PROTON runs or if its name is not an `apass` one
     * @return {Promise<void>} resolved once the production was marked as skimmable
     */
    async markAsSkimmable(identifier) {
        // Name must contain `_apass<N>` not followed (anywhere later in the name) by `skimming` or `skimmed`
        const validSkimmableProductionNameRegex = /_apass\d+(?!.*(skimming|skimmed))/;

        return dataSource.transaction(async () => {
            const dataPass = await this.getOneOrFail(identifier);

            if (!dataPass.pdpBeamTypes.includes(PdpBeamType.PROTON_PROTON)) {
                throw new BadParameterError(`Cannot mark ${dataPass.name} as skimmable.`
                    + ' Only production for PROTON_PROTON runs can be marked as skimmable');
            }
            if (!validSkimmableProductionNameRegex.test(dataPass.name)) {
                throw new BadParameterError(`Cannot mark ${dataPass.name} as skimmable. Only \`apass\` can be marked as skimmable`);
            }

            // The entity above is an adapted object, the repository update needs the database model
            const dataPassDB = await DataPassRepository.findOne({ where: { id: dataPass.id } });
            const previousSkimmable = await DataPassRepository.findOne({ where: {
                lhcPeriodId: dataPassDB.lhcPeriodId,
                skimmingStage: SkimmingStage.SKIMMABLE,
            } });

            if (previousSkimmable) {
                await DataPassRepository.update(previousSkimmable, { skimmingStage: null });
                await DataPassRunRepository.updateAll({ readyForSkimming: null }, { where: { dataPassId: previousSkimmable.id } });
            }

            await DataPassRepository.update(dataPassDB, { skimmingStage: SkimmingStage.SKIMMABLE });

            const previousSkimmableInfo = previousSkimmable ? `, previous one was ${previousSkimmable.name}` : '';
            this._logger.infoMessage(`Set ${dataPassDB.name} as skimmable${previousSkimmableInfo}`);
        });
    }

    /**
     * Fetch skimmable runs list with information whether they are ready for skimming
     *
     * @param {DataPassIdentifier} identifier identifier of data pass
     * @throws {BadParameterError} if the data pass is not marked as skimmable
     * @return {Promise<{ runNumber: number, readyForSkimming: boolean }[]>} resolves with list of skimmable runs with ready_for_skimming flag
     */
    async getSkimmableRuns(identifier) {
        const dataPass = await this.getOneOrFail(identifier);
        if (dataPass.skimmingStage !== SkimmingStage.SKIMMABLE) {
            throw new BadParameterError(`DataPass ${dataPass.name} is not marked as skimmable`);
        }

        const runs = await DataPassRunRepository.findAll({ where: { dataPassId: dataPass.id } });
        return runs.map(({ runNumber, readyForSkimming }) => ({ runNumber, readyForSkimming }));
    }

    /**
     * Update skimmable runs with information whether they are ready for skimming
     *
     * @param {DataPassIdentifier} identifier identifier of data pass
     * @param {{ runNumber: number, readyForSkimming: boolean }[]} runsList list of skimmable runs with ready_for_skimming flag
     * @throws {BadParameterError} if the data pass is not marked as skimmable
     * @throws {Error} if one of the given runs is not associated with the data pass
     * @return {Promise<{ runNumber: number, readyForSkimming: boolean }[]>} resolves with updated runs with ready_for_skimming flag
     */
    updateReadyForSkimmingRuns(identifier, runsList) {
        return dataSource.transaction(async () => {
            const dataPass = await this.getOneOrFail(identifier);
            if (dataPass.skimmingStage !== SkimmingStage.SKIMMABLE) {
                throw new BadParameterError(`DataPass ${dataPass.name} is not marked as skimmable`);
            }

            // Runs are processed one after the other, the first missing association aborts the whole update
            for (const { runNumber, readyForSkimming } of runsList) {
                const dataPassRun = await DataPassRunRepository.findOne({ where: { runNumber, dataPassId: dataPass.id } });
                if (!dataPassRun) {
                    throw new Error(`No association between data pass ${dataPass.name} and run ${runNumber}`);
                }
                await DataPassRunRepository.update(dataPassRun, { readyForSkimming });
            }

            return runsList;
        });
    }

    /**
     * Prepare common fetch data query builder (create a new one if none is provided)
     *
     * The query is grouped by data pass id and adds three aggregated attributes: `runsCount`,
     * `simulationPassesCount` and `pdpBeamTypes`. This method must not use `this`, because it is
     * passed unbound to other functions.
     *
     * @param {QueryBuilder|null} [queryBuilder=null] if specified, an existing query builder that will be reused
     * @return {QueryBuilder} query builder with common includes
     */
    prepareQueryBuilder(queryBuilder = null) {
        return (queryBuilder ?? dataSource.createQueryBuilder())
            .set('subQuery', false)
            .includeAttribute({
                query: (sequelize) => sequelize.fn('COUNT', sequelize.fn('DISTINCT', sequelize.col('runs.run_number'))),
                alias: 'runsCount',
            })
            .includeAttribute({
                query: (sequelize) => sequelize.fn('COUNT', sequelize.fn('DISTINCT', sequelize.col('anchoredSimulationPasses.id'))),
                alias: 'simulationPassesCount',
            })
            .includeAttribute({
                query: (sequelize) => sequelize.fn('GROUP_CONCAT', sequelize.fn('DISTINCT', sequelize.col('runs.pdp_beam_type'))),
                alias: 'pdpBeamTypes',
            })
            .include({
                association: 'runs',
                attributes: [],
                required: false,
            })
            .include({
                association: 'anchoredSimulationPasses',
                attributes: [],
                required: false,
            })
            .include({ association: 'versions', attributes: [], required: true })
            .groupBy('`DataPass`.id');
    }
}
// Shared service instance, exported next to the class itself
const dataPassService = new DataPassService();

module.exports = { DataPassService, dataPassService };