node_modules ignore

This commit is contained in:
2025-05-08 23:43:47 +02:00
parent e19d52f172
commit 4574544c9f
65041 changed files with 10593536 additions and 0 deletions

View File

@@ -0,0 +1,76 @@
/// <reference types="node" />
import type { WebSocket, RawData } from 'ws';
import type { ValidTransferCommand } from './constants';
import type { TransferMethod } from '../constants';
import type { IDiagnosticReporter } from '../../../utils/diagnostic';
type BufferLike = Parameters<WebSocket['send']>[0];
export interface TransferState {
id?: string;
startedAt?: number;
response?: {
uuid?: string;
e?: Error | null;
data?: unknown;
};
}
export interface Handler {
get transferID(): TransferState['id'];
set transferID(id: TransferState['id']);
get startedAt(): TransferState['startedAt'];
set startedAt(id: TransferState['startedAt']);
get response(): TransferState['response'];
set response(response: TransferState['response']);
get diagnostics(): IDiagnosticReporter;
addUUID(uuid: string): void;
hasUUID(uuid: string): boolean;
/**
* Returns whether a transfer is currently in progress or not
*/
isTransferStarted(): boolean;
/**
* Make sure the current transfer is started and initialized
*/
assertValidTransfer(): void;
/**
* Checks that the given string is a valid transfer command
*/
assertValidTransferCommand(command: string): asserts command is ValidTransferCommand;
/**
* Respond to a specific message
*/
respond<T = unknown>(uuid?: string, e?: Error | null, data?: T): Promise<void>;
/**
* It sends a message to the client
*/
send<T extends BufferLike>(message: T, cb?: (err?: Error) => void): void;
/**
* It sends a message to the client and waits for a confirmation
*/
confirm<T = unknown>(message: T): Promise<void>;
/**
* Check the current auth has the permission for the given scope
*/
verifyAuth(scope?: TransferMethod): Promise<void>;
/**
* Invoke a function and return its result to the client
*/
executeAndRespond<T = unknown>(uuid: string, fn: () => T): Promise<void>;
/**
* Lifecycle called on error or when the ws connection is closed
*/
teardown(): Promise<void> | void;
/**
* Lifecycle called to cleanup the transfer state
*/
cleanup(): Promise<void> | void;
init(...args: unknown[]): unknown;
end(...args: unknown[]): unknown;
status(...args: unknown[]): unknown;
onMessage(message: RawData, isBinary: boolean): Promise<void> | void;
onClose(code: number, reason: Buffer): Promise<void> | void;
onError(err: Error): Promise<void> | void;
onInfo(message: string): Promise<void> | void;
onWarning(message: string): Promise<void> | void;
}
export {};
//# sourceMappingURL=abstract.d.ts.map

View File

@@ -0,0 +1 @@
{"version":3,"file":"abstract.d.ts","sourceRoot":"","sources":["../../../../src/strapi/remote/handlers/abstract.ts"],"names":[],"mappings":";AAAA,OAAO,KAAK,EAAE,SAAS,EAAE,OAAO,EAAE,MAAM,IAAI,CAAC;AAE7C,OAAO,KAAK,EAAE,oBAAoB,EAAE,MAAM,aAAa,CAAC;AACxD,OAAO,KAAK,EAAE,cAAc,EAAE,MAAM,cAAc,CAAC;AACnD,OAAO,KAAK,EAAE,mBAAmB,EAAE,MAAM,2BAA2B,CAAC;AAErE,KAAK,UAAU,GAAG,UAAU,CAAC,SAAS,CAAC,MAAM,CAAC,CAAC,CAAC,CAAC,CAAC,CAAC;AAEnD,MAAM,WAAW,aAAa;IAC5B,EAAE,CAAC,EAAE,MAAM,CAAC;IACZ,SAAS,CAAC,EAAE,MAAM,CAAC;IACnB,QAAQ,CAAC,EAAE;QACT,IAAI,CAAC,EAAE,MAAM,CAAC;QACd,CAAC,CAAC,EAAE,KAAK,GAAG,IAAI,CAAC;QACjB,IAAI,CAAC,EAAE,OAAO,CAAC;KAChB,CAAC;CACH;AAED,MAAM,WAAW,OAAO;IAEtB,IAAI,UAAU,IAAI,aAAa,CAAC,IAAI,CAAC,CAAC;IACtC,IAAI,UAAU,CAAC,EAAE,EAAE,aAAa,CAAC,IAAI,CAAC,EAAE;IAGxC,IAAI,SAAS,IAAI,aAAa,CAAC,WAAW,CAAC,CAAC;IAC5C,IAAI,SAAS,CAAC,EAAE,EAAE,aAAa,CAAC,WAAW,CAAC,EAAE;IAE9C,IAAI,QAAQ,IAAI,aAAa,CAAC,UAAU,CAAC,CAAC;IAC1C,IAAI,QAAQ,CAAC,QAAQ,EAAE,aAAa,CAAC,UAAU,CAAC,EAAE;IAElD,IAAI,WAAW,IAAI,mBAAmB,CAAC;IAGvC,OAAO,CAAC,IAAI,EAAE,MAAM,GAAG,IAAI,CAAC;IAG5B,OAAO,CAAC,IAAI,EAAE,MAAM,GAAG,OAAO,CAAC;IAE/B;;OAEG;IACH,iBAAiB,IAAI,OAAO,CAAC;IAE7B;;OAEG;IACH,mBAAmB,IAAI,IAAI,CAAC;IAE5B;;OAEG;IACH,0BAA0B,CAAC,OAAO,EAAE,MAAM,GAAG,OAAO,CAAC,OAAO,IAAI,oBAAoB,CAAC;IAIrF;;OAEG;IACH,OAAO,CAAC,CAAC,GAAG,OAAO,EAAE,IAAI,CAAC,EAAE,MAAM,EAAE,CAAC,CAAC,EAAE,KAAK,GAAG,IAAI,EAAE,IAAI,CAAC,EAAE,CAAC,GAAG,OAAO,CAAC,IAAI,CAAC,CAAC;IAE/E;;OAEG;IACH,IAAI,CAAC,CAAC,SAAS,UAAU,EAAE,OAAO,EAAE,CAAC,EAAE,EAAE,CAAC,EAAE,CAAC,GAAG,CAAC,EAAE,KAAK,KAAK,IAAI,GAAG,IAAI,CAAC;IAEzE;;OAEG;IACH,OAAO,CAAC,CAAC,GAAG,OAAO,EAAE,OAAO,EAAE,CAAC,GAAG,OAAO,CAAC,IAAI,CAAC,CAAC;IAIhD;;OAEG;IACH,UAAU,CAAC,KAAK,CAAC,EAAE,cAAc,GAAG,OAAO,CAAC,IAAI,CAAC,CAAC;IAElD;;OAEG;IACH,iBAAiB,CAAC,CAAC,GAAG,OAAO,EAAE,IAAI,EAAE,MAAM,EAAE,EAAE,EAAE,MAAM,CAAC,GAAG,OAAO,CAAC,IAAI,CAAC,CAAC;IAIzE;;OAEG;IACH,QAAQ,IAAI,OAAO,CAAC,IAAI,CAAC,GAAG,IAAI,CAAC;IAEjC;;OAEG;IACH,OAAO,IAAI,OAAO,CAAC,IAAI,CAAC,GAAG,IAAI,CAAC;IAGhC,IAAI,CAAC,GAAG,IAAI,EAAE,OAAO,EAAE,GAAG,OAAO,CAAC;IAClC,GAAG,CAAC,GAAG,IAAI,EAAE,OAAO,EAAE,GAAG,OAAO,CAAC;IACjC,MAAM,CAAC,GAAG,IAAI,EAAE,OAAO,EAAE,GAAG,OAAO,CAAC;IAGpC,SAAS,CAAC,OAAO,EAAE,OAAO,EAAE,QAAQ,EAAE,OAAO,GAAG,OAAO,CAAC,IAAI,CAAC,GAAG,IAAI,CAAC;IACrE,OAAO,CAAC,IAAI,EAAE,MAAM,EAAE,MAAM,EAAE,MAAM,GAAG,OAAO,CAAC,IAAI,CAAC,GAAG,IAAI,CAAC;IAC5D,OAAO,CAAC,GAAG,EAAE,KAAK,GAAG,OAAO,CAAC,IAAI,CAAC,GAAG,IAAI,CAAC;IAC1C,MAAM,CAAC,OAAO,EAAE,MAAM,GAAG,OAAO,CAAC,IAAI,CAAC,GAAG,IAAI,CAAC;IAC9C,SAAS,CAAC,OAAO,EAAE,MAAM,GAAG,OAAO,CAAC,IAAI,CAAC,GAAG,IAAI,CAAC;CAClD"}

View File

@@ -0,0 +1,3 @@
export declare const VALID_TRANSFER_COMMANDS: readonly ["init", "end", "status"];
export type ValidTransferCommand = (typeof VALID_TRANSFER_COMMANDS)[number];
//# sourceMappingURL=constants.d.ts.map

View File

@@ -0,0 +1 @@
{"version":3,"file":"constants.d.ts","sourceRoot":"","sources":["../../../../src/strapi/remote/handlers/constants.ts"],"names":[],"mappings":"AAAA,eAAO,MAAM,uBAAuB,oCAAqC,CAAC;AAC1E,MAAM,MAAM,oBAAoB,GAAG,CAAC,OAAO,uBAAuB,CAAC,CAAC,MAAM,CAAC,CAAC"}

View File

@@ -0,0 +1,10 @@
'use strict';
const VALID_TRANSFER_COMMANDS = [
'init',
'end',
'status'
];
exports.VALID_TRANSFER_COMMANDS = VALID_TRANSFER_COMMANDS;
//# sourceMappingURL=constants.js.map

View File

@@ -0,0 +1 @@
{"version":3,"file":"constants.js","sources":["../../../../src/strapi/remote/handlers/constants.ts"],"sourcesContent":["export const VALID_TRANSFER_COMMANDS = ['init', 'end', 'status'] as const;\nexport type ValidTransferCommand = (typeof VALID_TRANSFER_COMMANDS)[number];\n"],"names":["VALID_TRANSFER_COMMANDS"],"mappings":";;MAAaA,uBAA0B,GAAA;AAAC,IAAA,MAAA;AAAQ,IAAA,KAAA;AAAO,IAAA;;;;;"}

View File

@@ -0,0 +1,8 @@
const VALID_TRANSFER_COMMANDS = [
'init',
'end',
'status'
];
export { VALID_TRANSFER_COMMANDS };
//# sourceMappingURL=constants.mjs.map

View File

@@ -0,0 +1 @@
{"version":3,"file":"constants.mjs","sources":["../../../../src/strapi/remote/handlers/constants.ts"],"sourcesContent":["export const VALID_TRANSFER_COMMANDS = ['init', 'end', 'status'] as const;\nexport type ValidTransferCommand = (typeof VALID_TRANSFER_COMMANDS)[number];\n"],"names":["VALID_TRANSFER_COMMANDS"],"mappings":"MAAaA,uBAA0B,GAAA;AAAC,IAAA,MAAA;AAAQ,IAAA,KAAA;AAAO,IAAA;;;;;"}

View File

@@ -0,0 +1,4 @@
export { createPushController, type PushHandler } from './push';
export { createPullController, type PullHandler } from './pull';
export { handlerControllerFactory, type HandlerOptions } from './utils';
//# sourceMappingURL=index.d.ts.map

View File

@@ -0,0 +1 @@
{"version":3,"file":"index.d.ts","sourceRoot":"","sources":["../../../../src/strapi/remote/handlers/index.ts"],"names":[],"mappings":"AAAA,OAAO,EAAE,oBAAoB,EAAE,KAAK,WAAW,EAAE,MAAM,QAAQ,CAAC;AAChE,OAAO,EAAE,oBAAoB,EAAE,KAAK,WAAW,EAAE,MAAM,QAAQ,CAAC;AAChE,OAAO,EAAE,wBAAwB,EAAE,KAAK,cAAc,EAAE,MAAM,SAAS,CAAC"}

View File

@@ -0,0 +1,12 @@
'use strict';
var push = require('./push.js');
var pull = require('./pull.js');
var utils = require('./utils.js');
exports.createPushController = push.createPushController;
exports.createPullController = pull.createPullController;
exports.handlerControllerFactory = utils.handlerControllerFactory;
//# sourceMappingURL=index.js.map

View File

@@ -0,0 +1 @@
{"version":3,"file":"index.js","sources":[],"sourcesContent":[],"names":[],"mappings":";;;;;;;;;;"}

View File

@@ -0,0 +1,4 @@
export { createPushController } from './push.mjs';
export { createPullController } from './pull.mjs';
export { handlerControllerFactory } from './utils.mjs';
//# sourceMappingURL=index.mjs.map

View File

@@ -0,0 +1 @@
{"version":3,"file":"index.mjs","sources":[],"sourcesContent":[],"names":[],"mappings":";;"}

View File

@@ -0,0 +1,23 @@
/// <reference types="node" />
/// <reference types="koa" />
import { Readable } from 'stream';
import { Handler } from './abstract';
import { ILocalStrapiSourceProvider } from '../../providers';
import type { TransferStage, Protocol } from '../../../../types';
declare const VALID_TRANSFER_ACTIONS: readonly ["bootstrap", "close", "getMetadata", "getSchemas"];
type PullTransferAction = (typeof VALID_TRANSFER_ACTIONS)[number];
export interface PullHandler extends Handler {
provider?: ILocalStrapiSourceProvider;
streams?: {
[stage in TransferStage]?: Readable;
};
assertValidTransferAction(action: string): asserts action is PullTransferAction;
onTransferMessage(msg: Protocol.Client.TransferMessage): Promise<unknown> | unknown;
onTransferAction(msg: Protocol.Client.Action): Promise<unknown> | unknown;
onTransferStep(msg: Protocol.Client.TransferPullMessage): Promise<unknown> | unknown;
createReadableStreamForStep(step: TransferStage): Promise<void>;
flush(stage: TransferStage, id: string): Promise<void> | void;
}
export declare const createPullController: (options: import("./utils").HandlerOptions) => (ctx: import("koa").Context) => Promise<void>;
export {};
//# sourceMappingURL=pull.d.ts.map

View File

@@ -0,0 +1 @@
{"version":3,"file":"pull.d.ts","sourceRoot":"","sources":["../../../../src/strapi/remote/handlers/pull.ts"],"names":[],"mappings":";;AAAA,OAAO,EAAE,QAAQ,EAAE,MAAM,QAAQ,CAAC;AAIlC,OAAO,EAAE,OAAO,EAAE,MAAM,YAAY,CAAC;AAErC,OAAO,EAAmC,0BAA0B,EAAE,MAAM,iBAAiB,CAAC;AAE9F,OAAO,KAAK,EAAU,aAAa,EAAE,QAAQ,EAAE,MAAM,mBAAmB,CAAC;AAIzE,QAAA,MAAM,sBAAsB,8DAA+D,CAAC;AAE5F,KAAK,kBAAkB,GAAG,CAAC,OAAO,sBAAsB,CAAC,CAAC,MAAM,CAAC,CAAC;AAElE,MAAM,WAAW,WAAY,SAAQ,OAAO;IAC1C,QAAQ,CAAC,EAAE,0BAA0B,CAAC;IAEtC,OAAO,CAAC,EAAE;SAAG,KAAK,IAAI,aAAa,CAAC,CAAC,EAAE,QAAQ;KAAE,CAAC;IAElD,yBAAyB,CAAC,MAAM,EAAE,MAAM,GAAG,OAAO,CAAC,MAAM,IAAI,kBAAkB,CAAC;IAEhF,iBAAiB,CAAC,GAAG,EAAE,QAAQ,CAAC,MAAM,CAAC,eAAe,GAAG,OAAO,CAAC,OAAO,CAAC,GAAG,OAAO,CAAC;IACpF,gBAAgB,CAAC,GAAG,EAAE,QAAQ,CAAC,MAAM,CAAC,MAAM,GAAG,OAAO,CAAC,OAAO,CAAC,GAAG,OAAO,CAAC;IAC1E,cAAc,CAAC,GAAG,EAAE,QAAQ,CAAC,MAAM,CAAC,mBAAmB,GAAG,OAAO,CAAC,OAAO,CAAC,GAAG,OAAO,CAAC;IAErF,2BAA2B,CAAC,IAAI,EAAE,aAAa,GAAG,OAAO,CAAC,IAAI,CAAC,CAAC;IAEhE,KAAK,CAAC,KAAK,EAAE,aAAa,EAAE,EAAE,EAAE,MAAM,GAAG,OAAO,CAAC,IAAI,CAAC,GAAG,IAAI,CAAC;CAC/D;AAED,eAAO,MAAM,oBAAoB,8FAyW9B,CAAC"}

View File

@@ -0,0 +1,348 @@
'use strict';
var stream = require('stream');
var crypto = require('crypto');
var utils = require('./utils.js');
require('path');
require('fs-extra');
var providers = require('../../../errors/providers.js');
require('../../queries/entity.js');
require('lodash/fp');
require('events');
require('lodash');
require('@strapi/utils');
require('../../providers/local-destination/strategies/restore/configuration.js');
var index = require('../../providers/local-source/index.js');
require('ws');
const TRANSFER_KIND = 'pull';
const VALID_TRANSFER_ACTIONS = [
'bootstrap',
'close',
'getMetadata',
'getSchemas'
];
const createPullController = utils.handlerControllerFactory((proto)=>({
isTransferStarted () {
return proto.isTransferStarted.call(this) && this.provider !== undefined;
},
verifyAuth () {
return proto.verifyAuth.call(this, TRANSFER_KIND);
},
cleanup () {
proto.cleanup.call(this);
this.streams = {};
delete this.provider;
},
onInfo (message) {
this.diagnostics?.report({
details: {
message,
origin: 'pull-handler',
createdAt: new Date()
},
kind: 'info'
});
},
onWarning (message) {
this.diagnostics?.report({
details: {
message,
createdAt: new Date(),
origin: 'pull-handler'
},
kind: 'warning'
});
},
onError (error) {
this.diagnostics?.report({
details: {
message: error.message,
error,
createdAt: new Date(),
name: error.name,
severity: 'fatal'
},
kind: 'error'
});
},
assertValidTransferAction (action) {
// Abstract the constant to string[] to allow looser check on the given action
const validActions = VALID_TRANSFER_ACTIONS;
if (validActions.includes(action)) {
return;
}
throw new providers.ProviderTransferError(`Invalid action provided: "${action}"`, {
action,
validActions: Object.keys(VALID_TRANSFER_ACTIONS)
});
},
async onMessage (raw) {
const msg = JSON.parse(raw.toString());
if (!utils.isDataTransferMessage(msg)) {
return;
}
if (!msg.uuid) {
await this.respond(undefined, new Error('Missing uuid in message'));
}
if (proto.hasUUID(msg.uuid)) {
const previousResponse = proto.response;
if (previousResponse?.uuid === msg.uuid) {
await this.respond(previousResponse?.uuid, previousResponse.e, previousResponse.data);
}
return;
}
const { uuid, type } = msg;
proto.addUUID(uuid);
// Regular command message (init, end, status)
if (type === 'command') {
const { command } = msg;
this.onInfo(`received command:${command} uuid:${uuid}`);
await this.executeAndRespond(uuid, ()=>{
this.assertValidTransferCommand(command);
// The status command don't have params
if (command === 'status') {
return this.status();
}
return this[command](msg.params);
});
} else if (type === 'transfer') {
this.onInfo(`received transfer action:${msg.action} step:${msg.kind} uuid:${uuid}`);
await this.executeAndRespond(uuid, async ()=>{
await this.verifyAuth();
this.assertValidTransfer();
return this.onTransferMessage(msg);
});
} else {
await this.respond(uuid, new Error('Bad Request'));
}
},
async onTransferMessage (msg) {
const { kind } = msg;
if (kind === 'action') {
return this.onTransferAction(msg);
}
if (kind === 'step') {
return this.onTransferStep(msg);
}
},
async onTransferAction (msg) {
const { action } = msg;
this.assertValidTransferAction(action);
if (action === 'bootstrap') {
return this.provider?.[action](this.diagnostics);
}
return this.provider?.[action]();
},
async flush (stage, id) {
const batchSize = 1024 * 1024;
let batch = [];
const stream = this.streams?.[stage];
const batchLength = ()=>Buffer.byteLength(JSON.stringify(batch));
const maybeConfirm = async (data)=>{
try {
await this.confirm(data);
} catch (error) {
// Handle the error, log it, or take other appropriate actions
strapi?.log.error(`[Data transfer] Message confirmation failed: ${error?.message}`);
this.onError(error);
}
};
const sendBatch = async ()=>{
await this.confirm({
type: 'transfer',
data: batch,
ended: false,
error: null,
id
});
batch = [];
};
if (!stream) {
throw new providers.ProviderTransferError(`No available stream found for ${stage}`);
}
try {
for await (const chunk of stream){
if (stage !== 'assets') {
batch.push(chunk);
if (batchLength() >= batchSize) {
await sendBatch();
}
} else {
await this.confirm({
type: 'transfer',
data: [
chunk
],
ended: false,
error: null,
id
});
}
}
if (batch.length > 0 && stage !== 'assets') {
await sendBatch();
}
await this.confirm({
type: 'transfer',
data: null,
ended: true,
error: null,
id
});
} catch (e) {
// TODO: if this confirm fails, can we abort the whole transfer?
await maybeConfirm({
type: 'transfer',
data: null,
ended: true,
error: e,
id
});
}
},
async onTransferStep (msg) {
const { step, action } = msg;
if (action === 'start') {
if (this.streams?.[step] instanceof stream.Readable) {
throw new Error('Stream already created, something went wrong');
}
const flushUUID = crypto.randomUUID();
await this.createReadableStreamForStep(step);
this.flush(step, flushUUID);
return {
ok: true,
id: flushUUID
};
}
if (action === 'end') {
const stream = this.streams?.[step];
if (stream?.readableEnded === false) {
await new Promise((resolve)=>{
stream?.on('close', resolve).destroy();
});
}
delete this.streams?.[step];
return {
ok: true
};
}
},
async createReadableStreamForStep (step) {
const mapper = {
entities: ()=>this.provider?.createEntitiesReadStream(),
links: ()=>this.provider?.createLinksReadStream(),
configuration: ()=>this.provider?.createConfigurationReadStream(),
assets: ()=>{
const assets = this.provider?.createAssetsReadStream();
let batch = [];
const batchLength = ()=>{
return batch.reduce((acc, chunk)=>chunk.action === 'stream' ? acc + chunk.data.byteLength : acc, 0);
};
const BATCH_MAX_SIZE = 1024 * 1024; // 1MB
if (!assets) {
throw new Error('Assets read stream could not be created');
}
/**
* Generates batches of 1MB of data from the assets stream to avoid
* sending too many small chunks
*
* @param stream Assets stream from the local source provider
*/ async function* generator(stream) {
let hasStarted = false;
let assetID = '';
for await (const chunk of stream){
const { stream: assetStream, ...assetData } = chunk;
if (!hasStarted) {
assetID = crypto.randomUUID();
// Start the transfer of a new asset
batch.push({
action: 'start',
assetID,
data: assetData
});
hasStarted = true;
}
for await (const assetChunk of assetStream){
// Add the asset data to the batch
batch.push({
action: 'stream',
assetID,
data: assetChunk
});
// if the batch size is bigger than BATCH_MAX_SIZE stream the batch
if (batchLength() >= BATCH_MAX_SIZE) {
yield batch;
batch = [];
}
}
// All the asset data has been streamed and gets ready for the next one
hasStarted = false;
batch.push({
action: 'end',
assetID
});
yield batch;
batch = [];
}
}
return stream.Readable.from(generator(assets));
}
};
if (!(step in mapper)) {
throw new Error('Invalid transfer step, impossible to create a stream');
}
if (!this.streams) {
throw new Error('Invalid transfer state');
}
this.streams[step] = await mapper[step]();
},
// Commands
async init () {
if (this.transferID || this.provider) {
throw new Error('Transfer already in progress');
}
await this.verifyAuth();
this.transferID = crypto.randomUUID();
this.startedAt = Date.now();
this.streams = {};
this.provider = index.createLocalStrapiSourceProvider({
autoDestroy: false,
getStrapi: ()=>strapi
});
return {
transferID: this.transferID
};
},
async end (params) {
await this.verifyAuth();
if (this.transferID !== params?.transferID) {
throw new providers.ProviderTransferError('Bad transfer ID provided');
}
this.cleanup();
return {
ok: true
};
},
async status () {
const isStarted = this.isTransferStarted();
if (!isStarted) {
const startedAt = this.startedAt;
return {
active: true,
kind: TRANSFER_KIND,
startedAt,
elapsed: Date.now() - startedAt
};
}
return {
active: false,
kind: null,
elapsed: null,
startedAt: null
};
}
}));
exports.createPullController = createPullController;
//# sourceMappingURL=pull.js.map

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,346 @@
import { Readable } from 'stream';
import { randomUUID } from 'crypto';
import { handlerControllerFactory, isDataTransferMessage } from './utils.mjs';
import 'path';
import 'fs-extra';
import { ProviderTransferError } from '../../../errors/providers.mjs';
import '../../queries/entity.mjs';
import 'lodash/fp';
import 'events';
import 'lodash';
import '@strapi/utils';
import '../../providers/local-destination/strategies/restore/configuration.mjs';
import { createLocalStrapiSourceProvider } from '../../providers/local-source/index.mjs';
import 'ws';
const TRANSFER_KIND = 'pull';
const VALID_TRANSFER_ACTIONS = [
'bootstrap',
'close',
'getMetadata',
'getSchemas'
];
const createPullController = handlerControllerFactory((proto)=>({
isTransferStarted () {
return proto.isTransferStarted.call(this) && this.provider !== undefined;
},
verifyAuth () {
return proto.verifyAuth.call(this, TRANSFER_KIND);
},
cleanup () {
proto.cleanup.call(this);
this.streams = {};
delete this.provider;
},
onInfo (message) {
this.diagnostics?.report({
details: {
message,
origin: 'pull-handler',
createdAt: new Date()
},
kind: 'info'
});
},
onWarning (message) {
this.diagnostics?.report({
details: {
message,
createdAt: new Date(),
origin: 'pull-handler'
},
kind: 'warning'
});
},
onError (error) {
this.diagnostics?.report({
details: {
message: error.message,
error,
createdAt: new Date(),
name: error.name,
severity: 'fatal'
},
kind: 'error'
});
},
assertValidTransferAction (action) {
// Abstract the constant to string[] to allow looser check on the given action
const validActions = VALID_TRANSFER_ACTIONS;
if (validActions.includes(action)) {
return;
}
throw new ProviderTransferError(`Invalid action provided: "${action}"`, {
action,
validActions: Object.keys(VALID_TRANSFER_ACTIONS)
});
},
async onMessage (raw) {
const msg = JSON.parse(raw.toString());
if (!isDataTransferMessage(msg)) {
return;
}
if (!msg.uuid) {
await this.respond(undefined, new Error('Missing uuid in message'));
}
if (proto.hasUUID(msg.uuid)) {
const previousResponse = proto.response;
if (previousResponse?.uuid === msg.uuid) {
await this.respond(previousResponse?.uuid, previousResponse.e, previousResponse.data);
}
return;
}
const { uuid, type } = msg;
proto.addUUID(uuid);
// Regular command message (init, end, status)
if (type === 'command') {
const { command } = msg;
this.onInfo(`received command:${command} uuid:${uuid}`);
await this.executeAndRespond(uuid, ()=>{
this.assertValidTransferCommand(command);
// The status command don't have params
if (command === 'status') {
return this.status();
}
return this[command](msg.params);
});
} else if (type === 'transfer') {
this.onInfo(`received transfer action:${msg.action} step:${msg.kind} uuid:${uuid}`);
await this.executeAndRespond(uuid, async ()=>{
await this.verifyAuth();
this.assertValidTransfer();
return this.onTransferMessage(msg);
});
} else {
await this.respond(uuid, new Error('Bad Request'));
}
},
async onTransferMessage (msg) {
const { kind } = msg;
if (kind === 'action') {
return this.onTransferAction(msg);
}
if (kind === 'step') {
return this.onTransferStep(msg);
}
},
async onTransferAction (msg) {
const { action } = msg;
this.assertValidTransferAction(action);
if (action === 'bootstrap') {
return this.provider?.[action](this.diagnostics);
}
return this.provider?.[action]();
},
async flush (stage, id) {
const batchSize = 1024 * 1024;
let batch = [];
const stream = this.streams?.[stage];
const batchLength = ()=>Buffer.byteLength(JSON.stringify(batch));
const maybeConfirm = async (data)=>{
try {
await this.confirm(data);
} catch (error) {
// Handle the error, log it, or take other appropriate actions
strapi?.log.error(`[Data transfer] Message confirmation failed: ${error?.message}`);
this.onError(error);
}
};
const sendBatch = async ()=>{
await this.confirm({
type: 'transfer',
data: batch,
ended: false,
error: null,
id
});
batch = [];
};
if (!stream) {
throw new ProviderTransferError(`No available stream found for ${stage}`);
}
try {
for await (const chunk of stream){
if (stage !== 'assets') {
batch.push(chunk);
if (batchLength() >= batchSize) {
await sendBatch();
}
} else {
await this.confirm({
type: 'transfer',
data: [
chunk
],
ended: false,
error: null,
id
});
}
}
if (batch.length > 0 && stage !== 'assets') {
await sendBatch();
}
await this.confirm({
type: 'transfer',
data: null,
ended: true,
error: null,
id
});
} catch (e) {
// TODO: if this confirm fails, can we abort the whole transfer?
await maybeConfirm({
type: 'transfer',
data: null,
ended: true,
error: e,
id
});
}
},
async onTransferStep (msg) {
const { step, action } = msg;
if (action === 'start') {
if (this.streams?.[step] instanceof Readable) {
throw new Error('Stream already created, something went wrong');
}
const flushUUID = randomUUID();
await this.createReadableStreamForStep(step);
this.flush(step, flushUUID);
return {
ok: true,
id: flushUUID
};
}
if (action === 'end') {
const stream = this.streams?.[step];
if (stream?.readableEnded === false) {
await new Promise((resolve)=>{
stream?.on('close', resolve).destroy();
});
}
delete this.streams?.[step];
return {
ok: true
};
}
},
async createReadableStreamForStep (step) {
const mapper = {
entities: ()=>this.provider?.createEntitiesReadStream(),
links: ()=>this.provider?.createLinksReadStream(),
configuration: ()=>this.provider?.createConfigurationReadStream(),
assets: ()=>{
const assets = this.provider?.createAssetsReadStream();
let batch = [];
const batchLength = ()=>{
return batch.reduce((acc, chunk)=>chunk.action === 'stream' ? acc + chunk.data.byteLength : acc, 0);
};
const BATCH_MAX_SIZE = 1024 * 1024; // 1MB
if (!assets) {
throw new Error('Assets read stream could not be created');
}
/**
* Generates batches of 1MB of data from the assets stream to avoid
* sending too many small chunks
*
* @param stream Assets stream from the local source provider
*/ async function* generator(stream) {
let hasStarted = false;
let assetID = '';
for await (const chunk of stream){
const { stream: assetStream, ...assetData } = chunk;
if (!hasStarted) {
assetID = randomUUID();
// Start the transfer of a new asset
batch.push({
action: 'start',
assetID,
data: assetData
});
hasStarted = true;
}
for await (const assetChunk of assetStream){
// Add the asset data to the batch
batch.push({
action: 'stream',
assetID,
data: assetChunk
});
// if the batch size is bigger than BATCH_MAX_SIZE stream the batch
if (batchLength() >= BATCH_MAX_SIZE) {
yield batch;
batch = [];
}
}
// All the asset data has been streamed and gets ready for the next one
hasStarted = false;
batch.push({
action: 'end',
assetID
});
yield batch;
batch = [];
}
}
return Readable.from(generator(assets));
}
};
if (!(step in mapper)) {
throw new Error('Invalid transfer step, impossible to create a stream');
}
if (!this.streams) {
throw new Error('Invalid transfer state');
}
this.streams[step] = await mapper[step]();
},
// Commands
async init () {
if (this.transferID || this.provider) {
throw new Error('Transfer already in progress');
}
await this.verifyAuth();
this.transferID = randomUUID();
this.startedAt = Date.now();
this.streams = {};
this.provider = createLocalStrapiSourceProvider({
autoDestroy: false,
getStrapi: ()=>strapi
});
return {
transferID: this.transferID
};
},
async end (params) {
await this.verifyAuth();
if (this.transferID !== params?.transferID) {
throw new ProviderTransferError('Bad transfer ID provided');
}
this.cleanup();
return {
ok: true
};
},
async status () {
const isStarted = this.isTransferStarted();
if (!isStarted) {
const startedAt = this.startedAt;
return {
active: true,
kind: TRANSFER_KIND,
startedAt,
elapsed: Date.now() - startedAt
};
}
return {
active: false,
kind: null,
elapsed: null,
startedAt: null
};
}
}));
export { createPullController };
//# sourceMappingURL=pull.mjs.map

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,79 @@
/// <reference types="node" />
/// <reference types="koa" />
import { Writable, PassThrough } from 'stream';
import type { TransferFlow } from '../flows';
import type { TransferStage, IAsset, Protocol } from '../../../../types';
import { createLocalStrapiDestinationProvider } from '../../providers';
import { Handler } from './abstract';
declare const VALID_TRANSFER_ACTIONS: readonly ["bootstrap", "close", "rollback", "beforeTransfer", "getMetadata", "getSchemas"];
type PushTransferAction = (typeof VALID_TRANSFER_ACTIONS)[number];
export interface PushHandler extends Handler {
/**
* Local Strapi Destination Provider used to write data to the current Strapi instance
*/
provider?: ReturnType<typeof createLocalStrapiDestinationProvider>;
/**
* Holds all the stages' stream for the current transfer handler (one registry per connection)
*/
streams?: {
[stage in TransferStage]?: Writable;
};
stats: {
[stage in Exclude<TransferStage, 'schemas'>]: Protocol.Client.Stats;
};
/**
* Holds all the transferred assets for the current transfer handler (one registry per connection)
*/
assets: {
[filepath: string]: IAsset & {
stream: PassThrough;
};
};
/**
* Ochestrate and manage the transfer messages' ordering
*/
flow?: TransferFlow;
/**
* Checks that the given action is a valid push transfer action
*/
assertValidTransferAction(action: string): asserts action is PushTransferAction;
/**
* Create a new writable stream for the given step in the handler's stream registry
*/
createWritableStreamForStep(step: TransferStage): Promise<void>;
/**
* Simple override of the auth verification
*/
verifyAuth(): Promise<void>;
/**
* Callback when receiving a regular transfer message
*/
onTransferMessage(msg: Protocol.Client.TransferMessage): Promise<unknown> | unknown;
/**
* Callback when receiving a transfer action message
*/
onTransferAction(msg: Protocol.Client.Action): Promise<unknown> | unknown;
/**
* Callback when receiving a transfer step message
*/
onTransferStep(msg: Protocol.Client.TransferPushMessage): Promise<unknown> | unknown;
/**
* Start streaming an asset
*/
streamAsset(this: PushHandler, payload: Protocol.Client.GetTransferPushStreamData<'assets'>): Promise<void>;
/**
* Try to move to a specific transfer stage & lock the step
*/
lockTransferStep(stage: TransferStage): void;
/**
* Try to move to unlock the current step
*/
unlockTransferStep(stage: TransferStage): void;
/**
* Checks whether it's possible to stream a chunk for the given stage
*/
assertValidStreamTransferStep(stage: TransferStage): void;
}
export declare const createPushController: (options: import("./utils").HandlerOptions) => (ctx: import("koa").Context) => Promise<void>;
export {};
//# sourceMappingURL=push.d.ts.map

View File

@@ -0,0 +1 @@
{"version":3,"file":"push.d.ts","sourceRoot":"","sources":["../../../../src/strapi/remote/handlers/push.ts"],"names":[],"mappings":";;AACA,OAAO,EAAE,QAAQ,EAAE,WAAW,EAAE,MAAM,QAAQ,CAAC;AAG/C,OAAO,KAAK,EAAE,YAAY,EAAQ,MAAM,UAAU,CAAC;AACnD,OAAO,KAAK,EAAE,aAAa,EAAE,MAAM,EAAE,QAAQ,EAAE,MAAM,mBAAmB,CAAC;AAGzE,OAAO,EAAE,oCAAoC,EAAE,MAAM,iBAAiB,CAAC;AAEvE,OAAO,EAAE,OAAO,EAAE,MAAM,YAAY,CAAC;AAGrC,QAAA,MAAM,sBAAsB,4FAOlB,CAAC;AAEX,KAAK,kBAAkB,GAAG,CAAC,OAAO,sBAAsB,CAAC,CAAC,MAAM,CAAC,CAAC;AAIlE,MAAM,WAAW,WAAY,SAAQ,OAAO;IAC1C;;OAEG;IACH,QAAQ,CAAC,EAAE,UAAU,CAAC,OAAO,oCAAoC,CAAC,CAAC;IAEnE;;OAEG;IACH,OAAO,CAAC,EAAE;SAAG,KAAK,IAAI,aAAa,CAAC,CAAC,EAAE,QAAQ;KAAE,CAAC;IAElD,KAAK,EAAE;SACJ,KAAK,IAAI,OAAO,CAAC,aAAa,EAAE,SAAS,CAAC,GAAG,QAAQ,CAAC,MAAM,CAAC,KAAK;KACpE,CAAC;IAEF;;OAEG;IACH,MAAM,EAAE;QAAE,CAAC,QAAQ,EAAE,MAAM,GAAG,MAAM,GAAG;YAAE,MAAM,EAAE,WAAW,CAAA;SAAE,CAAA;KAAE,CAAC;IAEjE;;OAEG;IACH,IAAI,CAAC,EAAE,YAAY,CAAC;IAEpB;;OAEG;IACH,yBAAyB,CAAC,MAAM,EAAE,MAAM,GAAG,OAAO,CAAC,MAAM,IAAI,kBAAkB,CAAC;IAEhF;;OAEG;IACH,2BAA2B,CAAC,IAAI,EAAE,aAAa,GAAG,OAAO,CAAC,IAAI,CAAC,CAAC;IAEhE;;OAEG;IACH,UAAU,IAAI,OAAO,CAAC,IAAI,CAAC,CAAC;IAE5B;;OAEG;IACH,iBAAiB,CAAC,GAAG,EAAE,QAAQ,CAAC,MAAM,CAAC,eAAe,GAAG,OAAO,CAAC,OAAO,CAAC,GAAG,OAAO,CAAC;IAEpF;;OAEG;IACH,gBAAgB,CAAC,GAAG,EAAE,QAAQ,CAAC,MAAM,CAAC,MAAM,GAAG,OAAO,CAAC,OAAO,CAAC,GAAG,OAAO,CAAC;IAE1E;;OAEG;IACH,cAAc,CAAC,GAAG,EAAE,QAAQ,CAAC,MAAM,CAAC,mBAAmB,GAAG,OAAO,CAAC,OAAO,CAAC,GAAG,OAAO,CAAC;IAErF;;OAEG;IACH,WAAW,CACT,IAAI,EAAE,WAAW,EACjB,OAAO,EAAE,QAAQ,CAAC,MAAM,CAAC,yBAAyB,CAAC,QAAQ,CAAC,GAC3D,OAAO,CAAC,IAAI,CAAC,CAAC;IAIjB;;OAEG;IACH,gBAAgB,CAAC,KAAK,EAAE,aAAa,GAAG,IAAI,CAAC;IAE7C;;OAEG;IACH,kBAAkB,CAAC,KAAK,EAAE,aAAa,GAAG,IAAI,CAAC;IAE/C;;OAEG;IACH,6BAA6B,CAAC,KAAK,EAAE,aAAa,GAAG,IAAI,CAAC;CAC3D;AAcD,eAAO,MAAM,oBAAoB,8FA8Z9B,CAAC"}

View File

@@ -0,0 +1,400 @@
'use strict';
var crypto = require('crypto');
var stream = require('stream');
var providers = require('../../../errors/providers.js');
var index$1 = require('../../providers/local-destination/index.js');
require('stream-chain');
require('../../queries/entity.js');
require('lodash/fp');
require('path');
require('fs-extra');
require('events');
require('ws');
var index = require('../flows/index.js');
var utils = require('./utils.js');
var _default = require('../flows/default.js');
const VALID_TRANSFER_ACTIONS = [
'bootstrap',
'close',
'rollback',
'beforeTransfer',
'getMetadata',
'getSchemas'
];
const TRANSFER_KIND = 'push';
const writeAsync = (stream, data)=>{
return new Promise((resolve, reject)=>{
stream.write(data, (error)=>{
if (error) {
reject(error);
}
resolve();
});
});
};
const createPushController = utils.handlerControllerFactory((proto)=>({
isTransferStarted () {
return proto.isTransferStarted.call(this) && this.provider !== undefined;
},
verifyAuth () {
return proto.verifyAuth.call(this, TRANSFER_KIND);
},
onInfo (message) {
this.diagnostics?.report({
details: {
message,
origin: 'push-handler',
createdAt: new Date()
},
kind: 'info'
});
},
onWarning (message) {
this.diagnostics?.report({
details: {
message,
createdAt: new Date(),
origin: 'push-handler'
},
kind: 'warning'
});
},
cleanup () {
proto.cleanup.call(this);
this.streams = {};
this.assets = {};
delete this.flow;
delete this.provider;
},
teardown () {
if (this.provider) {
this.provider.rollback();
}
proto.teardown.call(this);
},
assertValidTransfer () {
proto.assertValidTransfer.call(this);
if (this.provider === undefined) {
throw new Error('Invalid Transfer Process');
}
},
assertValidTransferAction (action) {
if (VALID_TRANSFER_ACTIONS.includes(action)) {
return;
}
throw new providers.ProviderTransferError(`Invalid action provided: "${action}"`, {
action,
validActions: Object.keys(VALID_TRANSFER_ACTIONS)
});
},
assertValidStreamTransferStep (stage) {
const currentStep = this.flow?.get();
const nextStep = {
kind: 'transfer',
stage
};
if (currentStep?.kind === 'transfer' && !currentStep.locked) {
throw new providers.ProviderTransferError(`You need to initialize the transfer stage (${nextStep}) before starting to stream data`);
}
if (this.flow?.cannot(nextStep)) {
throw new providers.ProviderTransferError(`Invalid stage (${nextStep}) provided for the current flow`, {
step: nextStep
});
}
},
async createWritableStreamForStep (step) {
const mapper = {
entities: ()=>this.provider?.createEntitiesWriteStream(),
links: ()=>this.provider?.createLinksWriteStream(),
configuration: ()=>this.provider?.createConfigurationWriteStream(),
assets: ()=>this.provider?.createAssetsWriteStream()
};
if (!(step in mapper)) {
throw new Error('Invalid transfer step, impossible to create a stream');
}
if (!this.streams) {
throw new Error('Invalid transfer state');
}
this.streams[step] = await mapper[step]();
},
async onMessage (raw) {
const msg = JSON.parse(raw.toString());
if (!utils.isDataTransferMessage(msg)) {
return;
}
if (!msg.uuid) {
await this.respond(undefined, new Error('Missing uuid in message'));
}
if (proto.hasUUID(msg.uuid)) {
const previousResponse = proto.response;
if (previousResponse?.uuid === msg.uuid) {
await this.respond(previousResponse?.uuid, previousResponse.e, previousResponse.data);
}
return;
}
const { uuid, type } = msg;
proto.addUUID(uuid);
// Regular command message (init, end, status)
if (type === 'command') {
const { command } = msg;
this.onInfo(`received command:${command} uuid:${uuid}`);
await this.executeAndRespond(uuid, ()=>{
this.assertValidTransferCommand(command);
// The status command don't have params
if (command === 'status') {
return this.status();
}
return this[command](msg.params);
});
} else if (type === 'transfer') {
this.onInfo(`received transfer action:${msg.action} step:${msg.kind} uuid:${uuid}`);
await this.executeAndRespond(uuid, async ()=>{
await this.verifyAuth();
this.assertValidTransfer();
return this.onTransferMessage(msg);
});
} else {
await this.respond(uuid, new Error('Bad Request'));
}
},
async onTransferMessage (msg) {
const { kind } = msg;
if (kind === 'action') {
return this.onTransferAction(msg);
}
if (kind === 'step') {
return this.onTransferStep(msg);
}
},
lockTransferStep (stage) {
const currentStep = this.flow?.get();
const nextStep = {
kind: 'transfer',
stage
};
if (currentStep?.kind === 'transfer' && currentStep.locked) {
throw new providers.ProviderTransferError(`It's not possible to start a new transfer stage (${stage}) while another one is in progress (${currentStep.stage})`);
}
if (this.flow?.cannot(nextStep)) {
throw new providers.ProviderTransferError(`Invalid stage (${stage}) provided for the current flow`, {
step: nextStep
});
}
this.flow?.set({
...nextStep,
locked: true
});
},
unlockTransferStep (stage) {
const currentStep = this.flow?.get();
const nextStep = {
kind: 'transfer',
stage
};
// Cannot unlock if not locked (aka: started)
if (currentStep?.kind === 'transfer' && !currentStep.locked) {
throw new providers.ProviderTransferError(`You need to initialize the transfer stage (${stage}) before ending it`);
}
// Cannot unlock if invalid step provided
if (this.flow?.cannot(nextStep)) {
throw new providers.ProviderTransferError(`Invalid stage (${stage}) provided for the current flow`, {
step: nextStep
});
}
this.flow?.set({
...nextStep,
locked: false
});
},
async onTransferStep (msg) {
const { step: stage } = msg;
if (msg.action === 'start') {
this.lockTransferStep(stage);
if (this.streams?.[stage] instanceof stream.Writable) {
throw new Error('Stream already created, something went wrong');
}
await this.createWritableStreamForStep(stage);
this.stats[stage] = {
started: 0,
finished: 0
};
return {
ok: true
};
}
if (msg.action === 'stream') {
this.assertValidStreamTransferStep(stage);
// Stream operation on the current transfer stage
const stream = this.streams?.[stage];
if (!stream) {
throw new Error('You need to init first');
}
// Assets are nested streams
if (stage === 'assets') {
return this.streamAsset(msg.data);
}
// For all other steps
await Promise.all(msg.data.map(async (item)=>{
this.stats[stage].started += 1;
await writeAsync(stream, item);
this.stats[stage].finished += 1;
}));
}
if (msg.action === 'end') {
this.unlockTransferStep(stage);
const stream = this.streams?.[stage];
if (stream && !stream.closed) {
await new Promise((resolve, reject)=>{
stream.on('close', resolve).on('error', reject).end();
});
}
delete this.streams?.[stage];
return {
ok: true,
stats: this.stats[stage]
};
}
},
async onTransferAction (msg) {
const { action } = msg;
this.assertValidTransferAction(action);
const step = {
kind: 'action',
action
};
const isStepRegistered = this.flow?.has(step);
if (isStepRegistered) {
if (this.flow?.cannot(step)) {
throw new providers.ProviderTransferError(`Invalid action "${action}" found for the current flow `, {
action
});
}
this.flow?.set(step);
}
if (action === 'bootstrap') {
return this.provider?.[action](this.diagnostics);
}
return this.provider?.[action]();
},
async streamAsset (payload) {
const assetsStream = this.streams?.assets;
// TODO: close the stream upon receiving an 'end' event instead
if (payload === null) {
this.streams?.assets?.end();
return;
}
for (const item of payload){
const { action, assetID } = item;
if (!assetsStream) {
throw new Error('Stream not defined');
}
if (action === 'start') {
this.stats.assets.started += 1;
this.assets[assetID] = {
...item.data,
stream: new stream.PassThrough()
};
writeAsync(assetsStream, this.assets[assetID]);
}
if (action === 'stream') {
// The buffer has gone through JSON operations and is now of shape { type: "Buffer"; data: UInt8Array }
// We need to transform it back into a Buffer instance
const rawBuffer = item.data;
const chunk = Buffer.from(rawBuffer.data);
await writeAsync(this.assets[assetID].stream, chunk);
}
if (action === 'end') {
await new Promise((resolve, reject)=>{
const { stream: assetStream } = this.assets[assetID];
assetStream.on('close', ()=>{
this.stats.assets.finished += 1;
delete this.assets[assetID];
resolve();
}).on('error', reject).end();
});
}
}
},
onClose () {
this.teardown();
},
onError (err) {
this.teardown();
strapi.log.error(err);
},
// Commands
async init (params) {
if (this.transferID || this.provider) {
throw new Error('Transfer already in progress');
}
await this.verifyAuth();
this.transferID = crypto.randomUUID();
this.startedAt = Date.now();
this.assets = {};
this.streams = {};
this.stats = {
assets: {
started: 0,
finished: 0
},
configuration: {
started: 0,
finished: 0
},
entities: {
started: 0,
finished: 0
},
links: {
started: 0,
finished: 0
}
};
this.flow = index.createFlow(_default);
this.provider = index$1.createLocalStrapiDestinationProvider({
...params.options,
autoDestroy: false,
getStrapi: ()=>strapi
});
this.provider.onWarning = (message)=>{
this.onWarning(message);
strapi.log.warn(message);
};
return {
transferID: this.transferID
};
},
async status () {
const isStarted = this.isTransferStarted();
if (isStarted) {
const startedAt = this.startedAt;
return {
active: true,
kind: TRANSFER_KIND,
startedAt,
elapsed: Date.now() - startedAt
};
}
return {
active: false,
kind: null,
elapsed: null,
startedAt: null
};
},
async end (params) {
await this.verifyAuth();
if (this.transferID !== params?.transferID) {
throw new providers.ProviderTransferError('Bad transfer ID provided');
}
this.cleanup();
return {
ok: true
};
}
}));
exports.createPushController = createPushController;
//# sourceMappingURL=push.js.map

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,398 @@
import { randomUUID } from 'crypto';
import { Writable, PassThrough } from 'stream';
import { ProviderTransferError } from '../../../errors/providers.mjs';
import { createLocalStrapiDestinationProvider } from '../../providers/local-destination/index.mjs';
import 'stream-chain';
import '../../queries/entity.mjs';
import 'lodash/fp';
import 'path';
import 'fs-extra';
import 'events';
import 'ws';
import { createFlow } from '../flows/index.mjs';
import { handlerControllerFactory, isDataTransferMessage } from './utils.mjs';
import DEFAULT_TRANSFER_FLOW from '../flows/default.mjs';
const VALID_TRANSFER_ACTIONS = [
'bootstrap',
'close',
'rollback',
'beforeTransfer',
'getMetadata',
'getSchemas'
];
const TRANSFER_KIND = 'push';
const writeAsync = (stream, data)=>{
return new Promise((resolve, reject)=>{
stream.write(data, (error)=>{
if (error) {
reject(error);
}
resolve();
});
});
};
const createPushController = handlerControllerFactory((proto)=>({
isTransferStarted () {
return proto.isTransferStarted.call(this) && this.provider !== undefined;
},
verifyAuth () {
return proto.verifyAuth.call(this, TRANSFER_KIND);
},
onInfo (message) {
this.diagnostics?.report({
details: {
message,
origin: 'push-handler',
createdAt: new Date()
},
kind: 'info'
});
},
onWarning (message) {
this.diagnostics?.report({
details: {
message,
createdAt: new Date(),
origin: 'push-handler'
},
kind: 'warning'
});
},
cleanup () {
proto.cleanup.call(this);
this.streams = {};
this.assets = {};
delete this.flow;
delete this.provider;
},
teardown () {
if (this.provider) {
this.provider.rollback();
}
proto.teardown.call(this);
},
assertValidTransfer () {
proto.assertValidTransfer.call(this);
if (this.provider === undefined) {
throw new Error('Invalid Transfer Process');
}
},
assertValidTransferAction (action) {
if (VALID_TRANSFER_ACTIONS.includes(action)) {
return;
}
throw new ProviderTransferError(`Invalid action provided: "${action}"`, {
action,
validActions: Object.keys(VALID_TRANSFER_ACTIONS)
});
},
assertValidStreamTransferStep (stage) {
const currentStep = this.flow?.get();
const nextStep = {
kind: 'transfer',
stage
};
if (currentStep?.kind === 'transfer' && !currentStep.locked) {
throw new ProviderTransferError(`You need to initialize the transfer stage (${nextStep}) before starting to stream data`);
}
if (this.flow?.cannot(nextStep)) {
throw new ProviderTransferError(`Invalid stage (${nextStep}) provided for the current flow`, {
step: nextStep
});
}
},
async createWritableStreamForStep (step) {
const mapper = {
entities: ()=>this.provider?.createEntitiesWriteStream(),
links: ()=>this.provider?.createLinksWriteStream(),
configuration: ()=>this.provider?.createConfigurationWriteStream(),
assets: ()=>this.provider?.createAssetsWriteStream()
};
if (!(step in mapper)) {
throw new Error('Invalid transfer step, impossible to create a stream');
}
if (!this.streams) {
throw new Error('Invalid transfer state');
}
this.streams[step] = await mapper[step]();
},
async onMessage (raw) {
const msg = JSON.parse(raw.toString());
if (!isDataTransferMessage(msg)) {
return;
}
if (!msg.uuid) {
await this.respond(undefined, new Error('Missing uuid in message'));
}
if (proto.hasUUID(msg.uuid)) {
const previousResponse = proto.response;
if (previousResponse?.uuid === msg.uuid) {
await this.respond(previousResponse?.uuid, previousResponse.e, previousResponse.data);
}
return;
}
const { uuid, type } = msg;
proto.addUUID(uuid);
// Regular command message (init, end, status)
if (type === 'command') {
const { command } = msg;
this.onInfo(`received command:${command} uuid:${uuid}`);
await this.executeAndRespond(uuid, ()=>{
this.assertValidTransferCommand(command);
// The status command don't have params
if (command === 'status') {
return this.status();
}
return this[command](msg.params);
});
} else if (type === 'transfer') {
this.onInfo(`received transfer action:${msg.action} step:${msg.kind} uuid:${uuid}`);
await this.executeAndRespond(uuid, async ()=>{
await this.verifyAuth();
this.assertValidTransfer();
return this.onTransferMessage(msg);
});
} else {
await this.respond(uuid, new Error('Bad Request'));
}
},
async onTransferMessage (msg) {
const { kind } = msg;
if (kind === 'action') {
return this.onTransferAction(msg);
}
if (kind === 'step') {
return this.onTransferStep(msg);
}
},
lockTransferStep (stage) {
const currentStep = this.flow?.get();
const nextStep = {
kind: 'transfer',
stage
};
if (currentStep?.kind === 'transfer' && currentStep.locked) {
throw new ProviderTransferError(`It's not possible to start a new transfer stage (${stage}) while another one is in progress (${currentStep.stage})`);
}
if (this.flow?.cannot(nextStep)) {
throw new ProviderTransferError(`Invalid stage (${stage}) provided for the current flow`, {
step: nextStep
});
}
this.flow?.set({
...nextStep,
locked: true
});
},
unlockTransferStep (stage) {
const currentStep = this.flow?.get();
const nextStep = {
kind: 'transfer',
stage
};
// Cannot unlock if not locked (aka: started)
if (currentStep?.kind === 'transfer' && !currentStep.locked) {
throw new ProviderTransferError(`You need to initialize the transfer stage (${stage}) before ending it`);
}
// Cannot unlock if invalid step provided
if (this.flow?.cannot(nextStep)) {
throw new ProviderTransferError(`Invalid stage (${stage}) provided for the current flow`, {
step: nextStep
});
}
this.flow?.set({
...nextStep,
locked: false
});
},
async onTransferStep (msg) {
const { step: stage } = msg;
if (msg.action === 'start') {
this.lockTransferStep(stage);
if (this.streams?.[stage] instanceof Writable) {
throw new Error('Stream already created, something went wrong');
}
await this.createWritableStreamForStep(stage);
this.stats[stage] = {
started: 0,
finished: 0
};
return {
ok: true
};
}
if (msg.action === 'stream') {
this.assertValidStreamTransferStep(stage);
// Stream operation on the current transfer stage
const stream = this.streams?.[stage];
if (!stream) {
throw new Error('You need to init first');
}
// Assets are nested streams
if (stage === 'assets') {
return this.streamAsset(msg.data);
}
// For all other steps
await Promise.all(msg.data.map(async (item)=>{
this.stats[stage].started += 1;
await writeAsync(stream, item);
this.stats[stage].finished += 1;
}));
}
if (msg.action === 'end') {
this.unlockTransferStep(stage);
const stream = this.streams?.[stage];
if (stream && !stream.closed) {
await new Promise((resolve, reject)=>{
stream.on('close', resolve).on('error', reject).end();
});
}
delete this.streams?.[stage];
return {
ok: true,
stats: this.stats[stage]
};
}
},
async onTransferAction (msg) {
const { action } = msg;
this.assertValidTransferAction(action);
const step = {
kind: 'action',
action
};
const isStepRegistered = this.flow?.has(step);
if (isStepRegistered) {
if (this.flow?.cannot(step)) {
throw new ProviderTransferError(`Invalid action "${action}" found for the current flow `, {
action
});
}
this.flow?.set(step);
}
if (action === 'bootstrap') {
return this.provider?.[action](this.diagnostics);
}
return this.provider?.[action]();
},
async streamAsset (payload) {
const assetsStream = this.streams?.assets;
// TODO: close the stream upon receiving an 'end' event instead
if (payload === null) {
this.streams?.assets?.end();
return;
}
for (const item of payload){
const { action, assetID } = item;
if (!assetsStream) {
throw new Error('Stream not defined');
}
if (action === 'start') {
this.stats.assets.started += 1;
this.assets[assetID] = {
...item.data,
stream: new PassThrough()
};
writeAsync(assetsStream, this.assets[assetID]);
}
if (action === 'stream') {
// The buffer has gone through JSON operations and is now of shape { type: "Buffer"; data: UInt8Array }
// We need to transform it back into a Buffer instance
const rawBuffer = item.data;
const chunk = Buffer.from(rawBuffer.data);
await writeAsync(this.assets[assetID].stream, chunk);
}
if (action === 'end') {
await new Promise((resolve, reject)=>{
const { stream: assetStream } = this.assets[assetID];
assetStream.on('close', ()=>{
this.stats.assets.finished += 1;
delete this.assets[assetID];
resolve();
}).on('error', reject).end();
});
}
}
},
onClose () {
this.teardown();
},
onError (err) {
this.teardown();
strapi.log.error(err);
},
// Commands
async init (params) {
if (this.transferID || this.provider) {
throw new Error('Transfer already in progress');
}
await this.verifyAuth();
this.transferID = randomUUID();
this.startedAt = Date.now();
this.assets = {};
this.streams = {};
this.stats = {
assets: {
started: 0,
finished: 0
},
configuration: {
started: 0,
finished: 0
},
entities: {
started: 0,
finished: 0
},
links: {
started: 0,
finished: 0
}
};
this.flow = createFlow(DEFAULT_TRANSFER_FLOW);
this.provider = createLocalStrapiDestinationProvider({
...params.options,
autoDestroy: false,
getStrapi: ()=>strapi
});
this.provider.onWarning = (message)=>{
this.onWarning(message);
strapi.log.warn(message);
};
return {
transferID: this.transferID
};
},
async status () {
const isStarted = this.isTransferStarted();
if (isStarted) {
const startedAt = this.startedAt;
return {
active: true,
kind: TRANSFER_KIND,
startedAt,
elapsed: Date.now() - startedAt
};
}
return {
active: false,
kind: null,
elapsed: null,
startedAt: null
};
},
async end (params) {
await this.verifyAuth();
if (this.transferID !== params?.transferID) {
throw new ProviderTransferError('Bad transfer ID provided');
}
this.cleanup();
return {
ok: true
};
}
}));
export { createPushController };
//# sourceMappingURL=push.mjs.map

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,26 @@
/// <reference types="node" />
import type { IncomingMessage } from 'node:http';
import type { Context } from 'koa';
import type { ServerOptions } from 'ws';
import { WebSocket, WebSocketServer } from 'ws';
import type { Handler } from './abstract';
import type { Protocol } from '../../../../types';
import { TransferMethod } from '../constants';
type WSCallback = (client: WebSocket, request: IncomingMessage) => void;
export interface HandlerOptions {
verify: (ctx: Context, scope?: TransferMethod) => Promise<void>;
server?: ServerOptions;
}
export declare const transformUpgradeHeader: (header?: string) => string[];
/**
* Make sure that the upgrade header is a valid websocket one
*/
export declare const assertValidHeader: (ctx: Context) => void;
export declare const isDataTransferMessage: (message: unknown) => message is Protocol.Client.Message;
/**
* Handle the upgrade to ws connection
*/
export declare const handleWSUpgrade: (wss: WebSocketServer, ctx: Context, callback: WSCallback) => void;
export declare const handlerControllerFactory: <T extends Partial<Handler>>(implementation: (proto: Handler) => T) => (options: HandlerOptions) => (ctx: Context) => Promise<void>;
export {};
//# sourceMappingURL=utils.d.ts.map

View File

@@ -0,0 +1 @@
{"version":3,"file":"utils.d.ts","sourceRoot":"","sources":["../../../../src/strapi/remote/handlers/utils.ts"],"names":[],"mappings":";AAAA,OAAO,KAAK,EAAE,eAAe,EAAE,MAAM,WAAW,CAAC;AAEjD,OAAO,KAAK,EAAE,OAAO,EAAE,MAAM,KAAK,CAAC;AACnC,OAAO,KAAK,EAAW,aAAa,EAAE,MAAM,IAAI,CAAC;AACjD,OAAO,EAAE,SAAS,EAAE,eAAe,EAAE,MAAM,IAAI,CAAC;AAEhD,OAAO,KAAK,EAAE,OAAO,EAAiB,MAAM,YAAY,CAAC;AACzD,OAAO,KAAK,EAAE,QAAQ,EAAE,MAAM,mBAAmB,CAAC;AAGlD,OAAO,EAAE,cAAc,EAAE,MAAM,cAAc,CAAC;AAG9C,KAAK,UAAU,GAAG,CAAC,MAAM,EAAE,SAAS,EAAE,OAAO,EAAE,eAAe,KAAK,IAAI,CAAC;AAExE,MAAM,WAAW,cAAc;IAC7B,MAAM,EAAE,CAAC,GAAG,EAAE,OAAO,EAAE,KAAK,CAAC,EAAE,cAAc,KAAK,OAAO,CAAC,IAAI,CAAC,CAAC;IAChE,MAAM,CAAC,EAAE,aAAa,CAAC;CACxB;AAED,eAAO,MAAM,sBAAsB,+BAElC,CAAC;AAyCF;;GAEG;AACH,eAAO,MAAM,iBAAiB,QAAS,OAAO,SA4B7C,CAAC;AAEF,eAAO,MAAM,qBAAqB,YAAa,OAAO,uCAgBrD,CAAC;AAEF;;GAEG;AACH,eAAO,MAAM,eAAe,QAAS,eAAe,OAAO,OAAO,YAAY,UAAU,SAsBvF,CAAC;AAIF,eAAO,MAAM,wBAAwB,+CACU,CAAC,KAAK,EAAE,OAAO,KAAK,CAAC,eACxD,cAAc,WAKH,OAAO,kBAkP3B,CAAC"}

View File

@@ -0,0 +1,316 @@
'use strict';
var crypto = require('crypto');
var ws = require('ws');
var providers = require('../../../errors/providers.js');
var constants = require('./constants.js');
var diagnostic = require('../../../utils/diagnostic.js');
const transformUpgradeHeader = (header = '')=>{
return header.split(',').map((s)=>s.trim().toLowerCase());
};
let timeouts;
const hasHttpServer = ()=>{
// during server restarts, strapi may not have ever been defined at all, so we have to check it first
return typeof strapi !== 'undefined' && !!strapi?.server?.httpServer;
};
// temporarily disable server timeouts while transfer is running
const disableTimeouts = ()=>{
if (!hasHttpServer()) {
return;
}
const { httpServer } = strapi.server;
// save the original timeouts to restore after
if (!timeouts) {
timeouts = {
headersTimeout: httpServer.headersTimeout,
requestTimeout: httpServer.requestTimeout
};
}
httpServer.headersTimeout = 0;
httpServer.requestTimeout = 0;
strapi.log.info('[Data transfer] Disabling http timeouts');
};
const resetTimeouts = ()=>{
if (!hasHttpServer() || !timeouts) {
return;
}
const { httpServer } = strapi.server;
strapi.log.info('[Data transfer] Restoring http timeouts');
httpServer.headersTimeout = timeouts.headersTimeout;
httpServer.requestTimeout = timeouts.requestTimeout;
};
/**
* Make sure that the upgrade header is a valid websocket one
*/ const assertValidHeader = (ctx)=>{
// if it's exactly what we expect, it's fine
if (ctx.headers.upgrade === 'websocket') {
return;
}
// check if it could be an array that still includes websocket
const upgradeHeader = transformUpgradeHeader(ctx.headers.upgrade);
// Sanitize user input before writing it to our logs
const logSafeUpgradeHeader = JSON.stringify(ctx.headers.upgrade)?.replace(/[^a-z0-9\s.,|]/gi, '').substring(0, 50);
if (!upgradeHeader.includes('websocket')) {
throw new Error(`Transfer Upgrade header expected 'websocket', found '${logSafeUpgradeHeader}'. Please ensure that your server or proxy is not modifying the Upgrade header.`);
}
/**
* If there's more than expected but it still includes websocket, in theory it could still work
* and could be necessary for their certain configurations, so we'll allow it to proceed but
* log the unexpected behaviour in case it helps debug an issue
* */ strapi.log.info(`Transfer Upgrade header expected only 'websocket', found unexpected values: ${logSafeUpgradeHeader}`);
};
const isDataTransferMessage = (message)=>{
if (!message || typeof message !== 'object') {
return false;
}
const { uuid, type } = message;
if (typeof uuid !== 'string' || typeof type !== 'string') {
return false;
}
if (![
'command',
'transfer'
].includes(type)) {
return false;
}
return true;
};
/**
* Handle the upgrade to ws connection
*/ const handleWSUpgrade = (wss, ctx, callback)=>{
assertValidHeader(ctx);
wss.handleUpgrade(ctx.req, ctx.request.socket, Buffer.alloc(0), (client, request)=>{
if (!client) {
// If the WebSocket upgrade failed, destroy the socket to avoid hanging
ctx.request.socket.destroy();
return;
}
disableTimeouts();
strapi.db.lifecycles.disable();
strapi.log.info('[Data transfer] Disabling lifecycle hooks');
// Create a connection between the client & the server
wss.emit('connection', client, ctx.req);
// Invoke the ws callback
callback(client, request);
});
ctx.respond = false;
};
// Protocol related functions
const handlerControllerFactory = (implementation)=>(options)=>{
const { verify, server: serverOptions } = options ?? {};
const wss = new ws.WebSocket.Server({
...serverOptions,
noServer: true
});
return async (ctx)=>{
const cb = (ws)=>{
const state = {
id: undefined
};
const messageUUIDs = new Set();
const diagnostics = diagnostic.createDiagnosticReporter();
const cannotRespondHandler = (err)=>{
strapi?.log?.error('[Data transfer] Cannot send error response to client, closing connection');
strapi?.log?.error(err);
try {
ws.terminate();
ctx.req.socket.destroy();
} catch (err) {
strapi?.log?.error('[Data transfer] Failed to close socket on error');
}
};
const prototype = {
// Transfer ID
get transferID () {
return state.id;
},
set transferID (id){
state.id = id;
},
// Started at
get startedAt () {
return state.startedAt;
},
set startedAt (timestamp){
state.startedAt = timestamp;
},
get response () {
return state.response;
},
set response (response){
state.response = response;
},
get diagnostics () {
return diagnostics;
},
addUUID (uuid) {
messageUUIDs.add(uuid);
},
hasUUID (uuid) {
return messageUUIDs.has(uuid);
},
isTransferStarted () {
return this.transferID !== undefined && this.startedAt !== undefined;
},
assertValidTransfer () {
const isStarted = this.isTransferStarted();
if (!isStarted) {
throw new Error('Invalid Transfer Process');
}
},
assertValidTransferCommand (command) {
const isDefined = typeof this[command] === 'function';
const isValidTransferCommand = constants.VALID_TRANSFER_COMMANDS.includes(command);
if (!isDefined || !isValidTransferCommand) {
throw new Error('Invalid transfer command');
}
},
async respond (uuid, e, data) {
let details = {};
return new Promise((resolve, reject)=>{
if (!uuid && !e) {
reject(new Error('Missing uuid for this message'));
return;
}
this.response = {
uuid,
data,
e
};
if (e instanceof providers.ProviderError) {
details = e.details;
}
const payload = JSON.stringify({
uuid,
data: data ?? null,
error: e ? {
code: e?.name ?? 'ERR',
message: e?.message,
details
} : null
});
this.send(payload, (error)=>error ? reject(error) : resolve());
});
},
send (message, cb) {
ws.send(message, cb);
},
confirm (message) {
return new Promise((resolve, reject)=>{
const uuid = crypto.randomUUID();
const payload = JSON.stringify({
uuid,
data: message
});
this.send(payload, (error)=>{
if (error) {
reject(error);
}
});
const onResponse = (raw)=>{
const response1 = JSON.parse(raw.toString());
if (response1.uuid === uuid) {
resolve(response1.data ?? null);
} else {
ws.once('message', onResponse);
}
};
ws.once('message', onResponse);
});
},
async executeAndRespond (uuid, fn) {
try {
const response1 = await fn();
await this.respond(uuid, null, response1);
} catch (e) {
if (e instanceof Error) {
await this.respond(uuid, e).catch(cannotRespondHandler);
} else if (typeof e === 'string') {
await this.respond(uuid, new providers.ProviderTransferError(e)).catch(cannotRespondHandler);
} else {
await this.respond(uuid, new providers.ProviderTransferError('Unexpected error', {
error: e
})).catch(cannotRespondHandler);
}
}
},
cleanup () {
this.transferID = undefined;
this.startedAt = undefined;
this.response = undefined;
},
teardown () {
this.cleanup();
},
verifyAuth (scope) {
return verify(ctx, scope);
},
// Transfer commands
init () {},
end () {},
status () {},
// Default prototype implementation for events
onMessage () {},
onError () {},
onClose () {},
onInfo () {},
onWarning () {}
};
const handler = Object.assign(Object.create(prototype), implementation(prototype));
// Bind ws events to handler methods
ws.on('close', async (...args)=>{
try {
await handler.onClose(...args);
} catch (err) {
strapi?.log?.error('[Data transfer] Uncaught error closing connection');
strapi?.log?.error(err);
cannotRespondHandler(err);
} finally{
resetTimeouts();
strapi.db.lifecycles.enable();
strapi.log.info('[Data transfer] Restoring lifecycle hooks');
}
});
ws.on('error', async (...args)=>{
try {
await handler.onError(...args);
} catch (err) {
strapi?.log?.error('[Data transfer] Uncaught error in error handling');
strapi?.log?.error(err);
cannotRespondHandler(err);
}
});
ws.on('message', async (...args)=>{
try {
await handler.onMessage(...args);
} catch (err) {
strapi?.log?.error('[Data transfer] Uncaught error in message handling');
strapi?.log?.error(err);
cannotRespondHandler(err);
}
});
diagnostics.onDiagnostic((diagnostic)=>{
const uuid = crypto.randomUUID();
const payload = JSON.stringify({
diagnostic,
uuid
});
handler.send(payload);
});
};
try {
handleWSUpgrade(wss, ctx, cb);
} catch (err) {
strapi?.log?.error('[Data transfer] Error in websocket upgrade request');
strapi?.log?.error(err);
}
};
};
exports.assertValidHeader = assertValidHeader;
exports.handleWSUpgrade = handleWSUpgrade;
exports.handlerControllerFactory = handlerControllerFactory;
exports.isDataTransferMessage = isDataTransferMessage;
exports.transformUpgradeHeader = transformUpgradeHeader;
//# sourceMappingURL=utils.js.map

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,310 @@
import { randomUUID } from 'crypto';
import { WebSocket } from 'ws';
import { ProviderError, ProviderTransferError } from '../../../errors/providers.mjs';
import { VALID_TRANSFER_COMMANDS } from './constants.mjs';
import { createDiagnosticReporter } from '../../../utils/diagnostic.mjs';
const transformUpgradeHeader = (header = '')=>{
return header.split(',').map((s)=>s.trim().toLowerCase());
};
let timeouts;
const hasHttpServer = ()=>{
// during server restarts, strapi may not have ever been defined at all, so we have to check it first
return typeof strapi !== 'undefined' && !!strapi?.server?.httpServer;
};
// temporarily disable server timeouts while transfer is running
const disableTimeouts = ()=>{
if (!hasHttpServer()) {
return;
}
const { httpServer } = strapi.server;
// save the original timeouts to restore after
if (!timeouts) {
timeouts = {
headersTimeout: httpServer.headersTimeout,
requestTimeout: httpServer.requestTimeout
};
}
httpServer.headersTimeout = 0;
httpServer.requestTimeout = 0;
strapi.log.info('[Data transfer] Disabling http timeouts');
};
const resetTimeouts = ()=>{
if (!hasHttpServer() || !timeouts) {
return;
}
const { httpServer } = strapi.server;
strapi.log.info('[Data transfer] Restoring http timeouts');
httpServer.headersTimeout = timeouts.headersTimeout;
httpServer.requestTimeout = timeouts.requestTimeout;
};
/**
* Make sure that the upgrade header is a valid websocket one
*/ const assertValidHeader = (ctx)=>{
// if it's exactly what we expect, it's fine
if (ctx.headers.upgrade === 'websocket') {
return;
}
// check if it could be an array that still includes websocket
const upgradeHeader = transformUpgradeHeader(ctx.headers.upgrade);
// Sanitize user input before writing it to our logs
const logSafeUpgradeHeader = JSON.stringify(ctx.headers.upgrade)?.replace(/[^a-z0-9\s.,|]/gi, '').substring(0, 50);
if (!upgradeHeader.includes('websocket')) {
throw new Error(`Transfer Upgrade header expected 'websocket', found '${logSafeUpgradeHeader}'. Please ensure that your server or proxy is not modifying the Upgrade header.`);
}
/**
* If there's more than expected but it still includes websocket, in theory it could still work
* and could be necessary for their certain configurations, so we'll allow it to proceed but
* log the unexpected behaviour in case it helps debug an issue
* */ strapi.log.info(`Transfer Upgrade header expected only 'websocket', found unexpected values: ${logSafeUpgradeHeader}`);
};
const isDataTransferMessage = (message)=>{
if (!message || typeof message !== 'object') {
return false;
}
const { uuid, type } = message;
if (typeof uuid !== 'string' || typeof type !== 'string') {
return false;
}
if (![
'command',
'transfer'
].includes(type)) {
return false;
}
return true;
};
/**
* Handle the upgrade to ws connection
*/ const handleWSUpgrade = (wss, ctx, callback)=>{
assertValidHeader(ctx);
wss.handleUpgrade(ctx.req, ctx.request.socket, Buffer.alloc(0), (client, request)=>{
if (!client) {
// If the WebSocket upgrade failed, destroy the socket to avoid hanging
ctx.request.socket.destroy();
return;
}
disableTimeouts();
strapi.db.lifecycles.disable();
strapi.log.info('[Data transfer] Disabling lifecycle hooks');
// Create a connection between the client & the server
wss.emit('connection', client, ctx.req);
// Invoke the ws callback
callback(client, request);
});
ctx.respond = false;
};
// Protocol related functions
const handlerControllerFactory = (implementation)=>(options)=>{
const { verify, server: serverOptions } = options ?? {};
const wss = new WebSocket.Server({
...serverOptions,
noServer: true
});
return async (ctx)=>{
const cb = (ws)=>{
const state = {
id: undefined
};
const messageUUIDs = new Set();
const diagnostics = createDiagnosticReporter();
const cannotRespondHandler = (err)=>{
strapi?.log?.error('[Data transfer] Cannot send error response to client, closing connection');
strapi?.log?.error(err);
try {
ws.terminate();
ctx.req.socket.destroy();
} catch (err) {
strapi?.log?.error('[Data transfer] Failed to close socket on error');
}
};
const prototype = {
// Transfer ID
get transferID () {
return state.id;
},
set transferID (id){
state.id = id;
},
// Started at
get startedAt () {
return state.startedAt;
},
set startedAt (timestamp){
state.startedAt = timestamp;
},
get response () {
return state.response;
},
set response (response){
state.response = response;
},
get diagnostics () {
return diagnostics;
},
addUUID (uuid) {
messageUUIDs.add(uuid);
},
hasUUID (uuid) {
return messageUUIDs.has(uuid);
},
isTransferStarted () {
return this.transferID !== undefined && this.startedAt !== undefined;
},
assertValidTransfer () {
const isStarted = this.isTransferStarted();
if (!isStarted) {
throw new Error('Invalid Transfer Process');
}
},
assertValidTransferCommand (command) {
const isDefined = typeof this[command] === 'function';
const isValidTransferCommand = VALID_TRANSFER_COMMANDS.includes(command);
if (!isDefined || !isValidTransferCommand) {
throw new Error('Invalid transfer command');
}
},
async respond (uuid, e, data) {
let details = {};
return new Promise((resolve, reject)=>{
if (!uuid && !e) {
reject(new Error('Missing uuid for this message'));
return;
}
this.response = {
uuid,
data,
e
};
if (e instanceof ProviderError) {
details = e.details;
}
const payload = JSON.stringify({
uuid,
data: data ?? null,
error: e ? {
code: e?.name ?? 'ERR',
message: e?.message,
details
} : null
});
this.send(payload, (error)=>error ? reject(error) : resolve());
});
},
send (message, cb) {
ws.send(message, cb);
},
confirm (message) {
return new Promise((resolve, reject)=>{
const uuid = randomUUID();
const payload = JSON.stringify({
uuid,
data: message
});
this.send(payload, (error)=>{
if (error) {
reject(error);
}
});
const onResponse = (raw)=>{
const response1 = JSON.parse(raw.toString());
if (response1.uuid === uuid) {
resolve(response1.data ?? null);
} else {
ws.once('message', onResponse);
}
};
ws.once('message', onResponse);
});
},
async executeAndRespond (uuid, fn) {
try {
const response1 = await fn();
await this.respond(uuid, null, response1);
} catch (e) {
if (e instanceof Error) {
await this.respond(uuid, e).catch(cannotRespondHandler);
} else if (typeof e === 'string') {
await this.respond(uuid, new ProviderTransferError(e)).catch(cannotRespondHandler);
} else {
await this.respond(uuid, new ProviderTransferError('Unexpected error', {
error: e
})).catch(cannotRespondHandler);
}
}
},
cleanup () {
this.transferID = undefined;
this.startedAt = undefined;
this.response = undefined;
},
teardown () {
this.cleanup();
},
verifyAuth (scope) {
return verify(ctx, scope);
},
// Transfer commands
init () {},
end () {},
status () {},
// Default prototype implementation for events
onMessage () {},
onError () {},
onClose () {},
onInfo () {},
onWarning () {}
};
const handler = Object.assign(Object.create(prototype), implementation(prototype));
// Bind ws events to handler methods
ws.on('close', async (...args)=>{
try {
await handler.onClose(...args);
} catch (err) {
strapi?.log?.error('[Data transfer] Uncaught error closing connection');
strapi?.log?.error(err);
cannotRespondHandler(err);
} finally{
resetTimeouts();
strapi.db.lifecycles.enable();
strapi.log.info('[Data transfer] Restoring lifecycle hooks');
}
});
ws.on('error', async (...args)=>{
try {
await handler.onError(...args);
} catch (err) {
strapi?.log?.error('[Data transfer] Uncaught error in error handling');
strapi?.log?.error(err);
cannotRespondHandler(err);
}
});
ws.on('message', async (...args)=>{
try {
await handler.onMessage(...args);
} catch (err) {
strapi?.log?.error('[Data transfer] Uncaught error in message handling');
strapi?.log?.error(err);
cannotRespondHandler(err);
}
});
diagnostics.onDiagnostic((diagnostic)=>{
const uuid = randomUUID();
const payload = JSON.stringify({
diagnostic,
uuid
});
handler.send(payload);
});
};
try {
handleWSUpgrade(wss, ctx, cb);
} catch (err) {
strapi?.log?.error('[Data transfer] Error in websocket upgrade request');
strapi?.log?.error(err);
}
};
};
export { assertValidHeader, handleWSUpgrade, handlerControllerFactory, isDataTransferMessage, transformUpgradeHeader };
//# sourceMappingURL=utils.mjs.map

File diff suppressed because one or more lines are too long