Zippytal-Node/squadChatFSInstance.go

179 lines
5.1 KiB
Go

package localserver
import (
"bufio"
"fmt"
"io"
"os"
"path/filepath"
"github.com/pion/webrtc/v3"
)
const (
SQUAD_CHAT_WEBRTC_OFFER ReqType = "squad_chat_offer"
SQUAD_CHAT_WEBRTC_ANSWER ReqType = "squad_chat_answer"
SQUAD_CHAT_WEBRTC_RENNEGOTIATION_OFFER ReqType = "squad_chat_rennegotiation_offer"
SQUAD_CHAT_WEBRTC_RENNEGOTIATION_ANSWER ReqType = "squad_chat_rennegotiation_answer"
SQUAD_CHAT_WEBRTC_COUNTER_OFFER ReqType = "squad_chat_webrtc_counter_offer"
SQUAD_CHAT_WEBRTC_CANDIDATE ReqType = "squad_chat_webrtc_candidate"
)
type SquadFSInstance struct {
SquadID string `json:"id"`
Owner string `json:"owner"`
Members []string `json:"members"`
OpenFiles map[string]*os.File `json:"-"`
OpenFilesForUser map[string][]string `json:"-"`
filesFlag *uint32 `json:"-"`
}
func NewSquadFSInstance(id, owner string, members []string) (squadFSInstance *SquadFSInstance) {
filesFlag := uint32(0)
squadFSInstance = &SquadFSInstance{
SquadID: id,
Owner: owner,
Members: members,
OpenFiles: make(map[string]*os.File),
OpenFilesForUser: make(map[string][]string),
filesFlag: &filesFlag,
}
return
}
func (fs *SquadFSInstance) SetupFileUpload(chatId, filename, userId string, dc *webrtc.DataChannel) (writePipe chan []byte, err error) {
concretePath := filepath.Join(dataPath, "data", "squads", fs.SquadID, "chat", "__files__", filename)
if _, err = os.ReadDir(filepath.Join(dataPath, "data", "squads", fs.SquadID, "chat")); err != nil {
return
}
if _, rErr := os.ReadDir(filepath.Join(dataPath, "data", "squads", fs.SquadID, "chat", "__files__")); os.IsNotExist(rErr) {
if err = os.MkdirAll(filepath.Join(dataPath, "data", "squads", fs.SquadID, "chat", "__files__"), 0700); err != nil {
return
}
} else if rErr != nil {
return nil, rErr
}
if err = os.Remove(concretePath); err != nil {
// logger.Println(err)
}
file, err := os.OpenFile(concretePath, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0755)
if err != nil {
return
}
writePipe = make(chan []byte)
go func(f *os.File) {
defer func() {
_ = f.Close()
}()
for chunk := range writePipe {
if _, err = file.Write(chunk); err != nil {
return
}
}
}(file)
return
}
func (fs *SquadFSInstance) FileUploadFailed(chatId, filename, userId string) (err error) {
concretePath := filepath.Join(dataPath, "data", "squads", fs.SquadID, "chat", "__files__", filename)
err = os.Remove(concretePath)
return
}
func (fs *SquadFSInstance) SetupFileDownload(chatId, filename, userId string, dc *webrtc.DataChannel) (err error) {
concretePath := filepath.Join(dataPath, "data", "squads", fs.SquadID, "chat", "__files__", filename)
file, err := os.OpenFile(concretePath, os.O_RDONLY, 0755)
if err != nil {
return
}
if dc != nil {
dc.SetBufferedAmountLowThreshold(12000000)
bufferedAmountLock := make(chan struct{})
go func(f *os.File) {
defer func() {
close(bufferedAmountLock)
bufferedAmountLock = nil
_ = f.Close()
}()
r := bufio.NewReader(file)
buf := make([]byte, 0, 50000)
sendingLoop:
for {
n, _ := r.Read(buf[:cap(buf)])
buf = buf[:n]
if n == 0 {
if err == nil {
// logger.Println("n is 0 weird")
break sendingLoop
}
if err == io.EOF {
break sendingLoop
}
// logger.Println(readErr)
return
}
if err = dc.Send(buf); err != nil {
dc.Close()
// logger.Println(err)
break sendingLoop
}
if dc.BufferedAmount() > dc.
BufferedAmountLowThreshold() {
<-bufferedAmountLock
}
}
// logger.Println("done")
if err = dc.SendText("download_done"); err != nil {
// logger.Println(err)
}
}(file)
dc.OnBufferedAmountLow(func() {
if bufferedAmountLock != nil {
bufferedAmountLock <- struct{}{}
}
})
} else {
err = fmt.Errorf("datachannel not created")
_ = atomicallyExecute(fs.filesFlag, func() (err error) {
if file, ok := fs.OpenFiles[fmt.Sprintf("%s/%s", filename, userId)]; ok {
file.Close()
}
delete(fs.OpenFiles, fmt.Sprintf("%s/%s", filename, userId))
return
})
}
return
}
func (fs *SquadFSInstance) FileDownloadFailed(chatId, filename, userId string) (err error) {
err = atomicallyExecute(fs.filesFlag, func() (err error) {
if file, ok := fs.OpenFiles[fmt.Sprintf("%s/%s", filename, userId)]; ok {
file.Close()
}
delete(fs.OpenFiles, fmt.Sprintf("%s/%s", filename, userId))
if _, ok := fs.OpenFilesForUser[userId]; ok {
var index int
for i, v := range fs.OpenFilesForUser[userId] {
if v == fmt.Sprintf("%s/%s", filename, userId) {
index = i
break
}
}
if len(fs.OpenFilesForUser[userId]) > 1 {
fs.OpenFilesForUser[userId] = append(fs.OpenFilesForUser[userId][:index], fs.OpenFilesForUser[userId][:index+1]...)
} else {
delete(fs.OpenFilesForUser, userId)
}
}
return
})
return
}
func (fs *SquadFSInstance) HandleDataChannelEvents(from, eventId string, payload map[string]interface{}) (err error) {
switch eventId {
}
return
}