fusenapi/server/upload/internal/logic/uploadfilesbackendlogic.go
2023-08-02 11:30:27 +08:00


package logic

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"time"

	"fusenapi/model/gmodel"
	"fusenapi/server/upload/internal/svc"
	"fusenapi/server/upload/internal/types"
	"fusenapi/utils/auth"
	"fusenapi/utils/basic"
	"fusenapi/utils/hash"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/zeromicro/go-zero/core/logx"
	"github.com/zeromicro/go-zero/core/mr"
)

type UploadFilesBackendLogic struct {
	logx.Logger
	ctx    context.Context
	svcCtx *svc.ServiceContext
	r      *http.Request
}

func NewUploadFilesBackendLogic(r *http.Request, svcCtx *svc.ServiceContext) *UploadFilesBackendLogic {
	return &UploadFilesBackendLogic{
		Logger: logx.WithContext(r.Context()),
		ctx:    r.Context(),
		svcCtx: svcCtx,
		r:      r,
	}
}

// Pre-handler hook for w, r.
// func (l *UploadFilesBackendLogic) BeforeLogic(w http.ResponseWriter, r *http.Request) {
// }

// Post-handler hook for w, r, e.g. redirects; resp must be re-processed here.
// func (l *UploadFilesBackendLogic) AfterLogic(w http.ResponseWriter, r *http.Request, resp *basic.Response) {
// 	// httpx.OkJsonCtx(r.Context(), w, resp)
// }
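
// UploadFilesBackend parses the multipart request, uploads each file to S3
// through an mr.MapReduce pipeline, records successful uploads via
// gmodel.FsResource, and returns the per-file results.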
func (l *UploadFilesBackendLogic) UploadFilesBackend(req *types.UploadFilesReq, userinfo *auth.UserInfo) (resp *basic.Response) {
	// The return value must be produced with SetStatus; resp can be called as a nil pointer: resp.SetStatus(basic.CodeOK, data).
	// userinfo is never nil when it is passed in.
	if userinfo.IsOnlooker() {
		// Onlookers are not allowed to upload; return an unauthorized error code.
		return resp.SetStatus(basic.CodeUnAuth)
	}
	// Determine the ID used in the S3 key.
	var uid int64
	var userId int64
	var guestId int64
	// Check whether the caller is a guest.
	if userinfo.IsGuest() {
		// Guests use the guest ID and the guest key format.
		guestId = userinfo.GuestId
		uid = guestId
	} else {
		// Otherwise use the user ID and the user key format.
		userId = userinfo.UserId
		uid = userId
	}
	var uploadInfoList []UploadInfo
	err := json.Unmarshal([]byte(req.UploadInfo), &uploadInfoList)
	if err != nil {
		logx.Error(err)
		return resp.SetStatus(basic.CodeFileUploadErr, "file upload err, params unmarshal failed")
	}
	fileLen := len(uploadInfoList)
	if fileLen == 0 {
		return resp.SetStatus(basic.CodeFileUploadErr, "file upload err, no files")
	}
	if req.ApiType == 1 && fileLen > 100 {
		return resp.SetStatus(basic.CodeFileUploadErr, "file upload err, files count is beyond the maximum")
	}
	// Choose the target bucket by category.
	var bucketName *string
	switch req.UploadBucket {
	case 2:
		bucketName = basic.TempfileBucketName
	default:
		bucketName = basic.StorageBucketName
	}
	// Parse the multipart form, buffering up to 32 MiB in memory.
	if err := l.r.ParseMultipartForm(32 << 20); err != nil {
		logx.Error(err)
		return resp.SetStatus(basic.CodeFileUploadErr, "file upload err, parse multipart form failed")
	}
	// Get the group of uploaded files.
	files := l.r.MultipartForm.File["file"]
	if len(files) < fileLen {
		return resp.SetStatus(basic.CodeFileUploadErr, "file upload err, files count less than upload info")
	}
	// Set the AWS session region.
	l.svcCtx.AwsSession.Config.Region = aws.String("us-west-1")
	// Create a new S3 service instance.
	svc := s3.New(l.svcCtx.AwsSession)
	resourceModel := gmodel.NewFsResourceModel(l.svcCtx.MysqlConn)
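	// The upload fan-out is a go-zero MapReduce pipeline: the generator turns each
	// multipart file into an UploadData item, the mapper uploads one item to S3
	// (or reuses an already stored resource), and the reducer groups the results
	// into "success" and "fail" lists.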
	result, err := mr.MapReduce(func(source chan<- interface{}) {
		for i, info := range uploadInfoList {
			fileType := files[i].Header.Get("Content-Type")
			// Open the file.
			file, err := files[i].Open()
			if err != nil {
				logx.Error(err)
				continue
			}
			// Read the data stream.
			ioData, err := io.ReadAll(file)
			file.Close()
			if err != nil {
				logx.Error(err)
				continue
			}
			// A series of business logic goes here: validate file type, file size, etc.
			hashKey := hash.JsonHashKey(fmt.Sprintf("%s%d", info.FileKeys, uid))
			source <- UploadData{
				FileKey:  info.FileKeys,
				FileType: fileType,
				Metadata: info.Metadata,
				FileData: ioData,
				ApiType:  req.ApiType,
				Bucket:   bucketName,
				HashKey:  hashKey,
			}
		}
	}, func(item interface{}, writer mr.Writer[interface{}], cancel func(error)) {
		uploadDataInfo := item.(UploadData)
		uploadUrl := UploadUrl{}
		uploadUrl.Key = uploadDataInfo.FileKey
		uploadUrl.ApiType = uploadDataInfo.ApiType
		uploadUrl.ResourceType = uploadDataInfo.FileType
		resourceId := uploadDataInfo.HashKey
		// Query the database; if the resource already exists, reuse it.
		resourceInfo, err := resourceModel.FindOneById(l.ctx, resourceId)
		if err == nil && resourceInfo.ResourceId != "" {
			uploadUrl.Status = 1
			uploadUrl.ResourceId = resourceId
			uploadUrl.ResourceUrl = *resourceInfo.ResourceUrl
		} else {
			// Create the S3 object storage request. It is declared locally because
			// this mapper runs concurrently for multiple files.
			s3req, _ := svc.PutObjectRequest(
				&s3.PutObjectInput{
					Bucket: uploadDataInfo.Bucket,
					Key:    &uploadDataInfo.HashKey,
				},
			)
			// Set the request body to the file data.
			s3req.SetBufferBody(uploadDataInfo.FileData)
			// Send the request.
			err = s3req.Send()
			// Check for errors.
			if err != nil {
				logx.Error(err)
				uploadUrl.Status = 0
			} else {
				url := s3req.HTTPRequest.URL.String()
				// Log the request URL.
				logx.Info(url)
				uploadUrl.Status = 1
				uploadUrl.ResourceId = resourceId
				uploadUrl.ResourceUrl = url
				version := "0.0.1"
				nowTime := time.Now()
				_, err = resourceModel.CreateOrUpdate(l.ctx, &gmodel.FsResource{
					ResourceId:   resourceId,
					UserId:       &userId,
					GuestId:      &guestId,
					ResourceType: &uploadDataInfo.FileType,
					ResourceUrl:  &url,
					Version:      &version,
					UploadedAt:   &nowTime,
					Metadata:     &uploadDataInfo.Metadata,
					ApiType:      &uploadDataInfo.ApiType,
				})
				if err != nil {
					logx.Error(err)
				}
			}
		}
		// Notice: this write is required so the result reaches the reduce stage.
		writer.Write(uploadUrl)
	}, func(pipe <-chan interface{}, writer mr.Writer[interface{}], cancel func(error)) {
		uploadUrlList := make(map[string][]*UploadUrl)
		var uploadUrlListFail []*UploadUrl
		var uploadUrlListSuccess []*UploadUrl
		for p := range pipe {
			uploadUrl := p.(UploadUrl)
			if uploadUrl.Status == 1 {
				uploadUrlListSuccess = append(uploadUrlListSuccess, &uploadUrl)
			} else {
				uploadUrlListFail = append(uploadUrlListFail, &uploadUrl)
			}
		}
		uploadUrlList["success"] = uploadUrlListSuccess
		uploadUrlList["fail"] = uploadUrlListFail
		// Notice: this write is required; the reducer must produce the final result.
		writer.Write(uploadUrlList)
	})
	if err != nil {
		logx.Error(err)
	}
	// Return a success response with the upload URLs.
	return resp.SetStatus(basic.CodeOK, map[string]interface{}{
		"upload_urls": result,
	})
}
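
// UploadInfo describes one entry of the request's upload_info JSON array.
// An illustrative value (the field contents below are made up, not taken from a
// real request) could look like:
//
//	[{"file_size": 1024, "file_keys": "example/logo.png", "meta_data": "{}"}]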
type UploadInfo struct {
	FileSize int64  `json:"file_size"` // size of the uploaded file
	FileKeys string `json:"file_keys"` // unique identification key of the upload
	Metadata string `json:"meta_data"` // extra information attached to the upload
}

type UploadData struct {
	ApiType  int64   `json:"api_type"`
	FileSize int64   `json:"file_size"`
	FileType string  `json:"file_type"`
	FileKey  string  `json:"file_key"`
	Metadata string  `json:"metadata"`
	Bucket   *string `json:"bucket"`
	HashKey  string  `json:"hash_key"`
	FileData []byte  `fsfile:"data"`
}
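
// UploadUrl is the per-file result returned to the caller under "upload_urls",
// grouped into "success" and "fail" lists by the reducer. A sketch of the
// response shape, with made-up values:
//
//	{"upload_urls": {"success": [{"key": "example/logo.png", "status": 1, ...}], "fail": []}}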
type UploadUrl struct {
	Key          string `json:"key"`
	Status       int64  `json:"status"`
	ApiType      int64  `json:"api_type"`
	ResourceId   string `json:"resource_id"`
	ResourceType string `json:"resource_type"`
	ResourceUrl  string `json:"resource_url"`
}