Skip to content

Commit

Permalink
feat(datastore): support backup and restore
Browse files Browse the repository at this point in the history
  • Loading branch information
jialeicui committed Jan 11, 2024
1 parent cd671cb commit 2fb055a
Show file tree
Hide file tree
Showing 13 changed files with 783 additions and 6 deletions.
13 changes: 13 additions & 0 deletions client/starwhale/base/client/models/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1127,6 +1127,19 @@ class RuntimeSuggestionVo(SwBaseModel):
runtimes: Optional[List[RuntimeVersionVo]] = None


class BackupVo(SwBaseModel):
id: Optional[str] = None
created_at: Optional[int] = Field(None, alias='createdAt')
done_at: Optional[int] = Field(None, alias='doneAt')
approximate_size_bytes: Optional[int] = Field(None, alias='approximateSizeBytes')


class ResponseMessageListBackupVo(SwBaseModel):
code: str
message: str
data: List[BackupVo]


class UserRoleDeleteRequest(UserCheckPasswordRequest):
pass

Expand Down
88 changes: 88 additions & 0 deletions console/src/api/server/Api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import {
ICompleteUploadBlobData,
IConfigRequest,
IConsumeNextDataData,
ICreateBackupData,
ICreateJobData,
ICreateJobTemplateRequest,
ICreateModelServingData,
Expand All @@ -49,6 +50,7 @@ import {
IDatasetBuildRequest,
IDatasetTagRequest,
IDatasetUploadRequest,
IDeleteBackupData,
IDeleteDatasetData,
IDeleteDatasetVersionTagData,
IDeleteModelData,
Expand Down Expand Up @@ -113,6 +115,7 @@ import {
IJobModifyPinRequest,
IJobModifyRequest,
IJobRequest,
IListBackupsData,
IListBuildRecordsData,
IListDatasetData,
IListDatasetTreeData,
Expand Down Expand Up @@ -179,6 +182,7 @@ import {
IReleaseFtData,
IRemoveJobData,
IResourcePool,
IRestoreBackupData,
IRevertDatasetRequest,
IRevertDatasetVersionData,
IRevertModelVersionData,
Expand Down Expand Up @@ -2776,6 +2780,90 @@ export class Api<SecurityDataType = unknown> {
...params,
})

/**
* No description
*
* @tags data-store-controller
* @name ListBackups
* @request GET:/api/v1/datastore/maintain/backup
* @secure
* @response `200` `IListBackupsData` OK
*/
listBackups = (params: RequestParams = {}) =>
this.http.request<IListBackupsData, any>({
path: `/api/v1/datastore/maintain/backup`,
method: 'GET',
secure: true,
...params,
})

useListBackups = (params: RequestParams = {}) =>
useQuery(qs.stringify(['listBackups', params]), () => this.listBackups(params), {
enabled: [].every(Boolean),
})
/**
* No description
*
* @tags data-store-controller
* @name CreateBackup
* @request POST:/api/v1/datastore/maintain/backup
* @secure
* @response `200` `ICreateBackupData` OK
*/
createBackup = (params: RequestParams = {}) =>
this.http.request<ICreateBackupData, any>({
path: `/api/v1/datastore/maintain/backup`,
method: 'POST',
secure: true,
...params,
})

/**
* No description
*
* @tags data-store-controller
* @name DeleteBackup
* @request DELETE:/api/v1/datastore/maintain/backup
* @secure
* @response `200` `IDeleteBackupData` OK
*/
deleteBackup = (
query: {
id: string
},
params: RequestParams = {}
) =>
this.http.request<IDeleteBackupData, any>({
path: `/api/v1/datastore/maintain/backup`,
method: 'DELETE',
query: query,
secure: true,
...params,
})

/**
* No description
*
* @tags data-store-controller
* @name RestoreBackup
* @request POST:/api/v1/datastore/maintain/backup/restore
* @secure
* @response `200` `IRestoreBackupData` OK
*/
restoreBackup = (
query: {
id: string
},
params: RequestParams = {}
) =>
this.http.request<IRestoreBackupData, any>({
path: `/api/v1/datastore/maintain/backup/restore`,
method: 'POST',
query: query,
secure: true,
...params,
})

/**
* No description
*
Expand Down
24 changes: 24 additions & 0 deletions console/src/api/server/data-contracts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2145,6 +2145,22 @@ export interface IRuntimeSuggestionVo {
runtimes?: IRuntimeVersionVo[]
}

export interface IBackupVo {
id?: string
/** @format int64 */
createdAt?: number
/** @format int64 */
doneAt?: number
/** @format int64 */
approximateSizeBytes?: number
}

export interface IResponseMessageListBackupVo {
code: string
message: string
data: IBackupVo[]
}

export interface IUserRoleDeleteRequest {
currentUserPwd: string
}
Expand Down Expand Up @@ -2359,6 +2375,14 @@ export type IQueryTableData = IResponseMessageRecordListVo['data']

export type IQueryAndExportData = any

export type IListBackupsData = IResponseMessageListBackupVo['data']

export type ICreateBackupData = IResponseMessageString['data']

export type IDeleteBackupData = IResponseMessageString['data']

export type IRestoreBackupData = IResponseMessageString['data']

export type IListTablesData = IResponseMessageTableNameListVo['data']

export type IFlushData = IResponseMessageString['data']
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import ai.starwhale.mlops.api.protocol.Code;
import ai.starwhale.mlops.api.protocol.ResponseMessage;
import ai.starwhale.mlops.api.protocol.datastore.BackupVo;
import ai.starwhale.mlops.api.protocol.datastore.ColumnDesc;
import ai.starwhale.mlops.api.protocol.datastore.FlushRequest;
import ai.starwhale.mlops.api.protocol.datastore.ListTablesRequest;
Expand All @@ -35,6 +36,7 @@
import ai.starwhale.mlops.datastore.DataStoreScanRequest;
import ai.starwhale.mlops.datastore.RecordList;
import ai.starwhale.mlops.datastore.TableQueryFilter;
import ai.starwhale.mlops.datastore.backup.Meta;
import ai.starwhale.mlops.datastore.exporter.RecordsStreamingExporter;
import ai.starwhale.mlops.datastore.impl.RecordDecoder;
import ai.starwhale.mlops.exception.SwProcessException;
Expand All @@ -60,9 +62,12 @@
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
Expand Down Expand Up @@ -293,6 +298,49 @@ void scanAndExport(
}
}

@PostMapping(value = "/datastore/maintain/backup")
@PreAuthorize("hasAnyRole('OWNER')")
ResponseEntity<ResponseMessage<BackupVo>> createBackup() {
try {
var meta = this.dataStore.createBackup();
return ResponseEntity.ok(Code.success.asResponse(meta.toVo()));
} catch (IOException e) {
log.error("create backup failed", e);
throw new SwProcessException(ErrorType.SYSTEM, "create backup failed", e);
}
}

@GetMapping(value = "/datastore/maintain/backup")
@PreAuthorize("hasAnyRole('OWNER')")
ResponseEntity<ResponseMessage<List<BackupVo>>> listBackups() {
var backups = this.dataStore.listBackups().stream().map(Meta::toVo).collect(Collectors.toList());
return ResponseEntity.ok(Code.success.asResponse(backups));
}

@PostMapping(value = "/datastore/maintain/backup/restore")
@PreAuthorize("hasAnyRole('OWNER')")
ResponseEntity<ResponseMessage<String>> restoreBackup(@Valid @RequestParam String id) {
try {
this.dataStore.restoreWithBackup(id);
} catch (IOException e) {
log.error("restore backup failed", e);
throw new SwProcessException(ErrorType.SYSTEM, "restore backup failed", e);
}
return ResponseEntity.ok(Code.success.asResponse("success"));
}

@DeleteMapping(value = "/datastore/maintain/backup")
@PreAuthorize("hasAnyRole('OWNER')")
ResponseEntity<ResponseMessage<String>> deleteBackup(@Valid @RequestParam String id) {
try {
this.dataStore.deleteBackup(id);
} catch (IOException e) {
log.error("delete backup failed", e);
throw new SwProcessException(ErrorType.SYSTEM, "delete backup failed", e);
}
return ResponseEntity.ok(Code.success.asResponse("success"));
}

private RecordList queryRecordList(QueryTableRequest request) {
if (request.getTableName() == null) {
throw new SwValidationException(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2022 Starwhale, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package ai.starwhale.mlops.api.protocol.datastore;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class BackupVo {
String id;

/**
* The time when the backup is created
*/
private Long createdAt;

/**
* The time when the backup is done
*/
private Long doneAt;

/**
* The approximate size of the backup in bytes
*/
private Long approximateSizeBytes;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package ai.starwhale.mlops.datastore;

import ai.starwhale.mlops.datastore.ParquetConfig.CompressionCodec;
import ai.starwhale.mlops.datastore.backup.BackupManager;
import ai.starwhale.mlops.datastore.impl.MemoryTableImpl;
import ai.starwhale.mlops.datastore.impl.RecordEncoder;
import ai.starwhale.mlops.datastore.type.BaseValue;
Expand Down Expand Up @@ -49,6 +50,7 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.experimental.Delegate;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.convert.DurationStyle;
Expand All @@ -68,6 +70,8 @@ public class DataStore implements OrderedRollingUpdateStatusListener {

private WalManager walManager;
private final StorageAccessService storageAccessService;
@Delegate
private final BackupManager backupManager;

private final Map<String, SoftReference<MemoryTable>> tables = new ConcurrentHashMap<>();
private final Set<MemoryTable> dirtyTables = new HashSet<>();
Expand All @@ -86,25 +90,34 @@ public class DataStore implements OrderedRollingUpdateStatusListener {

private final int ossMaxAttempts;

public DataStore(StorageAccessService storageAccessService,
public DataStore(
StorageAccessService storageAccessService,
@Value("${sw.datastore.wal-max-file-size}") int walMaxFileSize,
@Value("#{T(java.nio.file.Paths).get('${sw.datastore.wal-local-cache-dir:wal_cache}')}")
Path walLocalCacheDir,
@Value("${sw.datastore.oss-max-attempts:5}") int ossMaxAttempts,
@Value("${sw.datastore.data-root-path:}") String dataRootPath,
@Value("${sw.datastore.backup.backup-path:backup/datastore}") String backupPath,
@Value("${sw.datastore.dump-interval:1h}") String dumpInterval,
@Value("${sw.datastore.min-no-update-period:4h}") String minNoUpdatePeriod,
@Value("${sw.datastore.min-wal-id-gap:1000}") int minWalIdGap,
@Value("${sw.datastore.parquet.compression-codec:SNAPPY}") String compressionCodec,
@Value("${sw.datastore.parquet.row-group-size:128MB}") String rowGroupSize,
@Value("${sw.datastore.parquet.page-size:1MB}") String pageSize,
@Value("${sw.datastore.parquet.page-row-count-limit:20000}") int pageRowCountLimit) {
@Value("${sw.datastore.parquet.page-row-count-limit:20000}") int pageRowCountLimit
) throws IOException {
this.storageAccessService = storageAccessService;
if (!dataRootPath.isEmpty() && !dataRootPath.endsWith("/")) {
dataRootPath += "/";
}
this.dataRootPath = dataRootPath;
this.snapshotRootPath = dataRootPath + "snapshot/";
this.backupManager = new BackupManager(
backupPath,
this.snapshotRootPath,
this.dataRootPath + "wal/",
this.storageAccessService
);
this.walMaxFileSize = walMaxFileSize;
this.walLocalCacheDir = walLocalCacheDir;
this.ossMaxAttempts = ossMaxAttempts;
Expand Down Expand Up @@ -185,9 +198,11 @@ public List<String> list(Set<String> prefixes) {
}

@WriteOperation
public String update(String tableName,
public String update(
String tableName,
TableSchemaDesc schema,
List<Map<String, Object>> records) {
List<Map<String, Object>> records
) {
if (schema != null && schema.getColumnSchemaList() != null) {
for (var col : schema.getColumnSchemaList()) {
if (col.getName() != null && !COLUMN_NAME_PATTERN.matcher(col.getName()).matches()) {
Expand Down Expand Up @@ -559,9 +574,9 @@ public interface ResultResolver<R> {
/**
* Applies this function to the given arguments.
*
* @param tables table meta data
* @param tables table meta data
* @param columnSchemaMap column schema map
* @param recordIter records iterator
* @param recordIter records iterator
* @return r the result
*/
R apply(
Expand Down
Loading

0 comments on commit 2fb055a

Please sign in to comment.