Zippytal-Node/squadChatHandler.go

789 lines
22 KiB
Go

package localserver
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"strings"
"time"
"github.com/dgraph-io/badger/v3"
"github.com/pion/webrtc/v3"
)
// SquadChatConfig describes a squad chat: its identifier, its type, its owner
// and its member list. The JSON names are given by the struct tags.
type SquadChatConfig struct {
	// ChatId identifies the chat.
	ChatId string `json:"chatId"`
	// ChatType is the chat's visibility/type label.
	ChatType string `json:"chatType"`
	// Owner is the identifier of the chat owner.
	Owner string `json:"owner"`
	// Members lists the identifiers of the chat members.
	Members []string `json:"members"`
}
// SquadChat is the in-memory representation of a squad chat. The exported
// scalar fields are decoded from chatConfig.json by NewSquadChatsHandler; DB
// and Tracking are attached afterwards and are excluded from JSON.
type SquadChat struct {
	// ChatId identifies the chat.
	ChatId string `json:"chatId"`
	// ChatType is compared against PRIVATE to pick the broadcast audience.
	ChatType string `json:"chatType"`
	// Owner is the identifier of the chat owner.
	Owner string `json:"owner"`
	// Members lists the chat members; used as the audience of PRIVATE chats.
	Members []string `json:"members"`
	// LastReadIndex is not read or written by the code visible in this file.
	LastReadIndex uint `json:"lastReadIndex"`
	// Unread is not read or written by the code visible in this file.
	Unread uint `json:"unread"`
	// DB is the message/file store of the chat (never serialized).
	DB *SquadChatDBHandler `json:"-"`
	// Tracking stores the per-user last index (never serialized).
	Tracking *SquadChatTrackingDB `json:"-"`
}
// SquadChatsHandler serves the chat of one squad: it persists messages and
// files through Chat.DB, tracks per-user read positions through Chat.Tracking
// and pushes events to members over their chat data channels.
type SquadChatsHandler[T ZippytalFSInstance] struct {
	// SquadName is the display name used in notification bodies.
	SquadName string
	// SquadId identifies the squad served by this handler.
	SquadId string
	// HostId is the host identifier given to NewSquadChatsHandler.
	HostId string
	// ChatFSInstance performs the file upload/download work.
	ChatFSInstance T
	// SquadMembersId is the audience of broadcasts for non-PRIVATE chats; it
	// is updated by member added/removed squad requests.
	SquadMembersId []string
	// DataChannels is the map supplied by the caller of NewSquadChatsHandler;
	// it is not accessed by the code visible in this file.
	DataChannels map[string]*DataChannel
	// ChatDataChannels maps a peer id (second "|" segment of a "chat_data"
	// channel label) to the channel used to reach that peer.
	ChatDataChannels map[string]*DataChannel
	// ChatDataChannelsFlag is passed to atomicallyExecute around every access
	// to ChatDataChannels.
	ChatDataChannelsFlag *uint32
	// Flag is the flag supplied by the caller of NewSquadChatsHandler.
	Flag *uint32
	// ChatFSInstanceFlag is allocated by the constructor; it is not used by
	// the code visible in this file.
	ChatFSInstanceFlag *uint32
	// ChatFlag is passed to atomicallyExecute around Chat.DB and
	// Chat.Tracking operations.
	ChatFlag *uint32
	// Chat is the chat state loaded from disk.
	Chat *SquadChat
	// reqChans holds the channels registered by Subscribe; sendSquadRequest
	// publishes on each of them.
	reqChans []chan<- *SquadRequest
	// init is set to true by Init.
	init bool
}
// NewSquadChatsHandler builds the chat handler of the squad squadId.
//
// When the squad's chat directory does not exist yet it is created (mode 0700)
// together with a default "public" chatConfig.json owned by owner and listing
// authorizedMembers. The configuration is then read back, the chat database
// and tracking stores are opened, and the handler is assembled.
//
// It returns a nil handler and a non-nil error when the directory cannot be
// inspected or created, when the configuration cannot be written, read or
// decoded, or when one of the stores cannot be opened.
func NewSquadChatsHandler(hostId, squadId, squadName, owner string, authorizedMembers []string, dataChannels map[string]*DataChannel, flag *uint32) (squadChatsHandler *SquadChatsHandler[*SquadFSInstance], err error) {
	chatDir := filepath.Join(dataPath, "data", "squads", squadId, "chat")
	configPath := filepath.Join(chatDir, "chatConfig.json")

	if _, err = os.ReadDir(chatDir); err != nil {
		if !os.IsNotExist(err) {
			return nil, err
		}
		// First use of this squad chat: create the directory and seed it with
		// a default configuration.
		if err = os.MkdirAll(chatDir, 0700); err != nil {
			return nil, err
		}
		baseConfig := ChatConfig{
			ChatId:   squadName,
			Owner:    owner,
			ChatType: "public",
			Members:  authorizedMembers,
		}
		// Marshal before touching the disk so that a failure cannot leave an
		// empty config file behind.
		var encoded []byte
		if encoded, err = json.Marshal(baseConfig); err != nil {
			return nil, err
		}
		// os.WriteFile closes the file on every path and reports close errors.
		// 0666 (before umask) matches what os.Create would have used.
		if err = os.WriteFile(configPath, encoded, 0666); err != nil {
			return nil, err
		}
	}

	squadChatDBHandler, err := NewSquadChatDBHandler(squadId)
	if err != nil {
		return nil, err
	}

	bs, err := os.ReadFile(configPath)
	if err != nil {
		return nil, err
	}
	var c SquadChat
	if err = json.Unmarshal(bs, &c); err != nil {
		return nil, err
	}

	squadChatTracking, err := NewSquadChatTracking(squadId, c.ChatId, c.Members...)
	if err != nil {
		return nil, err
	}
	c.DB = squadChatDBHandler
	c.Tracking = squadChatTracking

	// Each flag gets its own variable so the handler owns distinct addresses.
	chatFlag := uint32(0)
	chatFSFlag := uint32(0)
	chatDCFlag := uint32(0)
	squadChatsHandler = &SquadChatsHandler[*SquadFSInstance]{
		HostId:               hostId,
		SquadName:            squadName,
		ChatFSInstance:       NewSquadFSInstance(squadId, owner, authorizedMembers),
		ChatFSInstanceFlag:   &chatFSFlag,
		SquadId:              squadId,
		SquadMembersId:       authorizedMembers,
		DataChannels:         dataChannels,
		ChatDataChannels:     make(map[string]*DataChannel),
		ChatDataChannelsFlag: &chatDCFlag,
		Flag:                 flag,
		Chat:                 &c,
		ChatFlag:             &chatFlag,
		init:                 false,
	}
	return squadChatsHandler, nil
}
// sendSquadRequest publishes a squad request built from reqType, from and
// payload on every channel registered through Subscribe. The sends happen in
// a background goroutine, one channel after the other, each with its own
// freshly allocated SquadRequest.
func (sch *SquadChatsHandler[T]) sendSquadRequest(reqType string, from string, payload map[string]interface{}) {
	publish := func() {
		subscribers := sch.reqChans
		for idx := 0; idx < len(subscribers); idx++ {
			request := &SquadRequest{
				ReqType: reqType,
				From:    from,
				Payload: payload,
			}
			subscribers[idx] <- request
		}
	}
	go publish()
}
// sendDataChannelMessage asynchronously sends a SquadResponse (type reqType,
// sender from, recipient to, body payload) as JSON text over the chat data
// channel registered for to.
//
// Exactly one of the returned channels receives a value: the first on
// success, the second with the error when no channel is registered for to,
// when the message cannot be encoded, or when the send fails. Both channels
// are buffered so the sending goroutine terminates even if the caller never
// reads the outcome.
func (sch *SquadChatsHandler[T]) sendDataChannelMessage(reqType string, from string, to string, payload map[string]interface{}) (<-chan struct{}, <-chan error) {
	done, errCh := make(chan struct{}, 1), make(chan error, 1)
	go func() {
		err := atomicallyExecute(sch.ChatDataChannelsFlag, func() error {
			target, ok := sch.ChatDataChannels[to]
			if !ok || target == nil {
				return fmt.Errorf("no corresponding dataChannel")
			}
			bs, jsonErr := json.Marshal(&SquadResponse{
				Type:    reqType,
				From:    from,
				To:      to,
				Payload: payload,
			})
			if jsonErr != nil {
				return jsonErr
			}
			return target.DataChannel.SendText(string(bs))
		})
		if err != nil {
			errCh <- err
			return
		}
		done <- struct{}{}
	}()
	return done, errCh
}
// Init marks the handler as initialized. Both parameters are currently
// unused and the returned error is always nil.
func (sch *SquadChatsHandler[T]) Init(ctx context.Context, authorizedMembers []string) (err error) {
	sch.init = true
	return nil
}
// Subscribe starts a goroutine that handles every request received from
// publisher until ctx is cancelled or publisher is closed.
//
// Returned channels:
//   - reqChan: registered as a target of sendSquadRequest; the caller is
//     expected to drain it.
//   - done: receives one value when the goroutine stops. It is buffered so
//     the goroutine can exit even if nobody is listening.
//   - errCh: receives the errors returned by handleSquadRequest. A pending
//     error is dropped if ctx is cancelled before it is read.
func (sch *SquadChatsHandler[T]) Subscribe(ctx context.Context, publisher <-chan *SquadRequest) (reqChan chan *SquadRequest, done chan struct{}, errCh chan error) {
	reqChan = make(chan *SquadRequest)
	done = make(chan struct{}, 1)
	errCh = make(chan error)
	sch.reqChans = append(sch.reqChans, reqChan)
	go func() {
		// Single buffered send: never blocks, whatever the exit path.
		defer func() { done <- struct{}{} }()
		for {
			select {
			case <-ctx.Done():
				return
			case req, ok := <-publisher:
				if !ok {
					// A closed publisher would otherwise yield nil requests
					// in a tight loop.
					return
				}
				if req == nil {
					continue
				}
				if err := sch.handleSquadRequest(ctx, req); err != nil {
					select {
					case errCh <- err:
					case <-ctx.Done():
						return
					}
				}
			}
		}
	}()
	return reqChan, done, errCh
}
// ReadLastMessage records, under ChatFlag, that userId has read the chat up
// to the database's current PreviousId. chatId is not used.
func (sch *SquadChatsHandler[T]) ReadLastMessage(userId, chatId string) (err error) {
	markRead := func() error {
		return sch.Chat.Tracking.SetUserLastIndex(userId, sch.Chat.DB.PreviousId)
	}
	return atomicallyExecute(sch.ChatFlag, markRead)
}
// ListLatestChatMessages fetches a page of chat messages (lastIndex and limit
// are truncated to int), moves userId's last-read index to the database's
// current PreviousId, and sends the page to userId as a CHAT_MESSAGE_LIST
// data channel message.
//
// It returns the listing/tracking error, or the delivery error if the page
// could not be sent.
func (sch *SquadChatsHandler[T]) ListLatestChatMessages(userId, chatId string, lastIndex, limit float64) (err error) {
	var (
		messages  []*ChatMessage
		nextIndex int
		exhausted bool
	)
	fetchPage := func() error {
		var listErr error
		messages, nextIndex, exhausted, listErr = sch.Chat.DB.ListChatMessages(int(lastIndex), int(limit))
		if listErr != nil {
			return listErr
		}
		return sch.Chat.Tracking.SetUserLastIndex(userId, sch.Chat.DB.PreviousId)
	}
	if err = atomicallyExecute(sch.ChatFlag, fetchPage); err != nil {
		return err
	}

	page := map[string]interface{}{
		"done":         exhausted || nextIndex <= 0,
		"lastIndex":    nextIndex - 1,
		"chatId":       chatId,
		"chatMessages": messages,
	}
	delivered, failed := sch.sendDataChannelMessage(CHAT_MESSAGE_LIST, "node", userId, page)
	select {
	case <-delivered:
		fmt.Println("done getting latest messages")
	case err = <-failed:
	}
	return err
}
// ListLatestChatFiles fetches a page of chat files (lastIndex and limit are
// truncated to int) and sends it to userId as a CHAT_FILES_LIST data channel
// message. The files travel under the "chatMessages" payload key.
//
// It returns the listing error, or the delivery error if the page could not
// be sent.
func (sch *SquadChatsHandler[T]) ListLatestChatFiles(userId, chatId string, lastIndex, limit float64) (err error) {
	var (
		files     []*ChatFile
		nextIndex int
	)
	fetchPage := func() error {
		var listErr error
		files, nextIndex, listErr = sch.Chat.DB.ListChatFiles(int(lastIndex), int(limit))
		return listErr
	}
	if err = atomicallyExecute(sch.ChatFlag, fetchPage); err != nil {
		return err
	}

	page := map[string]interface{}{
		"done":         nextIndex <= 0,
		"lastIndex":    nextIndex,
		"chatId":       chatId,
		"chatMessages": files,
	}
	delivered, failed := sch.sendDataChannelMessage(CHAT_FILES_LIST, "node", userId, page)
	select {
	case <-delivered:
	case err = <-failed:
	}
	return err
}
// AddChatMessage stores a new message from userId and broadcasts it.
//
// Steps:
//  1. Under ChatFlag: when isResponse is set, look up the parent message
//     chatResponseId (a missing parent is tolerated, any other lookup error
//     aborts), store the message, and move the sender's last-read index to it.
//  2. In the background, report the squad's last interaction time.
//  3. Send NEW_CHAT_MESSAGE to every id in SquadMembersId. For each member
//     that cannot be reached, a CREATE_NOTIFICATION squad request is emitted
//     in the background when the member has exactly one unread message.
//
// Only errors of step 1 are returned; delivery failures are not.
func (sch *SquadChatsHandler[T]) AddChatMessage(userId, chatId, content string, isResponse bool, chatResponseId uint64, file *ChatFile) (err error) {
	dateTime := time.Now().Format(time.RFC3339)
	chatMessage := &ChatMessage{
		Content:    content,
		From:       userId,
		IsResponse: isResponse,
		ResponseOf: nil,
		File:       file,
		Tags:       make([]string, 0),
		Date:       dateTime,
	}
	if err = atomicallyExecute(sch.ChatFlag, func() (err error) {
		if isResponse {
			parentMessage, getErr := sch.Chat.DB.GetChatMessage(chatResponseId)
			switch {
			case getErr == nil:
				chatMessage.ResponseOf = parentMessage
			case getErr != badger.ErrKeyNotFound:
				// The original tested the wrong variable here and silently
				// ignored every lookup failure.
				return getErr
			}
			// Parent not found: keep the message, without a parent.
		}
		if err = sch.Chat.DB.AddNewChatMessage(chatMessage); err != nil {
			return
		}
		chatMessage.ID = sch.Chat.DB.PreviousId
		// Best effort: a tracking failure must not reject a stored message.
		_ = sch.Chat.Tracking.SetUserLastIndex(userId, chatMessage.ID)
		return
	}); err != nil {
		return
	}

	// notifyActivity requests a push notification for member when the message
	// just stored is their only unread one.
	notifyActivity := func(member string) {
		chat := sch.Chat
		_ = atomicallyExecute(sch.ChatFlag, func() error {
			li, liErr := chat.Tracking.GetUserLastIndex(member)
			if liErr != nil {
				return liErr
			}
			count, countErr := chat.DB.calculateNewChatCount(uint64(li))
			if countErr != nil {
				return countErr
			}
			if count != 1 {
				return nil
			}
			bs, jsonErr := json.Marshal(map[string]any{
				"squadId":   sch.SquadId,
				"squadHost": NodeID,
			})
			if jsonErr != nil {
				return jsonErr
			}
			sch.sendSquadRequest(CREATE_NOTIFICATION, "node", map[string]interface{}{
				"type":       NEW_SQUAD_CHAT_ACTIVITY,
				"title":      "Unread messages 👀",
				"body":       fmt.Sprintf("New messages in %s", sch.SquadName),
				"isPushed":   true,
				"payload":    string(bs),
				"recipients": []string{member},
			})
			return nil
		})
	}

	go func() {
		if e := sch.signaSquadLastInteractionTime(context.Background(), dateTime); e != nil {
			fmt.Println(e)
		}
	}()

	for _, member := range sch.SquadMembersId {
		done, failed := sch.sendDataChannelMessage(NEW_CHAT_MESSAGE, "node", member, map[string]interface{}{
			"chatMessage": chatMessage,
			"chatId":      chatId,
		})
		select {
		case <-done:
		case <-failed:
			// Unreachable member: fall back to a notification.
			go notifyActivity(member)
		}
	}
	return nil
}
// func (sch *SquadChatsHandler[T]) ConnectToChatFSInstance(channelId string, userId string, sdp string) (err error) {
// err = atomicallyExecute(sch.ChatFSInstanceFlag, func() (err error) {
// d, e := sch.ChatFSInstance.HandleOffer(context.Background(), channelId, userId, sdp, sch.HostId, sch.sendDataChannelMessage, sch.signalCandidate)
// select {
// case <-d:
// case <-e:
// }
// return
// })
// return
// }
// func (sch *SquadChatsHandler[T]) LeaveChatFSInstance(
// userId string) (err error) {
// sch.ChatFSInstance.HandleLeavingMember(userId)
// return
// }
// DeleteChatMessage removes message key from the chat database and sends
// CHAT_MESSAGE_DELETED to the chat members (PRIVATE chat) or to all squad
// members (otherwise); delivery failures are ignored.
//
// When the deleted message was the most recent one (key equals PreviousId),
// the database's previous id is reverted and the tracking store is rewound to
// it, a resulting id of 1 being rewound as 0. Everything runs under ChatFlag.
func (sch *SquadChatsHandler[T]) DeleteChatMessage(key uint64, chatId string) (err error) {
	return atomicallyExecute(sch.ChatFlag, func() error {
		chat := sch.Chat
		if delErr := chat.DB.DeleteChatMessage(key); delErr != nil {
			return delErr
		}

		recipients := sch.SquadMembersId
		if chat.ChatType == PRIVATE {
			recipients = chat.Members
		}
		for _, member := range recipients {
			sent, failed := sch.sendDataChannelMessage(CHAT_MESSAGE_DELETED, "node", member, map[string]interface{}{
				"chatId":    chatId,
				"messageId": key,
			})
			select {
			case <-sent:
			case <-failed:
			}
		}

		if chat.DB.PreviousId != key {
			return nil
		}
		if revertErr := chat.DB.revertPreviousId(); revertErr != nil {
			return revertErr
		}
		latest := chat.DB.PreviousId
		if latest == 1 {
			latest = 0
		}
		return chat.Tracking.RevertTrackingLastIndex(latest)
	})
}
// UpdateChatMessage replaces the content of message key with newContent and
// sends CHAT_MESSAGE_EDITED to the chat members (PRIVATE chat) or to all
// squad members (otherwise). Delivery failures are ignored; only the database
// error is returned. Everything runs under ChatFlag.
func (sch *SquadChatsHandler[T]) UpdateChatMessage(key uint64, chatId, newContent string) (err error) {
	return atomicallyExecute(sch.ChatFlag, func() error {
		chat := sch.Chat
		if modErr := chat.DB.ModifyChatMessage(key, newContent); modErr != nil {
			return modErr
		}

		recipients := sch.SquadMembersId
		if chat.ChatType == PRIVATE {
			recipients = chat.Members
		}
		for _, member := range recipients {
			sent, failed := sch.sendDataChannelMessage(CHAT_MESSAGE_EDITED, "node", member, map[string]interface{}{
				"chatId":     chatId,
				"messageId":  key,
				"newContent": newContent,
			})
			select {
			case <-sent:
			case <-failed:
			}
		}
		return nil
	})
}
// DeleteChatFile removes file fileName (id key) from the chat database and
// sends CHAT_FILE_DELETED to the chat members (PRIVATE chat) or to all squad
// members (otherwise). Delivery failures are ignored; only the database error
// is returned. Everything runs under ChatFlag.
func (sch *SquadChatsHandler[T]) DeleteChatFile(key uint64, fileName, chatId string) (err error) {
	return atomicallyExecute(sch.ChatFlag, func() error {
		chat := sch.Chat
		if delErr := chat.DB.DeleteChatFile(fileName, key); delErr != nil {
			return delErr
		}

		recipients := sch.SquadMembersId
		if chat.ChatType == PRIVATE {
			recipients = chat.Members
		}
		for _, member := range recipients {
			sent, failed := sch.sendDataChannelMessage(CHAT_FILE_DELETED, "node", member, map[string]interface{}{
				"chatId":   chatId,
				"fileId":   key,
				"fileName": fileName,
			})
			select {
			case <-sent:
			case <-failed:
			}
		}
		return nil
	})
}
// chatFileUpload wires the callbacks of dc to receive the upload of filename
// into chat chatId on behalf of userId.
//
// Protocol, as implemented below:
//   - on open, the node sends "channel_ready";
//   - the text "init_upload" prepares the upload through ChatFSInstance and
//     is answered with "upload_ready", or with "abort" followed by a close
//     when the setup fails;
//   - binary frames are forwarded to the write pipe returned by the setup;
//   - the text "upload_done" closes the pipe and marks the upload complete.
//
// A channel error, or a close before completion, closes the pipe and reports
// the failure to ChatFSInstance.
func (sch *SquadChatsHandler[T]) chatFileUpload(chatId string, filename string, userId string, dc *DataChannel) {
	var writePipe chan<- []byte
	var done bool

	// uploadDone closes the pipe (once) and marks the transfer as finished.
	uploadDone := func() {
		if writePipe != nil {
			close(writePipe)
			writePipe = nil
		}
		done = true
	}
	// reportFailure is best effort: there is nobody to hand the error to.
	reportFailure := func() {
		_ = sch.ChatFSInstance.FileUploadFailed(chatId, filename, userId)
	}

	dc.DataChannel.OnError(func(err error) {
		if !done {
			uploadDone()
		}
		// Reported even when the upload had already completed, as before.
		reportFailure()
	})
	dc.DataChannel.OnClose(func() {
		if done {
			return
		}
		uploadDone()
		reportFailure()
	})
	dc.DataChannel.OnMessage(func(msg webrtc.DataChannelMessage) {
		if !msg.IsString {
			// A send on a nil channel blocks forever: drop chunks received
			// before "init_upload" or after the upload has been finalized.
			if writePipe == nil {
				return
			}
			writePipe <- msg.Data
			return
		}
		switch string(msg.Data) {
		case "init_upload":
			var initErr error
			if writePipe, initErr = sch.ChatFSInstance.SetupFileUpload(chatId, filename, userId, dc.DataChannel); initErr != nil {
				_ = dc.DataChannel.SendText("abort")
				_ = dc.DataChannel.Close()
				return
			}
			_ = dc.DataChannel.SendText("upload_ready")
		case "upload_done":
			uploadDone()
		}
	})
	dc.DataChannel.OnOpen(func() {
		_ = dc.DataChannel.SendText("channel_ready")
	})
}
// chatFileDownload wires the callbacks of dc to serve the download of
// filename from chat chatId to userId.
//
// On open the node sends "channel_ready". The text "init_download" starts the
// transfer through ChatFSInstance (a setup failure is printed, answered with
// "abort" and the channel is closed); the text "download_done" marks the
// transfer as complete. An error or a close before completion is reported to
// ChatFSInstance.
func (sch *SquadChatsHandler[T]) chatFileDownload(chatId string, filename string, userId string, dc *DataChannel) {
	var finished bool

	// reportAbort notifies the FS instance unless the download completed; the
	// result of the notification is discarded.
	reportAbort := func() {
		if finished {
			return
		}
		_ = sch.ChatFSInstance.FileDownloadFailed(chatId, filename, userId)
	}

	dc.DataChannel.OnError(func(error) { reportAbort() })
	dc.DataChannel.OnClose(reportAbort)
	dc.DataChannel.OnMessage(func(msg webrtc.DataChannelMessage) {
		if !msg.IsString {
			return
		}
		switch string(msg.Data) {
		case "init_download":
			setupErr := sch.ChatFSInstance.SetupFileDownload(chatId, filename, userId, dc.DataChannel)
			if setupErr == nil {
				return
			}
			fmt.Println("uwuwuwuwuwuwuwuwu")
			fmt.Println(setupErr)
			_ = dc.DataChannel.SendText("abort")
			_ = dc.DataChannel.Close()
		case "download_done":
			finished = true
		}
	})
	dc.DataChannel.OnOpen(func() {
		_ = dc.DataChannel.SendText("channel_ready")
	})
}
// handleDataChannel claims dc when its label designates a chat channel and
// reports whether it did so. Labels are "|"-separated:
//
//   - "...chat_upload...|chatId|filename|userId" (exactly 4 segments) starts
//     an upload handler;
//   - "...chat_download...|chatId|filename|userId" (at least 4 segments)
//     starts a download handler;
//   - "...chat_data...|peerId" registers dc as the chat channel of peerId,
//     greets the peer with a "chat_init" message on open and unregisters the
//     channel on close or error.
//
// The label comes from the remote peer, so a label with too few segments is
// left unclaimed instead of being indexed blindly.
func (sch *SquadChatsHandler[T]) handleDataChannel(ctx context.Context, dc *DataChannel) (catched bool) {
	var label string
	_ = atomicallyExecute(dc.l, func() (err error) {
		label = dc.DataChannel.Label()
		return
	})
	command := strings.Split(label, "|")

	switch {
	case strings.Contains(label, "chat_upload"):
		if len(command) == 4 {
			catched = true
			go sch.chatFileUpload(command[1], command[2], command[3], dc)
		}
	case strings.Contains(label, "chat_download"):
		if len(command) >= 4 {
			catched = true
			go sch.chatFileDownload(command[1], command[2], command[3], dc)
		}
	case strings.Contains(label, "chat_data"):
		if len(command) < 2 {
			return false
		}
		catched = true
		peerId := command[1]
		_ = atomicallyExecute(sch.ChatDataChannelsFlag, func() (err error) {
			sch.ChatDataChannels[peerId] = dc
			return
		})
		// unregister drops the peer's entry only while it still points at
		// this channel, so a stale channel closing late cannot evict the
		// channel of a reconnected peer.
		unregister := func() {
			_ = atomicallyExecute(sch.ChatDataChannelsFlag, func() (err error) {
				if sch.ChatDataChannels[peerId] == dc {
					delete(sch.ChatDataChannels, peerId)
				}
				return
			})
		}
		dc.DataChannel.OnOpen(func() {
			fmt.Println("datachann in squad chat fking created")
			bs, err := json.Marshal(map[string]any{
				"type": "chat_init",
				"from": NodeID,
				"to":   peerId,
				"payload": map[string]any{
					"chatId": sch.SquadId,
				},
			})
			if err != nil {
				// Nothing valid to send.
				fmt.Println(err)
				return
			}
			_ = dc.DataChannel.SendText(string(bs))
		})
		dc.DataChannel.OnClose(func() {
			fmt.Println("closing gratefully chat dc...")
			unregister()
		})
		dc.DataChannel.OnError(func(err error) {
			fmt.Println("error in chat dc...")
			unregister()
		})
	}
	return
}
// signaSquadLastInteractionTime reports dateTime as the squad's last
// interaction time by POSTing a signed UPDATE_SQUAD_LAST_INTERACTION_TIME
// request to the signaling endpoint.
//
// The request is bound to ctx, so cancelling ctx aborts it. The response body
// is drained and closed so the underlying connection can be reused. The HTTP
// status code is not inspected. It returns the encoding, request-building,
// transport or read error, if any.
func (sch *SquadChatsHandler[T]) signaSquadLastInteractionTime(ctx context.Context, dateTime string) (err error) {
	fmt.Println("this function has been called too")
	em := NewEncryptionManager()
	sig := em.SignRequestHMAC(NodeID)
	body, err := json.Marshal(map[string]interface{}{
		"type":     UPDATE_SQUAD_LAST_INTERACTION_TIME,
		"mac":      sig,
		"from":     NodeID,
		"peerType": "node",
		"payload": map[string]string{
			"squadId":             sch.SquadId,
			"lastInteractionTime": dateTime,
		},
	})
	if err != nil {
		return err
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, "https://dev.zippytal.com/req", bytes.NewReader(body))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")

	res, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	// The original never closed the body, leaking one connection per call.
	defer res.Body.Close()

	_, err = io.Copy(io.Discard, res.Body)
	return err
}
// handleSquadRequest validates the payload of req and dispatches it on
// req.ReqType:
//
//   - LEAVE_ZONE: validates "userId"; nothing else is done;
//   - NEW_AUTHORIZED_SQUAD_MEMBER / REMOVED_SQUAD_AUTHORIZED_MEMBER: add or
//     remove "userId" from SquadMembersId;
//   - LIST_LATEST_CHATS / LIST_LATEST_FILES: send a page of messages or files
//     to req.From;
//   - READ_LATEST_MESSAGE: move req.From's last-read index to the latest
//     message;
//   - ADD_CHAT_MESSAGE, DELETE_CHAT_MESSAGE, EDIT_CHAT_MESSAGE,
//     DELETE_CHAT_FILE: mutate the chat and broadcast the change.
//
// Unknown request types are ignored. It returns the validation error or the
// error of the dispatched operation; a nil req is rejected with an error.
func (sch *SquadChatsHandler[T]) handleSquadRequest(ctx context.Context, req *SquadRequest) (err error) {
	if req == nil {
		return fmt.Errorf("nil squad request")
	}
	switch req.ReqType {
	case LEAVE_ZONE:
		if err = VerifyFieldsString(req.Payload, "userId"); err != nil {
			return
		}
		// err = sch.LeaveChatFSInstance(req.Payload["userId"].(string))
	case string(NEW_AUTHORIZED_SQUAD_MEMBER):
		if err = VerifyFieldsString(req.Payload, "userId"); err != nil {
			return
		}
		userId := req.Payload["userId"].(string)
		for _, m := range sch.SquadMembersId {
			if m == userId {
				// Already a member: nothing to do.
				return
			}
		}
		sch.SquadMembersId = append(sch.SquadMembersId, userId)
		fmt.Println("squad member added")
		return
	case string(REMOVED_SQUAD_AUTHORIZED_MEMBER):
		if err = VerifyFieldsString(req.Payload, "userId"); err != nil {
			return
		}
		userId := req.Payload["userId"].(string)
		index := -1
		for i, m := range sch.SquadMembersId {
			if m == userId {
				index = i
				break
			}
		}
		if index < 0 {
			err = fmt.Errorf("no such user in zone")
			return
		}
		sch.SquadMembersId = append(sch.SquadMembersId[:index], sch.SquadMembersId[index+1:]...)
		fmt.Println("squad member removed")
		return
	case LIST_LATEST_CHATS:
		if err = VerifyFieldsString(req.Payload, "chatId"); err != nil {
			return
		}
		if err = VerifyFieldsFloat64(req.Payload, "lastIndex", "limit"); err != nil {
			return
		}
		err = sch.ListLatestChatMessages(req.From, req.Payload["chatId"].(string), req.Payload["lastIndex"].(float64), req.Payload["limit"].(float64))
		return
	case LIST_LATEST_FILES:
		if err = VerifyFieldsString(req.Payload, "chatId"); err != nil {
			return
		}
		if err = VerifyFieldsFloat64(req.Payload, "lastIndex", "limit"); err != nil {
			return
		}
		err = sch.ListLatestChatFiles(req.From, req.Payload["chatId"].(string), req.Payload["lastIndex"].(float64), req.Payload["limit"].(float64))
		return
	case READ_LATEST_MESSAGE:
		if err = VerifyFieldsString(req.Payload, "chatId"); err != nil {
			return
		}
		err = sch.ReadLastMessage(req.From, req.Payload["chatId"].(string))
	case ADD_CHAT_MESSAGE:
		if err = VerifyFieldsString(req.Payload, "chatId", "content"); err != nil {
			return
		}
		if err = VerifyFieldsBool(req.Payload, "isResponse"); err != nil {
			return
		}
		isResponse := req.Payload["isResponse"].(bool)
		var parentChatId uint64
		if isResponse {
			if err = VerifyFieldsFloat64(req.Payload, "parentChatId"); err != nil {
				return
			}
			parentChatId = uint64(req.Payload["parentChatId"].(float64))
		}
		var file *ChatFile
		// An explicit `"file": null` means "no attachment"; decoding it would
		// otherwise attach a zero-valued ChatFile to the message.
		if raw, ok := req.Payload["file"]; ok && raw != nil {
			// Round-trip through JSON to convert the generic payload into a
			// ChatFile.
			bs, jsonErr := json.Marshal(raw)
			if jsonErr != nil {
				return jsonErr
			}
			var f ChatFile
			if err = json.Unmarshal(bs, &f); err != nil {
				err = fmt.Errorf("the file payload dont match ChatFile struct pattern : %v", err)
				return
			}
			file = &f
		}
		err = sch.AddChatMessage(req.From, req.Payload["chatId"].(string), req.Payload["content"].(string), isResponse, parentChatId, file)
	case DELETE_CHAT_MESSAGE:
		if err = VerifyFieldsString(req.Payload, "chatId"); err != nil {
			return
		}
		if err = VerifyFieldsFloat64(req.Payload, "messageId"); err != nil {
			return
		}
		err = sch.DeleteChatMessage(uint64(req.Payload["messageId"].(float64)), req.Payload["chatId"].(string))
	case EDIT_CHAT_MESSAGE:
		if err = VerifyFieldsString(req.Payload, "chatId", "newContent"); err != nil {
			return
		}
		if err = VerifyFieldsFloat64(req.Payload, "messageId"); err != nil {
			return
		}
		err = sch.UpdateChatMessage(uint64(req.Payload["messageId"].(float64)), req.Payload["chatId"].(string), req.Payload["newContent"].(string))
	case DELETE_CHAT_FILE:
		if err = VerifyFieldsString(req.Payload, "chatId", "fileName"); err != nil {
			return
		}
		if err = VerifyFieldsFloat64(req.Payload, "fileId"); err != nil {
			return
		}
		err = sch.DeleteChatFile(uint64(req.Payload["fileId"].(float64)), req.Payload["fileName"].(string), req.Payload["chatId"].(string))
	}
	return
}