793 lines
33 KiB
JavaScript
793 lines
33 KiB
JavaScript
import { Transform, PassThrough } from 'stream';
|
|
import { pipeline } from 'stream/promises';
|
|
import { extname } from 'path';
|
|
import { EOL } from 'os';
|
|
import { chain } from 'stream-chain';
|
|
import { isEmpty, last, set, pick, isNumber, uniq } from 'lodash/fp';
|
|
import { diff as diff$1 } from 'semver';
|
|
import { compareSchemas } from './validation/schemas/index.mjs';
|
|
import { validateProvider } from './validation/provider.mjs';
|
|
import { TransferEngineError, TransferEngineValidationError } from './errors.mjs';
|
|
import * as errors from './errors.mjs';
|
|
export { errors };
|
|
import { createDiagnosticReporter } from '../utils/diagnostic.mjs';
|
|
import 'crypto';
|
|
import { filter, map } from '../utils/stream.mjs';
|
|
import { diff } from '../utils/json.mjs';
|
|
import 'events';
|
|
import { runMiddleware } from '../utils/middleware.mjs';
|
|
import { ProviderTransferError } from '../errors/providers.mjs';
|
|
|
|
/**
 * Guard used before every loose private-field access.
 *
 * @param {object} receiver - Object expected to own the private field.
 * @param {string} privateKey - Generated property name of the private field.
 * @returns {object} The receiver itself, so the call can be chained with a property access.
 * @throws {TypeError} When `privateKey` is not an own property of `receiver`.
 */
function _class_private_field_loose_base(receiver, privateKey) {
    const ownsField = Object.prototype.hasOwnProperty.call(receiver, privateKey);
    if (ownsField) {
        return receiver;
    }
    throw new TypeError("attempted to use private field on non-instance");
}
|
|
// Monotonic counter making every generated private key unique within this module.
var id = 0;
/**
 * Build a unique property name used to emulate a private class member.
 *
 * @param {string} name - Human readable member name, appended to the generated key.
 * @returns {string} A key of the form `__private_<counter>_<name>`.
 */
function _class_private_field_loose_key(name) {
    const prefix = "__private_" + id;
    id += 1;
    return prefix + "_" + name;
}
|
|
/**
 * Names of every stage known to the transfer engine (frozen, so the list cannot be mutated).
 */
const TRANSFER_STAGES = Object.freeze(['entities', 'links', 'assets', 'schemas', 'configuration']);
|
|
/**
 * Preset filters for the `only` / `exclude` engine options.
 *
 * Each preset (transfer group) maps the stages it covers to `true`;
 * `shouldSkipStage` looks stages up in these presets.
 */
const TransferGroupPresets = {
    content: { links: true, entities: true },
    files: { assets: true },
    config: { configuration: true }
};
|
|
// Version strategy used by the Strapi version integrity check when `options.versionStrategy` is not set.
// With 'ignore' the check returns early without comparing versions.
const DEFAULT_VERSION_STRATEGY = 'ignore';
|
|
// Schema strategy used by the schema integrity check when `options.schemaStrategy` is not set.
// The selected strategy is forwarded to `compareSchemas`.
const DEFAULT_SCHEMA_STRATEGY = 'strict';
|
|
// Generated property keys emulating the private members of TransferEngine.
// The declaration order is kept as-is because each call consumes the next counter value.

// --- private state ---
const _metadata = /*#__PURE__*/ _class_private_field_loose_key("_metadata");
const _schema = /*#__PURE__*/ _class_private_field_loose_key("_schema");
const _handlers = /*#__PURE__*/ _class_private_field_loose_key("_handlers");
const _currentStreamController = /*#__PURE__*/ _class_private_field_loose_key("_currentStreamController");
const _aborted = /*#__PURE__*/ _class_private_field_loose_key("_aborted");

// --- private methods ---
/**
 * Create and return a transform stream based on the given stage and options.
 *
 * Allowed transformations include 'filter' and 'map'.
 */
const _createStageTransformStream = /*#__PURE__*/ _class_private_field_loose_key("_createStageTransformStream");
/**
 * Update the engine's transfer progress data for a given stage.
 *
 * Aggregate options allow custom computation of the size (bytes) or of the aggregate key associated with the data.
 */
const _updateTransferProgress = /*#__PURE__*/ _class_private_field_loose_key("_updateTransferProgress");
/**
 * Create and return a PassThrough stream.
 *
 * Data written into it updates the engine's transfer progress data and triggers stage update events.
 */
const _progressTracker = /*#__PURE__*/ _class_private_field_loose_key("_progressTracker");
/**
 * Shorthand used to emit transfer update events to every listener.
 */
const _emitTransferUpdate = /*#__PURE__*/ _class_private_field_loose_key("_emitTransferUpdate");
/**
 * Shorthand used to emit stage update events to every listener.
 */
const _emitStageUpdate = /*#__PURE__*/ _class_private_field_loose_key("_emitStageUpdate");
/**
 * Run a version check between two Strapi versions (source and destination) using the strategy given to the engine.
 *
 * Throws a validation error on mismatch.
 */
const _assertStrapiVersionIntegrity = /*#__PURE__*/ _class_private_field_loose_key("_assertStrapiVersionIntegrity");
/**
 * Run a check between two sets of schemas (source and destination) using the strategy given to the engine.
 *
 * Throws a validation error when the schemas differ.
 */
const _assertSchemasMatching = /*#__PURE__*/ _class_private_field_loose_key("_assertSchemasMatching");
const _transferStage = /*#__PURE__*/ _class_private_field_loose_key("_transferStage");
const _resolveProviderResource = /*#__PURE__*/ _class_private_field_loose_key("_resolveProviderResource");
const _getSchemas = /*#__PURE__*/ _class_private_field_loose_key("_getSchemas");
|
|
/**
 * Coordinates a transfer between a source provider and a destination provider.
 *
 * `transfer()` bootstraps both providers, runs the integrity checks, then streams each stage
 * (schemas, entities, assets, links, configuration) from the source to the destination while
 * recording progress in `this.progress` and diagnostics in `this.diagnostics`.
 */
class TransferEngine {
    /**
     * Register a middleware invoked when the integrity check finds schema differences.
     *
     * @param {Function} handler - Middleware passed to `runMiddleware` with the diff context.
     */
    onSchemaDiff(handler) {
        _class_private_field_loose_base(this, _handlers)[_handlers]?.schemaDiff?.push(handler);
    }
    /**
     * Register a middleware invoked when a provider error carrying the given code is raised.
     *
     * @param {string} handlerName - Error code the handler is registered under.
     * @param {Function} handler - Middleware passed to `runMiddleware` when resolving the error.
     */
    addErrorHandler(handlerName, handler) {
        const errorHandlers = _class_private_field_loose_base(this, _handlers)[_handlers].errors;
        if (!errorHandlers[handlerName]) {
            errorHandlers[handlerName] = [];
        }
        errorHandlers[handlerName]?.push(handler);
    }
    /**
     * Give the registered error handlers a chance to resolve a provider error.
     *
     * @param {Error} error - The error to resolve.
     * @returns {Promise<boolean>} `true` when a handler flagged the error as ignorable (`context.ignore`).
     */
    async attemptResolveError(error) {
        const context = {};
        // Every level of `details` is optional: a provider error without nested details has no code
        const errorCode = error instanceof ProviderTransferError ? error.details?.details?.code : undefined;
        if (errorCode) {
            const errorHandlers = _class_private_field_loose_base(this, _handlers)[_handlers].errors;
            if (!errorHandlers[errorCode]) {
                errorHandlers[errorCode] = [];
            }
            await runMiddleware(context, errorHandlers[errorCode]);
        }
        return !!context.ignore;
    }
    /**
     * Report a fatal error and throw it
     *
     * @param {Error} error - The error to report and rethrow.
     * @throws Always throws `error`.
     */
    panic(error) {
        this.reportError(error, 'fatal');
        throw error;
    }
    /**
     * Report an error diagnostic
     *
     * @param {Error} error - The error being reported.
     * @param {string} severity - Severity label stored with the diagnostic.
     */
    reportError(error, severity) {
        this.diagnostics.report({
            kind: 'error',
            details: {
                severity,
                createdAt: new Date(),
                name: error.name,
                message: error.message,
                error
            }
        });
    }
    /**
     * Report a warning diagnostic
     *
     * @param {*} message - Warning content.
     * @param {string} origin1 - Label identifying where the warning comes from.
     */
    reportWarning(message, origin1) {
        this.diagnostics.report({
            kind: 'warning',
            details: {
                createdAt: new Date(),
                message,
                origin: origin1
            }
        });
    }
    /**
     * Report an info diagnostic (always attributed to the 'engine' origin)
     *
     * @param {string} message - Info content.
     * @param {*} params - Extra parameters stored with the diagnostic.
     */
    reportInfo(message, params) {
        this.diagnostics.report({
            kind: 'info',
            details: {
                createdAt: new Date(),
                message,
                params,
                origin: 'engine'
            }
        });
    }
    /**
     * Tell whether a stage must be skipped according to the `only` / `exclude` options.
     *
     * @param {string} stage - Stage name.
     * @returns {boolean} `true` when the stage is not included in the transfer.
     */
    shouldSkipStage(stage) {
        const { exclude, only } = this.options;
        // schemas must always be included
        if (stage === 'schemas') {
            return false;
        }
        // everything is included by default unless 'only' has been set
        let included = isEmpty(only);
        if (only && only.length > 0) {
            included = only.some((transferGroup) => TransferGroupPresets[transferGroup][stage]);
        }
        // 'exclude' can only remove a stage that is currently included
        if (exclude && exclude.length > 0 && included) {
            included = !exclude.some((transferGroup) => TransferGroupPresets[transferGroup][stage]);
        }
        return !included;
    }
    /**
     * Cause an ongoing transfer to abort gracefully.
     *
     * Marks the engine as aborted, aborts the current stage pipeline (if any) and then throws.
     *
     * @throws {TransferEngineError} Always, once the abort has been signalled.
     */
    async abortTransfer() {
        _class_private_field_loose_base(this, _aborted)[_aborted] = true;
        _class_private_field_loose_base(this, _currentStreamController)[_currentStreamController]?.abort();
        throw new TransferEngineError('fatal', 'Transfer aborted.');
    }
    /**
     * Resolve the providers' metadata and forward the source metadata to the destination provider.
     */
    async init() {
        // Resolve providers' resource and store
        // them in the engine's internal state
        await _class_private_field_loose_base(this, _resolveProviderResource)[_resolveProviderResource]();
        // Update the destination provider's source metadata
        const { source: sourceMetadata } = _class_private_field_loose_base(this, _metadata)[_metadata];
        if (sourceMetadata) {
            this.destinationProvider.setMetadata?.('source', sourceMetadata);
        }
    }
    /**
     * Run the bootstrap method in both source and destination providers
     *
     * @throws The first rejection reason, after reporting it as fatal.
     */
    async bootstrap() {
        const results = await Promise.allSettled([
            this.sourceProvider.bootstrap?.(this.diagnostics),
            this.destinationProvider.bootstrap?.(this.diagnostics)
        ]);
        results.forEach((result) => {
            if (result.status === 'rejected') {
                this.panic(result.reason);
            }
        });
    }
    /**
     * Run the close method in both source and destination providers
     *
     * @throws The first rejection reason, after reporting it as fatal.
     */
    async close() {
        const results = await Promise.allSettled([
            this.sourceProvider.close?.(),
            this.destinationProvider.close?.()
        ]);
        results.forEach((result) => {
            if (result.status === 'rejected') {
                this.panic(result.reason);
            }
        });
    }
    /**
     * Check that source and destination are compatible (Strapi version, then schemas).
     *
     * Schema differences are handed to the `onSchemaDiff` handlers; the check only passes
     * when every difference ends up ignored.
     *
     * @throws {TransferEngineValidationError} On version mismatch or unresolved schema differences.
     */
    async integrityCheck() {
        const sourceMetadata = await this.sourceProvider.getMetadata();
        const destinationMetadata = await this.destinationProvider.getMetadata();
        if (sourceMetadata && destinationMetadata) {
            _class_private_field_loose_base(this, _assertStrapiVersionIntegrity)[_assertStrapiVersionIntegrity](sourceMetadata?.strapi?.version, destinationMetadata?.strapi?.version);
        }
        const { sourceSchemas, destinationSchemas } = await _class_private_field_loose_base(this, _getSchemas)[_getSchemas]();
        try {
            if (sourceSchemas && destinationSchemas) {
                _class_private_field_loose_base(this, _assertSchemasMatching)[_assertSchemasMatching](sourceSchemas, destinationSchemas);
            }
        } catch (error) {
            // if this is a schema matching error, allow handlers to resolve it
            if (error instanceof TransferEngineValidationError && error.details?.details?.diffs) {
                const schemaDiffs = error.details?.details?.diffs;
                const context = {
                    ignoredDiffs: {},
                    diffs: schemaDiffs,
                    source: this.sourceProvider,
                    destination: this.destinationProvider
                };
                // if we don't have any handlers, throw the original error
                if (isEmpty(_class_private_field_loose_base(this, _handlers)[_handlers].schemaDiff)) {
                    throw error;
                }
                await runMiddleware(context, _class_private_field_loose_base(this, _handlers)[_handlers].schemaDiff);
                // if there are any remaining diffs that weren't ignored
                const unresolvedDiffs = diff(context.diffs, context.ignoredDiffs);
                if (unresolvedDiffs.length) {
                    this.panic(new TransferEngineValidationError('Unresolved differences in schema', {
                        check: 'schema.changes',
                        unresolvedDiffs
                    }));
                }
                return;
            }
            throw error;
        }
    }
    /**
     * Run a complete transfer.
     *
     * Emits `transfer::init`, `transfer::start` and `transfer::finish` on the progress stream.
     * On failure it emits `transfer::error`, reports the error, calls the destination provider's
     * `rollback` (when defined) and rethrows.
     *
     * @returns {Promise<{source: *, destination: *, engine: object}>} Provider results and engine progress data.
     */
    async transfer() {
        // reset data between transfers
        this.progress.data = {};
        try {
            _class_private_field_loose_base(this, _emitTransferUpdate)[_emitTransferUpdate]('init');
            await this.bootstrap();
            await this.init();
            await this.integrityCheck();
            _class_private_field_loose_base(this, _emitTransferUpdate)[_emitTransferUpdate]('start');
            await this.beforeTransfer();
            // Run the transfer stages
            await this.transferSchemas();
            await this.transferEntities();
            await this.transferAssets();
            await this.transferLinks();
            await this.transferConfiguration();
            // Gracefully close the providers
            await this.close();
            _class_private_field_loose_base(this, _emitTransferUpdate)[_emitTransferUpdate]('finish');
        } catch (e) {
            _class_private_field_loose_base(this, _emitTransferUpdate)[_emitTransferUpdate]('error', {
                error: e
            });
            const lastDiagnostic = last(this.diagnostics.stack.items);
            // Do not report an error diagnostic if the last one reported the same error
            if (e instanceof Error && (!lastDiagnostic || lastDiagnostic.kind !== 'error' || lastDiagnostic.details.error !== e)) {
                this.reportError(e, e.severity || 'fatal');
            }
            // Rollback the destination provider if an exception is thrown during the transfer
            // Note: This will be configurable in the future
            await this.destinationProvider.rollback?.(e);
            throw e;
        }
        return {
            source: this.sourceProvider.results,
            destination: this.destinationProvider.results,
            engine: this.progress.data
        };
    }
    /**
     * Run the optional `beforeTransfer` hook of the source provider, then of the destination provider.
     *
     * Errors are first offered to the registered error handlers; unresolved ones are fatal.
     */
    async beforeTransfer() {
        // `origin` labels the provider ('source' / 'destination') in the fallback error message
        const runWithDiagnostic = async (origin, provider) => {
            try {
                await provider.beforeTransfer?.();
            } catch (error) {
                if (error instanceof Error) {
                    const resolved = await this.attemptResolveError(error);
                    if (resolved) {
                        return;
                    }
                    this.panic(error);
                } else {
                    this.panic(new Error(`Unknown error when executing "beforeTransfer" on the ${origin} provider`));
                }
            }
        };
        await runWithDiagnostic('source', this.sourceProvider);
        await runWithDiagnostic('destination', this.destinationProvider);
    }
    /**
     * Transfer the 'schemas' stage; progress is aggregated by `modelType`.
     */
    async transferSchemas() {
        const stage = 'schemas';
        if (this.shouldSkipStage(stage)) {
            return;
        }
        const source = await this.sourceProvider.createSchemasReadStream?.();
        const destination = await this.destinationProvider.createSchemasWriteStream?.();
        const transform = _class_private_field_loose_base(this, _createStageTransformStream)[_createStageTransformStream](stage);
        const tracker = _class_private_field_loose_base(this, _progressTracker)[_progressTracker](stage, {
            key: (value) => value.modelType
        });
        await _class_private_field_loose_base(this, _transferStage)[_transferStage]({
            stage,
            source,
            destination,
            transform,
            tracker
        });
    }
    /**
     * Transfer the 'entities' stage; progress is aggregated by entity `type`.
     *
     * Entities whose type is not a content type of the destination are discarded, and entity data
     * is reduced to the attributes known by the destination schema (plus `documentId`).
     */
    async transferEntities() {
        const stage = 'entities';
        if (this.shouldSkipStage(stage)) {
            return;
        }
        const source = await this.sourceProvider.createEntitiesReadStream?.();
        const destination = await this.destinationProvider.createEntitiesWriteStream?.();
        // Resolves to the entity to forward, or to undefined when the entity must be discarded
        const sanitizeEntity = async (entity) => {
            const { destinationSchemas: schemas } = await _class_private_field_loose_base(this, _getSchemas)[_getSchemas]();
            if (!schemas) {
                return entity;
            }
            // TODO: this would be safer if we only ignored things in ignoredDiffs, otherwise continue and let an error be thrown
            const availableContentTypes = Object.entries(schemas).filter(([, schema]) => schema.modelType === 'contentType').map(([uid]) => uid);
            // If the type of the transferred entity doesn't exist in the destination, then discard it
            if (!availableContentTypes.includes(entity.type)) {
                return undefined;
            }
            const { type, data } = entity;
            const attributesToKeep = Object.keys(schemas[type].attributes).concat('documentId');
            return set('data', pick(attributesToKeep, data), entity);
        };
        const transform = chain([
            _class_private_field_loose_base(this, _createStageTransformStream)[_createStageTransformStream](stage),
            new Transform({
                objectMode: true,
                transform: async (entity, _encoding, callback) => {
                    let sanitized;
                    try {
                        sanitized = await sanitizeEntity(entity);
                    } catch (error) {
                        // Surface the failure through the stream instead of leaving the pipeline hanging
                        return callback(error);
                    }
                    callback(null, sanitized);
                }
            })
        ]);
        const tracker = _class_private_field_loose_base(this, _progressTracker)[_progressTracker](stage, {
            key: (value) => value.type
        });
        await _class_private_field_loose_base(this, _transferStage)[_transferStage]({
            stage,
            source,
            destination,
            transform,
            tracker
        });
    }
    /**
     * Transfer the 'links' stage.
     *
     * Links whose left or right type is unknown to the destination schemas are discarded.
     */
    async transferLinks() {
        const stage = 'links';
        if (this.shouldSkipStage(stage)) {
            return;
        }
        const source = await this.sourceProvider.createLinksReadStream?.();
        const destination = await this.destinationProvider.createLinksWriteStream?.();
        // Resolves to the link to forward, or to undefined when the link must be ignored
        const sanitizeLink = async (link) => {
            const { destinationSchemas: schemas } = await _class_private_field_loose_base(this, _getSchemas)[_getSchemas]();
            if (!schemas) {
                return link;
            }
            // TODO: this would be safer if we only ignored things in ignoredDiffs, otherwise continue and let an error be thrown
            const availableContentTypes = Object.keys(schemas);
            const isValidType = (uid) => availableContentTypes.includes(uid);
            if (!isValidType(link.left.type) || !isValidType(link.right.type)) {
                return undefined; // ignore the link
            }
            return link;
        };
        const transform = chain([
            _class_private_field_loose_base(this, _createStageTransformStream)[_createStageTransformStream](stage),
            new Transform({
                objectMode: true,
                transform: async (link, _encoding, callback) => {
                    let sanitized;
                    try {
                        sanitized = await sanitizeLink(link);
                    } catch (error) {
                        // Surface the failure through the stream instead of leaving the pipeline hanging
                        return callback(error);
                    }
                    callback(null, sanitized);
                }
            })
        ]);
        const tracker = _class_private_field_loose_base(this, _progressTracker)[_progressTracker](stage);
        await _class_private_field_loose_base(this, _transferStage)[_transferStage]({
            stage,
            source,
            destination,
            transform,
            tracker
        });
    }
    /**
     * Transfer the 'assets' stage; progress uses `stats.size` as size and the file extension as aggregate key.
     */
    async transferAssets() {
        const stage = 'assets';
        if (this.shouldSkipStage(stage)) {
            return;
        }
        const source = await this.sourceProvider.createAssetsReadStream?.();
        const destination = await this.destinationProvider.createAssetsWriteStream?.();
        const transform = _class_private_field_loose_base(this, _createStageTransformStream)[_createStageTransformStream](stage);
        const tracker = _class_private_field_loose_base(this, _progressTracker)[_progressTracker](stage, {
            size: (value) => value.stats.size,
            key: (value) => extname(value.filename) || 'No extension'
        });
        await _class_private_field_loose_base(this, _transferStage)[_transferStage]({
            stage,
            source,
            destination,
            transform,
            tracker
        });
    }
    /**
     * Transfer the 'configuration' stage.
     */
    async transferConfiguration() {
        const stage = 'configuration';
        if (this.shouldSkipStage(stage)) {
            return;
        }
        const source = await this.sourceProvider.createConfigurationReadStream?.();
        const destination = await this.destinationProvider.createConfigurationWriteStream?.();
        const transform = _class_private_field_loose_base(this, _createStageTransformStream)[_createStageTransformStream](stage);
        const tracker = _class_private_field_loose_base(this, _progressTracker)[_progressTracker](stage);
        await _class_private_field_loose_base(this, _transferStage)[_transferStage]({
            stage,
            source,
            destination,
            transform,
            tracker
        });
    }
    /**
     * @param {object} sourceProvider - Provider data is read from (validated with `validateProvider`).
     * @param {object} destinationProvider - Provider data is written to (validated with `validateProvider`).
     * @param {object} options - Engine options (e.g. `only`, `exclude`, `transforms`, `throttle`, `versionStrategy`, `schemaStrategy`).
     */
    constructor(sourceProvider, destinationProvider, options) {
        // Private methods: read-only, non-enumerable properties pointing at the module-level implementations
        const privateMethods = [
            [_createStageTransformStream, createStageTransformStream],
            [_updateTransferProgress, updateTransferProgress],
            [_progressTracker, progressTracker],
            [_emitTransferUpdate, emitTransferUpdate],
            [_emitStageUpdate, emitStageUpdate],
            [_assertStrapiVersionIntegrity, assertStrapiVersionIntegrity],
            [_assertSchemasMatching, assertSchemasMatching],
            [_transferStage, transferStage],
            [_resolveProviderResource, resolveProviderResource],
            [_getSchemas, getSchemas]
        ];
        for (const [key, value] of privateMethods) {
            Object.defineProperty(this, key, { value });
        }
        // Private fields: writable, non-enumerable properties initialised right below
        const privateFields = [_metadata, _schema, _handlers, _currentStreamController, _aborted];
        for (const key of privateFields) {
            Object.defineProperty(this, key, { writable: true, value: void 0 });
        }
        _class_private_field_loose_base(this, _metadata)[_metadata] = {};
        _class_private_field_loose_base(this, _schema)[_schema] = {};
        _class_private_field_loose_base(this, _handlers)[_handlers] = {
            schemaDiff: [],
            errors: {}
        };
        _class_private_field_loose_base(this, _aborted)[_aborted] = false;
        this.diagnostics = createDiagnosticReporter();
        validateProvider('source', sourceProvider);
        validateProvider('destination', destinationProvider);
        this.sourceProvider = sourceProvider;
        this.destinationProvider = destinationProvider;
        this.options = options;
        this.progress = {
            data: {},
            stream: new PassThrough({
                objectMode: true
            })
        };
    }
}
|
|
/**
 * Create and return a transform stream based on the given stage and options.
 *
 * The steps are applied in this order: global transforms (unless `includeGlobal` is false),
 * the optional throttle, then the transforms registered for the stage. Allowed transformations
 * are 'filter' and 'map'.
 *
 * All steps are composed into a single chain so that data written into the returned stream
 * goes through every step. (Piping the steps one after another and returning the result of the
 * last `pipe()` call hands back only the final step, silently bypassing the earlier ones.)
 *
 * @param {string} key - Stage name, used to look up `options.transforms[key]`.
 * @param {{includeGlobal?: boolean}} [options] - Set `includeGlobal` to false to skip global transforms.
 * @returns {import('stream').Duplex} An object-mode stream applying every configured step.
 */
function createStageTransformStream(key, options = {}) {
    const { includeGlobal = true } = options;
    const { throttle } = this.options;
    const { global: globalTransforms, [key]: stageTransforms } = this.options?.transforms ?? {};
    // Convert transform descriptors into streams; a descriptor may define both 'filter' and 'map'
    const toStreams = (transforms = []) => {
        const streams = [];
        for (const transform of transforms) {
            if ('filter' in transform) {
                streams.push(filter(transform.filter));
            }
            if ('map' in transform) {
                streams.push(map(transform.map));
            }
        }
        return streams;
    };
    const steps = [];
    if (includeGlobal) {
        steps.push(...toStreams(globalTransforms));
    }
    if (isNumber(throttle) && throttle > 0) {
        // Delay each item by `throttle` milliseconds before forwarding it
        steps.push(new PassThrough({
            objectMode: true,
            async transform(data, _encoding, callback) {
                await new Promise((resolve) => {
                    setTimeout(resolve, throttle);
                });
                callback(null, data);
            }
        }));
    }
    steps.push(...toStreams(stageTransforms));
    if (steps.length === 0) {
        // Nothing configured: hand back a plain pass-through
        return new PassThrough({
            objectMode: true
        });
    }
    return steps.length === 1 ? steps[0] : chain(steps);
}
|
|
/**
 * Update the engine's transfer progress data for a given stage.
 *
 * Must be called with the engine as `this` (reads and writes `this.progress.data`).
 *
 * @param {string} stage - Stage whose counters are updated (created on first use, with a `startTime`).
 * @param {*} data - The transferred item.
 * @param {{size?: Function, key?: Function}} [aggregate] - Optional callbacks: `size(data)` overrides
 *   the default size (length of the JSON serialisation), `key(data)` selects an aggregate bucket.
 */
function updateTransferProgress(stage, data, aggregate) {
    const progressByStage = this.progress.data;
    if (!progressByStage[stage]) {
        progressByStage[stage] = { count: 0, bytes: 0, startTime: Date.now() };
    }
    const stageProgress = progressByStage[stage];
    const itemSize = aggregate?.size?.(data) ?? JSON.stringify(data).length;
    const aggregateKey = aggregate?.key?.(data);
    stageProgress.count += 1;
    stageProgress.bytes += itemSize;
    // Only items yielding a truthy aggregate key are counted in a bucket
    if (!aggregateKey) {
        return;
    }
    if (!stageProgress.aggregates) {
        stageProgress.aggregates = {};
    }
    const buckets = stageProgress.aggregates;
    if (!buckets[aggregateKey]) {
        buckets[aggregateKey] = { count: 0, bytes: 0 };
    }
    const bucket = buckets[aggregateKey];
    bucket.count += 1;
    bucket.bytes += itemSize;
}
|
|
/**
 * Create and return a PassThrough stream that records progress for a stage.
 *
 * Every item going through updates the engine's progress data, triggers a stage 'progress'
 * event and is then forwarded unchanged. Must be called with the engine as `this`.
 *
 * @param {string} stage - Stage being tracked.
 * @param {{size?: Function, key?: Function}} [aggregate] - Forwarded to the progress update.
 * @returns {PassThrough} An object-mode stream.
 */
function progressTracker(stage, aggregate) {
    const track = (data, _encoding, callback) => {
        _class_private_field_loose_base(this, _updateTransferProgress)[_updateTransferProgress](stage, data, aggregate);
        _class_private_field_loose_base(this, _emitStageUpdate)[_emitStageUpdate]('progress', stage);
        callback(null, data);
    };
    return new PassThrough({ objectMode: true, transform: track });
}
|
|
/**
 * Emit a `transfer::<type>` event on the engine's progress stream.
 * Must be called with the engine as `this`.
 *
 * @param {string} type - Event suffix (e.g. 'init', 'start', 'finish', 'error').
 * @param {*} [payload] - Value passed to the listeners.
 */
function emitTransferUpdate(type, payload) {
    const { stream } = this.progress;
    const eventName = `transfer::${type}`;
    stream.emit(eventName, payload);
}
|
|
/**
 * Emit a `stage::<type>` event on the engine's progress stream.
 * Must be called with the engine as `this`.
 *
 * @param {string} type - Event suffix (e.g. 'start', 'progress', 'skip', 'finish', 'error').
 * @param {string} transferStage - Stage the event relates to.
 */
function emitStageUpdate(type, transferStage) {
    const { stream, data } = this.progress;
    // Listeners receive the full progress data along with the stage name
    const payload = { data, stage: transferStage };
    stream.emit(`stage::${type}`, payload);
}
|
|
/**
 * Run a version check between two Strapi versions (source and destination) using the
 * engine's `versionStrategy` option (defaults to DEFAULT_VERSION_STRATEGY).
 *
 * The check passes when either version is missing, when the strategy is 'ignore', when both
 * versions are identical, or when the kind of difference reported by semver is allowed by the
 * strategy ('patch', 'minor' or 'major'). Any other strategy accepts only identical versions.
 *
 * @param {string} [sourceVersion] - Strapi version of the source.
 * @param {string} [destinationVersion] - Strapi version of the destination.
 * @throws {TransferEngineValidationError} When the versions are incompatible or cannot be compared.
 */
function assertStrapiVersionIntegrity(sourceVersion, destinationVersion) {
    const strategy = this.options.versionStrategy || DEFAULT_VERSION_STRATEGY;
    const reject = () => {
        throw new TransferEngineValidationError(`The source and destination provide are targeting incompatible Strapi versions (using the "${strategy}" strategy). The source (${this.sourceProvider.name}) version is ${sourceVersion} and the destination (${this.destinationProvider.name}) version is ${destinationVersion}`, {
            check: 'strapi.version',
            strategy,
            versions: {
                source: sourceVersion,
                destination: destinationVersion
            }
        });
    };
    if (!sourceVersion || !destinationVersion || strategy === 'ignore' || destinationVersion === sourceVersion) {
        return;
    }
    // Named `difference` so the imported json `diff` helper is not shadowed
    let difference;
    try {
        difference = diff$1(sourceVersion, destinationVersion);
    } catch {
        // Versions semver cannot parse are treated as incompatible
        reject();
    }
    if (!difference) {
        return;
    }
    // Kinds of difference tolerated by each strategy; every level includes the previous one.
    // semver reports a prerelease-only difference as 'prerelease'.
    const validPatch = [
        'prerelease',
        'build'
    ];
    const validMinor = [
        ...validPatch,
        'patch',
        'prepatch'
    ];
    const validMajor = [
        ...validMinor,
        'minor',
        'preminor'
    ];
    if (strategy === 'patch' && validPatch.includes(difference)) {
        return;
    }
    if (strategy === 'minor' && validMinor.includes(difference)) {
        return;
    }
    if (strategy === 'major' && validMajor.includes(difference)) {
        return;
    }
    reject();
}
|
|
/**
 * Run a check between two sets of schemas (source and destination) using the engine's
 * `schemaStrategy` option (defaults to DEFAULT_SCHEMA_STRATEGY). Nothing is checked with 'ignore'.
 *
 * @param {object} sourceSchemas - Source schemas keyed by uid.
 * @param {object} destinationSchemas - Destination schemas keyed by uid.
 * @throws {TransferEngineValidationError} When `compareSchemas` reports differences; the error
 *   details carry the diffs keyed by uid and the message summarises them.
 */
function assertSchemasMatching(sourceSchemas, destinationSchemas) {
    const strategy = this.options.schemaStrategy || DEFAULT_SCHEMA_STRATEGY;
    if (strategy === 'ignore') {
        return;
    }
    const uids = uniq([...Object.keys(sourceSchemas), ...Object.keys(destinationSchemas)]);
    const diffs = {};
    for (const uid of uids) {
        const schemaDiffs = compareSchemas(sourceSchemas[uid], destinationSchemas[uid], strategy);
        if (schemaDiffs.length) {
            diffs[uid] = schemaDiffs;
        }
    }
    if (isEmpty(diffs)) {
        return;
    }
    // One human readable sentence per diff entry
    const describeDiff = (uid, entry) => {
        const path = entry.path.join('.');
        switch (entry.kind) {
            case 'added':
                return `${path} exists in destination schema but not in source schema and the data will not be transferred.`;
            case 'deleted':
                return `${path} exists in source schema but not in destination schema and the data will not be transferred.`;
            case 'modified': {
                const [fromValue, toValue] = [entry.values[0], entry.values[1]];
                const [fromType, toType] = [entry.types[0], entry.types[1]];
                if (fromType === toType) {
                    return `Schema value changed at "${path}": "${fromValue}" (${fromType}) => "${toValue}" (${toType})`;
                }
                return `Schema has differing data types at "${path}": "${fromValue}" (${fromType}) => "${toValue}" (${toType})`;
            }
            default:
                throw new TransferEngineValidationError(`Invalid diff found for "${uid}"`, {
                    check: `schema on ${uid}`
                });
        }
    };
    const byKindDescending = (a, b) => a.kind > b.kind ? -1 : 1;
    const formattedDiffs = Object.entries(diffs).map(([uid, ctDiffs]) => {
        // Note: the diff list is sorted in place
        const lines = ctDiffs.sort(byKindDescending).map((entry) => ` - ${describeDiff(uid, entry)}`);
        return `- ${uid}:${EOL}` + lines.join(EOL);
    }).join(EOL);
    throw new TransferEngineValidationError(`Invalid schema changes detected during integrity checks (using the ${strategy} strategy). Please find a summary of the changes below:\n${formattedDiffs}`, {
        check: 'schema.changes',
        strategy,
        diffs
    });
}
|
|
/**
 * Stream one stage from its source to its destination.
 *
 * When the source or destination is missing, or the stage is excluded, both streams are
 * destroyed and a 'skip' stage event is emitted. Otherwise the streams are connected with
 * `pipeline` (abortable through the stored AbortController) and 'start' then 'finish' events
 * are emitted. A pipeline failure is reported as an error diagnostic, emits an 'error' stage
 * event and destroys the destination; it is not rethrown.
 *
 * @param {{stage: string, source?: object, destination?: object, transform?: object, tracker?: object}} options
 * @throws {TransferEngineError} When the transfer has already been aborted.
 */
async function transferStage(options) {
    if (_class_private_field_loose_base(this, _aborted)[_aborted]) {
        throw new TransferEngineError('fatal', 'Transfer aborted.');
    }
    const { stage, source, destination, transform, tracker } = options;
    const emitStage = (type) => _class_private_field_loose_base(this, _emitStageUpdate)[_emitStageUpdate](type, stage);
    // Stamp the stage's end time, if progress was recorded for it
    const markStageEnd = () => {
        const stageData = this.progress.data[stage];
        if (stageData) {
            stageData.endTime = Date.now();
        }
    };
    const isSkipped = !source || !destination || this.shouldSkipStage(stage);
    if (isSkipped) {
        // Destroy a stream and settle once it emits 'close' (or reject on 'error');
        // missing or already destroyed streams settle immediately
        const destroyAndWait = (stream) => {
            if (!stream || stream.destroyed) {
                return Promise.resolve();
            }
            return new Promise((resolve, reject) => {
                stream.on('close', resolve).on('error', reject).destroy();
            });
        };
        const outcomes = await Promise.allSettled([destroyAndWait(source), destroyAndWait(destination)]);
        for (const outcome of outcomes) {
            if (outcome.status === 'rejected') {
                this.reportWarning(outcome.reason, `transfer(${stage})`);
            }
        }
        emitStage('skip');
        return;
    }
    emitStage('start');
    try {
        // source -> [transform] -> [tracker] -> destination
        const streams = [source, transform, tracker, destination].filter(Boolean);
        const controller = new AbortController();
        // Store the controller so the pipeline can be cancelled later
        _class_private_field_loose_base(this, _currentStreamController)[_currentStreamController] = controller;
        await pipeline(streams, { signal: controller.signal });
        emitStage('finish');
    } catch (e) {
        // The end time is set first so that 'error' listeners already see it
        markStageEnd();
        emitStage('error');
        this.reportError(e, 'error');
        if (!destination.destroyed) {
            destination.destroy(e);
        }
    } finally {
        markStageEnd();
    }
}
|
|
/**
 * Fetch the metadata of both providers (source first, then destination) and store whatever
 * is returned in the engine's private metadata. Must be called with the engine as `this`.
 */
async function resolveProviderResource() {
    const metadata = _class_private_field_loose_base(this, _metadata)[_metadata];
    const fromSource = await this.sourceProvider.getMetadata();
    const fromDestination = await this.destinationProvider.getMetadata();
    // Falsy metadata leaves the stored value untouched
    if (fromSource) {
        metadata.source = fromSource;
    }
    if (fromDestination) {
        metadata.destination = fromDestination;
    }
}
|
|
/**
 * Return the source and destination schemas, fetching each from its provider only while the
 * cached value is falsy. Must be called with the engine as `this`.
 *
 * @returns {Promise<{sourceSchemas: *, destinationSchemas: *}>} Either value is undefined when
 *   the provider has no `getSchemas` method.
 */
async function getSchemas() {
    const cache = _class_private_field_loose_base(this, _schema)[_schema];
    if (!cache.source) {
        cache.source = await this.sourceProvider.getSchemas?.();
    }
    if (!cache.destination) {
        cache.destination = await this.destinationProvider.getSchemas?.();
    }
    const { source: sourceSchemas, destination: destinationSchemas } = cache;
    return { sourceSchemas, destinationSchemas };
}
|
|
/**
 * Factory building a TransferEngine.
 *
 * @param {object} sourceProvider - Provider data is read from.
 * @param {object} destinationProvider - Provider data is written to.
 * @param {object} options - Engine options.
 * @returns {TransferEngine} A new engine instance.
 */
const createTransferEngine = (sourceProvider, destinationProvider, options) => new TransferEngine(sourceProvider, destinationProvider, options);
|
|
|
|
export { DEFAULT_SCHEMA_STRATEGY, DEFAULT_VERSION_STRATEGY, TRANSFER_STAGES, TransferGroupPresets, createTransferEngine };
|
|
//# sourceMappingURL=index.mjs.map
|