There was an announcement about the migration of Data Management OSS (Object Storage Service) to the Direct-to-S3 approach, so we would like to provide some information to help make this migration smoother. This post covers the new Node.js utilities for binary transfer in the Autodesk Forge services. These samples are built with the LTS version of Node.js.
The team is also working on new SDKs that use the Direct-to-S3 approach.
Petr Broz from the team has put together a curated utility file covering all of the newly released endpoints for the OSS Direct-to-S3 approach.
The GitHub repository is available here, and its Node.js branch is available here.
index.js (the utility file)
const axios = require('axios');
const rax = require('retry-axios');
class BinaryTransferClient {
/**
* Creates a new instance of the binary transfer helper client.
*
* Note that the provided access token will be used for all requests initiated
* by this client. For long-running operations the token could potentially expire,
* so consider modifying this class to refresh the token whenever needed.
*
* @param {string} token Access token to use when communicating with Autodesk Forge services.
* @param {string} [host="https://developer.api.autodesk.com"] Optional Autodesk Forge host.
*/
constructor(token, host) {
this.token = token;
this.axios = axios.create({
baseURL: (host || 'https://developer.api.autodesk.com') + '/oss/v2/'
});
// Attach an interceptor to the axios instance that will retry response codes 100-199, 429, and 500-599.
// For default settings, see https://github.com/JustinBeckwith/retry-axios#usage.
this.axios.defaults.raxConfig = {
instance: this.axios
};
rax.attach(this.axios);
}
/**
* Generates one or more signed URLs that can be used to upload a file (or its parts) to OSS,
* and an upload key that is used to generate additional URLs or in {@see _completeUpload}
* after all the parts have been uploaded successfully.
*
* Note that if you are uploading in multiple parts, each part except for the final one
* must be of size at least 5MB, otherwise the call to {@see _completeUpload} will fail.
*
* @async
* @param {string} bucketKey Bucket key.
* @param {string} objectKey Object key.
* @param {number} [parts=1] How many URLs to generate in case of multi-part upload.
* @param {number} [firstPart=1] Index of the part the first returned URL should point to.
* For example, to upload parts 10 through 15 of a file, use `firstPart` = 10 and `parts` = 6.
* @param {string} [uploadKey] Optional upload key if this is a continuation of a previously
* initiated upload.
* @param {number} [minutesExpiration] Custom expiration for the upload URLs
* (within the 1 to 60 minutes range). If not specified, default is 2 minutes.
* @returns {Promise<object>} Signed URLs for uploading chunks of the file to AWS S3,
* and a unique upload key used to generate additional URLs or to complete the upload.
*/
async _getUploadUrls(bucketKey, objectKey, parts = 1, firstPart = 1, uploadKey, minutesExpiration) {
let endpoint = `buckets/${bucketKey}/objects/${encodeURIComponent(objectKey)}/signeds3upload?parts=${parts}&firstPart=${firstPart}`;
if (uploadKey) {
endpoint += `&uploadKey=${uploadKey}`;
}
if (minutesExpiration) {
endpoint += `&minutesExpiration=${minutesExpiration}`;
}
const headers = {
'Content-Type': 'application/json',
'Authorization': 'Bearer ' + this.token
};
const resp = await this.axios.get(endpoint, { headers });
return resp.data;
}
/**
* Finalizes the upload of a file to OSS.
*
* @async
* @param {string} bucketKey Bucket key.
* @param {string} objectKey Object key.
* @param {string} uploadKey Upload key returned by {@see _getUploadUrls}.
* @param {string} [contentType] Optional content type that should be recorded for the uploaded file.
* @returns {Promise<object>} Details of the created object in OSS.
*/
async _completeUpload(bucketKey, objectKey, uploadKey, contentType) {
const endpoint = `buckets/${bucketKey}/objects/${encodeURIComponent(objectKey)}/signeds3upload`;
const payload = { uploadKey };
const headers = {
'Content-Type': 'application/json',
'Authorization': 'Bearer ' + this.token
};
if (contentType) {
headers['x-ads-meta-Content-Type'] = contentType;
}
const resp = await this.axios.post(endpoint, payload, { headers });
return resp.data;
}
/**
* Uploads content to a specific bucket object.
*
* @async
* @param {string} bucketKey Bucket key.
* @param {string} objectKey Name of uploaded object.
* @param {Buffer} data Object content.
* @param {object} [options] Additional upload options.
* @param {string} [options.contentType] Optional content type of the uploaded file.
* @param {number} [options.minutesExpiration] Custom expiration for the upload URLs
* (within the 1 to 60 minutes range). If not specified, default is 2 minutes.
* @returns {Promise<object>} Object description containing 'bucketKey', 'objectKey', 'objectId',
* 'sha1', 'size', 'location', and 'contentType'.
* @throws Error when the request fails, for example, due to insufficient rights, or incorrect scopes.
*/
async uploadObject(bucketKey, objectKey, data, options) {
console.assert(data.byteLength > 0);
const ChunkSize = 5 << 20;
const MaxBatches = 25;
const totalParts = Math.ceil(data.byteLength / ChunkSize);
let partsUploaded = 0;
let uploadUrls = [];
let uploadKey;
while (partsUploaded < totalParts) {
const chunk = data.slice(partsUploaded * ChunkSize, Math.min((partsUploaded + 1) * ChunkSize, data.byteLength));
while (true) {
console.debug('Uploading part', partsUploaded + 1);
if (uploadUrls.length === 0) {
// Automatically retries 429 and 500-599 responses
const uploadParams = await this._getUploadUrls(bucketKey, objectKey, Math.min(totalParts - partsUploaded, MaxBatches), partsUploaded + 1, uploadKey, options?.minutesExpiration);
uploadUrls = uploadParams.urls.slice();
uploadKey = uploadParams.uploadKey;
}
const url = uploadUrls.shift();
try {
await this.axios.put(url, chunk);
break;
} catch (err) {
const status = err.response?.status;
if (status === 403) {
console.debug('Got 403, refreshing upload URLs');
uploadUrls = []; // Couldn't this cause an infinite loop? (i.e., could the server keep responding with 403 indefinitely?)
} else {
throw err;
}
}
}
console.debug('Part successfully uploaded', partsUploaded + 1);
partsUploaded++;
}
console.debug('Completing part upload');
return this._completeUpload(bucketKey, objectKey, uploadKey, options?.contentType);
}
/**
* Uploads content stream to a specific bucket object.
*
* @async
* @param {string} bucketKey Bucket key.
* @param {string} objectKey Name of uploaded object.
* @param {AsyncIterable<Buffer>} input Input stream.
* @param {object} [options] Additional upload options.
* @param {string} [options.contentType] Optional content type of the uploaded file.
* @param {number} [options.minutesExpiration] Custom expiration for the upload URLs
* (within the 1 to 60 minutes range). If not specified, default is 2 minutes.
* @returns {Promise<object>} Object description containing 'bucketKey', 'objectKey', 'objectId',
* 'sha1', 'size', 'location', and 'contentType'.
* @throws Error when the request fails, for example, due to insufficient rights, or incorrect scopes.
*/
async uploadObjectStream(bucketKey, objectKey, input, options) {
// Helper async generator making sure that each chunk has at least a certain number of bytes
async function* bufferChunks(input, minChunkSize) {
let buffer = Buffer.alloc(2 * minChunkSize);
let bytesRead = 0;
for await (const chunk of input) {
chunk.copy(buffer, bytesRead);
bytesRead += chunk.byteLength;
if (bytesRead >= minChunkSize) {
yield buffer.slice(0, bytesRead);
bytesRead = 0;
}
}
if (bytesRead > 0) {
yield buffer.slice(0, bytesRead);
}
}
const MaxBatches = 25;
const ChunkSize = 5 << 20;
let partsUploaded = 0;
let uploadUrls = [];
let uploadKey;
for await (const chunk of bufferChunks(input, ChunkSize)) {
while (true) {
console.debug('Uploading part', partsUploaded + 1);
if (uploadUrls.length === 0) {
const uploadParams = await this._getUploadUrls(bucketKey, objectKey, MaxBatches, partsUploaded + 1, uploadKey, options?.minutesExpiration);
uploadUrls = uploadParams.urls.slice();
uploadKey = uploadParams.uploadKey;
}
const url = uploadUrls.shift();
try {
await this.axios.put(url, chunk);
break;
} catch (err) {
const status = err.response?.status;
if (status === 403) {
console.debug('Got 403, refreshing upload URLs');
uploadUrls = []; // Couldn't this cause an infinite loop? (i.e., could the server keep responding with 403 indefinitely?)
} else {
throw err;
}
}
}
console.debug('Part successfully uploaded', partsUploaded + 1);
partsUploaded++;
}
console.debug('Completing part upload');
return this._completeUpload(bucketKey, objectKey, uploadKey, options?.contentType);
}
/**
* Generates a signed URL that can be used to download a file from OSS.
*
* @async
* @param {string} bucketKey Bucket key.
* @param {string} objectKey Object key.
* @param {number} [minutesExpiration] Custom expiration for the download URLs
* (within the 1 to 60 minutes range). If not specified, default is 2 minutes.
* @returns {Promise<object>} Download URLs and potentially other helpful information.
*/
async _getDownloadUrl(bucketKey, objectKey, minutesExpiration) {
let endpoint = `buckets/${bucketKey}/objects/${encodeURIComponent(objectKey)}/signeds3download`;
if (minutesExpiration) {
endpoint += `?minutesExpiration=${minutesExpiration}`;
}
const headers = {
'Content-Type': 'application/json',
'Authorization': 'Bearer ' + this.token
};
const resp = await this.axios.get(endpoint, { headers });
return resp.data;
}
/**
* Downloads a specific OSS object.
*
* @async
* @param {string} bucketKey Bucket key.
* @param {string} objectKey Object key.
* @param {object} [options] Additional download options.
* @param {number} [options.minutesExpiration] Custom expiration for the download URLs
* (within the 1 to 60 minutes range). If not specified, default is 2 minutes.
* @returns {Promise<ArrayBuffer>} Object content.
* @throws Error when the request fails, for example, due to insufficient rights, or incorrect scopes.
*/
async downloadObject(bucketKey, objectKey, options) {
console.debug('Retrieving download URL');
const downloadParams = await this._getDownloadUrl(bucketKey, objectKey, options?.minutesExpiration);
if (downloadParams.status !== 'complete') {
throw new Error('File not available for download yet.');
}
const resp = await this.axios.get(downloadParams.url, {
responseType: 'arraybuffer',
onDownloadProgress: progressEvent => {
const downloadedBytes = progressEvent.currentTarget.response.length;
const totalBytes = parseInt(progressEvent.currentTarget.responseHeaders['Content-Length']);
console.debug('Downloaded', downloadedBytes, 'bytes of', totalBytes);
}
});
return resp.data;
}
/**
* Downloads content stream of a specific bucket object.
*
* @async
* @param {string} bucketKey Bucket key.
* @param {string} objectKey Object name.
* @param {object} [options] Additional download options.
* @param {number} [options.minutesExpiration] Custom expiration for the download URLs
* (within the 1 to 60 minutes range). If not specified, default is 2 minutes.
* @returns {Promise<ReadableStream>} Object content stream.
* @throws Error when the request fails, for example, due to insufficient rights, or incorrect scopes.
*/
async downloadObjectStream(bucketKey, objectKey, options) {
console.debug('Retrieving download URL');
const downloadParams = await this._getDownloadUrl(bucketKey, objectKey, options?.minutesExpiration);
if (downloadParams.status !== 'complete') {
throw new Error('File not available for download yet.');
}
const resp = await this.axios.get(downloadParams.url, {
responseType: 'stream',
onDownloadProgress: progressEvent => {
const downloadedBytes = progressEvent.currentTarget.response.length;
const totalBytes = parseInt(progressEvent.currentTarget.responseHeaders['Content-Length']);
console.debug('Downloaded', downloadedBytes, 'bytes of', totalBytes);
}
});
return resp.data;
}
}
module.exports = {
BinaryTransferClient
};
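As the constructor comment notes, the provided token is used for the client's whole lifetime, which can be a problem for long-running transfers. Here is a minimal sketch of one way to work around this, assuming a hypothetical getAccessToken() helper of your own that returns a currently valid token:

// Sketch only (not part of the original sample): obtain a fresh token right
// before a long-running operation starts. The getAccessToken() helper is
// hypothetical -- it should return a valid token, e.g., from the Forge
// authentication service. A more elaborate version could also update
// client.token periodically while the transfer is in flight, since the
// token property is read on every request.
async function uploadWithFreshToken(getAccessToken, bucketKey, objectKey, data) {
    const client = new BinaryTransferClient(await getAccessToken());
    return client.uploadObject(bucketKey, objectKey, data);
}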
The default expiration of a signed URL is 2 minutes (the minutesExpiration parameter can extend it up to a maximum of 60 minutes).
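For example, here is a short illustration, using the client above, of requesting longer-lived URLs:

const { BinaryTransferClient } = require('..');

// Illustration only: request signed URLs that stay valid for 60 minutes
// instead of the default 2 (the service accepts values from 1 to 60).
async function transferWithLongerUrls(accessToken, bucketKey, objectKey, data) {
    const client = new BinaryTransferClient(accessToken);
    await client.uploadObject(bucketKey, objectKey, data, { minutesExpiration: 60 });
    return client.downloadObject(bucketKey, objectKey, { minutesExpiration: 60 });
}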
Download
First, let's look at the download workflow. Downloading a file directly from AWS S3 with a signed URL takes two steps. Here is pseudocode explaining how it works (a raw-endpoint sketch follows the list):
- Generate a download URL using the GET buckets/:bucketKey/objects/:objectName/signeds3download endpoint.
- Use the new URL to download the OSS object directly from AWS S3.
  - For response codes 100-199, 429, and 500-599, consider retrying the download (for example, with exponential backoff).
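Mapped to plain HTTP calls, the two steps look roughly like this minimal sketch (illustration only; the BinaryTransferClient above wraps this same flow and adds the retries):

const axios = require('axios');

// Raw two-step download flow; no retry logic included here.
async function rawDownload(accessToken, bucketKey, objectKey) {
    const base = 'https://developer.api.autodesk.com/oss/v2';
    // Step 1: ask OSS for a signed S3 download URL.
    const { data } = await axios.get(
        `${base}/buckets/${bucketKey}/objects/${encodeURIComponent(objectKey)}/signeds3download`,
        { headers: { 'Authorization': 'Bearer ' + accessToken } }
    );
    // Step 2: download the object directly from AWS S3 (no Forge auth header).
    const resp = await axios.get(data.url, { responseType: 'arraybuffer' });
    return resp.data;
}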
Here is the code for downloading an object as a stream:
const fs = require('fs');
const { BinaryTransferClient } = require('..');
async function downloadStream(filePath, bucketKey, objectKey, accessToken) {
const client = new BinaryTransferClient(accessToken);
const stream = await client.downloadObjectStream(bucketKey, objectKey);
stream.pipe(fs.createWriteStream(filePath));
}
if (process.argv.length < 6) {
console.log('Usage:');
console.log('node ' + __filename + ' <path to local file> <bucket key> <object key> <access token>');
process.exit(0);
}
downloadStream(process.argv[2], process.argv[3], process.argv[4], process.argv[5]);
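Note that the sample above resolves before the file has been fully written to disk. If you need to wait for the write to finish, one option (an addition to the original sample, using the promise-based stream API available in recent LTS versions of Node.js) is stream.pipeline:

const fs = require('fs');
const { pipeline } = require('stream/promises');
const { BinaryTransferClient } = require('..');

// Variation of the sample above that resolves only after the whole
// file has been written to disk.
async function downloadStreamAndWait(filePath, bucketKey, objectKey, accessToken) {
    const client = new BinaryTransferClient(accessToken);
    const stream = await client.downloadObjectStream(bucketKey, objectKey);
    await pipeline(stream, fs.createWriteStream(filePath));
}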
And if you want to download an object into a local file by first receiving the whole content in memory, use code like this:
const fs = require('fs');
const { BinaryTransferClient } = require('..');
async function downloadBuffer(filePath, bucketKey, objectKey, accessToken) {
const client = new BinaryTransferClient(accessToken);
const buffer = await client.downloadObject(bucketKey, objectKey);
fs.writeFileSync(filePath, buffer);
}
if (process.argv.length < 6) {
console.log('Usage:');
console.log('node ' + __filename + ' <path to local file> <bucket key> <object key> <access token>');
process.exit(0);
}
downloadBuffer(process.argv[2], process.argv[3], process.argv[4], process.argv[5])
.then(_ => console.log('Done!'))
.catch(err => console.error(err));
Upload
Next, the upload workflow. Uploading a file directly to AWS S3 with a signed URL takes three steps. Here is pseudocode explaining how it works (a raw-endpoint sketch follows the list):
- Calculate the number of parts of the file to upload.
  - Note: each uploaded part except for the last one must be at least 5 MB.
- Generate up to 25 URLs for uploading specific parts of the file using the GET buckets/:bucketKey/objects/:objectKey/signeds3upload?firstPart=<index of first part>&parts=<number of parts> endpoint.
  - The part numbers are assumed to start with 1.
  - For example, to generate upload URLs for parts 10 through 15, set <index of first part> to 10 and <number of parts> to 6.
  - This endpoint also returns an uploadKey that is used later to request additional URLs or to finalize the upload.
- Upload the remaining parts of the file to their corresponding upload URLs.
  - For response codes 100-199, 429, and 500-599, consider retrying individual uploads (for example, with exponential backoff).
  - If the response code is 403, the upload URLs have expired; go back to step 2.
  - If you have used up all the upload URLs and there are still parts left to upload, go back to step 2 to generate more URLs.
- Finalize the upload using the POST buckets/:bucketKey/objects/:objectKey/signeds3upload endpoint, with the uploadKey value from step 2.
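For a file small enough to go up as a single part, the raw flow boils down to this minimal sketch (illustration only; the BinaryTransferClient above implements the full multipart flow with retries and URL refreshing):

const axios = require('axios');
const fs = require('fs');

// Raw upload flow for a single-part file; no retry or 403 handling here.
async function rawUploadSinglePart(accessToken, bucketKey, objectKey, filePath) {
    const base = 'https://developer.api.autodesk.com/oss/v2';
    const headers = { 'Authorization': 'Bearer ' + accessToken };
    // Steps 1-2: one part is enough here, so request a single signed upload URL.
    const { data: params } = await axios.get(
        `${base}/buckets/${bucketKey}/objects/${encodeURIComponent(objectKey)}/signeds3upload?parts=1&firstPart=1`,
        { headers }
    );
    // Upload the content directly to AWS S3 (no Forge auth header).
    await axios.put(params.urls[0], fs.readFileSync(filePath));
    // Step 3: finalize the upload with the uploadKey from the previous call.
    const { data: object } = await axios.post(
        `${base}/buckets/${bucketKey}/objects/${encodeURIComponent(objectKey)}/signeds3upload`,
        { uploadKey: params.uploadKey },
        { headers }
    );
    return object;
}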
Here is the code for uploading a local file to an OSS bucket (as a stream):
const fs = require('fs');
const { BinaryTransferClient } = require('..');
async function uploadStream(filePath, bucketKey, objectKey, accessToken) {
const client = new BinaryTransferClient(accessToken);
const stream = fs.createReadStream(filePath);
const object = await client.uploadObjectStream(bucketKey, objectKey, stream);
return object;
}
if (process.argv.length < 6) {
console.log('Usage:');
console.log('node ' + __filename + ' <path to local file> <bucket key> <object key> <access token>');
process.exit(0);
}
uploadStream(process.argv[2], process.argv[3], process.argv[4], process.argv[5])
.then(obj => console.log(obj))
.catch(err => console.error(err));
And if you want to upload a local file to an OSS bucket by first reading the whole file into memory, do the following:
const fs = require('fs');
const { BinaryTransferClient } = require('..');
async function uploadBuffer(filePath, bucketKey, objectKey, accessToken) {
const client = new BinaryTransferClient(accessToken);
const buffer = fs.readFileSync(filePath);
const object = await client.uploadObject(bucketKey, objectKey, buffer);
return object;
}
if (process.argv.length < 6) {
console.log('Usage:');
console.log('node ' + __filename + ' <path to local file> <bucket key> <object key> <access token>');
process.exit(0);
}
uploadBuffer(process.argv[2], process.argv[3], process.argv[4], process.argv[5])
.then(obj => console.log(obj))
.catch(err => console.error(err));
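Both upload helpers also accept a contentType option, which is recorded on the object through the x-ads-meta-Content-Type header (see _completeUpload above). For example (with 'application/octet-stream' as an arbitrary sample value):

const fs = require('fs');
const { BinaryTransferClient } = require('..');

// Same as the buffer sample above, but also records the file's content type.
async function uploadBufferWithContentType(filePath, bucketKey, objectKey, accessToken) {
    const client = new BinaryTransferClient(accessToken);
    return client.uploadObject(bucketKey, objectKey, fs.readFileSync(filePath), {
        contentType: 'application/octet-stream'
    });
}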
And let's not forget how to upload a local file to a hub (BIM 360, Fusion Teams, ACC, and so on) with the Data Management API:
const fs = require('fs');
const path = require('path');
const { ProjectsApi, FoldersApi, ItemsApi, VersionsApi } = require('forge-apis');
const { BinaryTransferClient } = require('..');
async function getFolderContents(projectId, folderId, getAccessToken) {
const resp = await new FoldersApi().getFolderContents(projectId, folderId, {}, null, getAccessToken());
return resp.body.data;
}
async function createStorage(projectId, folderId, displayName, getAccessToken) {
const body = {
jsonapi: {
version: '1.0'
},
data: {
type: 'objects',
attributes: {
name: displayName
},
relationships: {
target: {
data: {
type: 'folders',
id: folderId
}
}
}
}
};
const resp = await new ProjectsApi().postStorage(projectId, body, null, getAccessToken());
return resp.body.data;
}
async function createItem(projectId, folderId, objectId, displayName, getAccessToken) {
const body = {
jsonapi: {
version: '1.0'
},
data: {
type: 'items',
attributes: {
displayName,
extension: {
type: 'items:autodesk.core:File',
version: '1.0'
}
},
relationships: {
tip: {
data: {
type: 'versions',
id: '1'
}
},
parent: {
data: {
type: 'folders',
id: folderId
}
}
}
},
included: [
{
type: 'versions',
id: '1',
attributes: {
name: displayName,
extension: {
type: 'versions:autodesk.core:File',
version: '1.0'
}
},
relationships: {
storage: {
data: {
type: 'objects',
id: objectId
}
}
}
}
]
};
const resp = await new ItemsApi().postItem(projectId, body, null, getAccessToken());
return resp.body.data;
}
async function createVersion(projectId, lineageId, objectId, displayName, getAccessToken) {
const body = {
jsonapi: {
version: '1.0'
},
data: {
type: 'versions',
attributes: {
name: displayName,
extension: {
type: 'versions:autodesk.core:File',
version: '1.0'
}
},
relationships: {
item: {
data: {
type: 'items',
id: lineageId
}
},
storage: {
data: {
type: 'objects',
id: objectId
}
}
}
}
};
const resp = await new VersionsApi().postVersion(projectId, body, null, getAccessToken());
return resp.body.data;
}
async function upload(filePath, projectId, folderId, accessToken) {
const displayName = path.basename(filePath);
const getAccessToken = () => {
return { access_token: accessToken };
};
console.log('Creating storage...');
const storage = await createStorage(projectId, folderId, displayName, getAccessToken);
console.log(storage);
const match = /urn:adsk.objects:os.object:([^\/]+)\/(.+)/.exec(storage.id);
if (!match || match.length < 3) {
throw new Error('Unexpected storage ID: ' + storage.id);
}
const bucketKey = match[1];
const objectKey = match[2];
console.log('Uploading file...');
const client = new BinaryTransferClient(accessToken);
const object = await client.uploadObject(bucketKey, objectKey, fs.readFileSync(filePath));
console.log(object);
console.log('Checking if file already exists...');
const contents = await getFolderContents(projectId, folderId, getAccessToken);
const item = contents.find(e => e.type === 'items' && e.attributes.displayName === displayName);
if (!item) {
console.log('Creating new item...');
const lineage = await createItem(projectId, folderId, object.objectId, displayName, getAccessToken);
console.log(lineage);
} else {
console.log('Creating new item version...');
const version = await createVersion(projectId, item.id, object.objectId, displayName, getAccessToken);
console.log(version);
}
}
if (process.argv.length < 6) {
console.log('Usage:');
console.log('node ' + __filename + ' <path to local file> <project id> <folder id> <access token>');
process.exit(0);
}
upload(process.argv[2], process.argv[3], process.argv[4], process.argv[5])
.then(obj => console.log('Done!'))
.catch(err => console.error(err));
If you have any questions, please contact forge.help@autodesk.com.
※ This article is a translation of Direct-to-S3 Node.js samples | Autodesk Forge, with some additions.
By Toshiaki Isezaki