package localserver

import (
	"context"
	"encoding/json"
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
	"strings"
	"time"

	"github.com/dgraph-io/badger/v3"
	"github.com/pion/webrtc/v3"
)

// Request types understood by the zone chat handler.
const (
	LIST_LATEST_CHATS          = "list_latest_chats"
	READ_LATEST_MESSAGE        = "read_latest_message"
	LIST_LATEST_FILES          = "list_latest_files"
	LIST_CHATS                 = "list_chats"
	GET_CHATS                  = "get_chats"
	GET_CHAT                   = "get_chat"
	CREATE_CHAT                = "create_chat"
	DELETE_CHAT                = "delete_chat"
	EDIT_CHAT_TYPE             = "edit_chat_type"
	EDIT_CHAT_NAME             = "edit_chat_name"
	ADD_CHAT_MEMBERS           = "add_chat_members"
	REMOVE_CHAT_MEMBER         = "remove_chat_member"
	ADD_CHAT_MESSAGE           = "add_chat_message"
	ZONE_UPLOAD_CHAT_FILE      = "zone_upload_chat_file"
	ZONE_DOWNLOAD_CHAT_FILE    = "zone_download_chat_file"
	DELETE_CHAT_FILE           = "delete_chat_file"
	DELETE_CHAT_MESSAGE        = "delete_chat_message"
	EDIT_CHAT_MESSAGE          = "edit_chat_message"
	ZONE_UPLOAD_CHAT_FILE_DONE = "zone_upload_chat_file_done"
)

// Event types sent to clients over their chat data channel.
const (
	CHAT_NAME_EDITED     = "chat_name_edited"
	CHAT_TYPE_EDITED     = "chat_type_edited"
	CHAT_MESSAGE_LIST    = "chat_messages_list"
	CHAT_FILES_LIST      = "chat_files_list"
	CHAT_MEMBER_ADDED    = "chat_member_added"
	CHAT_MEMBER_REMOVED  = "chat_member_removed"
	REMOVED_FROM_CHAT    = "removed_from_chat"
	ADDED_IN_CHAT        = "added_in_chat"
	GET_CHATS_RESPONSE   = "get_chats_response"
	NEW_CHAT_MESSAGE     = "new_chat_message"
	CHAT_MESSAGE_DELETED = "chat_message_deleted"
	CHAT_MESSAGE_EDITED  = "chat_message_edited"
	CHAT_FILE_DELETED    = "chat_file_deleted"
	NEW_CHAT_FILE        = "new_chat_file"
)

// GENERAL is the id of the chat created automatically for a new zone; the
// other values are the chat types used by this handler.
const (
	GENERAL   = "general"
	PUBLIC    = "public"
	PRIVATE   = "private"
	BROADCAST = "broadcast"
)

// ChatConfig is the content of a chat's chatConfig.json file.
type ChatConfig struct {
	ChatId   string   `json:"chatId"`
	ChatType string   `json:"chatType"`
	Owner    string   `json:"owner"`
	Members  []string `json:"members"`
}

// Chat is the in-memory view of a chat. LastReadIndex and Unread are computed
// per user by GetChats; DB and Tracking are runtime handles that are never
// serialised.
type Chat struct {
	ChatId        string              `json:"chatId"`
	ChatType      string              `json:"chatType"`
	Owner         string              `json:"owner"`
	Members       []string            `json:"members"`
	LastReadIndex uint                `json:"lastReadIndex"`
	Unread        uint                `json:"unread"`
	DB            *ZoneChatDBHandler  `json:"-"`
	Tracking      *ZoneChatTrackingDB `json:"-"`
}

// ZoneChatsHandler manages the chats of one zone: their on-disk configuration,
// their message/tracking stores and the notifications sent to zone members.
// Access to Chats is guarded by ChatFlag and access to ChatDataChannels by
// ChatDataChannelsFlag, both through atomicallyExecute.
type ZoneChatsHandler[T ZippytalFSInstance] struct {
	ZoneName             string
	ZoneId               string
	HostId               string
	ChatFSInstance       T
	ZoneMembersId        []string
	DataChannels         map[string]*DataChannel
	ChatDataChannels     map[string]*DataChannel
	ChatDataChannelsFlag *uint32
	Flag                 *uint32
	ChatFSInstanceFlag   *uint32
	ChatFlag             *uint32
	Chats                map[string]*Chat
	reqChans             []chan<- *ZoneRequest
	init                 bool
}

// zoneChatPath joins elem onto the chats directory of the given zone.
func zoneChatPath(zoneId string, elem ...string) string {
	parts := append([]string{dataPath, "data", "zones", zoneId, "chats"}, elem...)
	return filepath.Join(parts...)
}

// validZoneChatName reports whether name can safely be used as a chat
// directory name. Names containing a path separator would escape or nest
// inside the chats directory, and names starting with "." are skipped when
// chats are loaded by NewZoneChatsHandler.
func validZoneChatName(name string) bool {
	if name == "" || strings.HasPrefix(name, ".") {
		return false
	}
	return !strings.ContainsAny(name, `/\`)
}

// loadZoneChatConfig reads and decodes the chatConfig.json of a chat.
func loadZoneChatConfig(zoneId, chatId string) (*ChatConfig, error) {
	bs, err := os.ReadFile(zoneChatPath(zoneId, chatId, "chatConfig.json"))
	if err != nil {
		return nil, err
	}
	cfg := new(ChatConfig)
	if err = json.Unmarshal(bs, cfg); err != nil {
		return nil, err
	}
	return cfg, nil
}

// storeZoneChatConfig encodes cfg and overwrites the chatConfig.json of a chat.
func storeZoneChatConfig(zoneId, chatId string, cfg *ChatConfig) error {
	bs, err := json.Marshal(cfg)
	if err != nil {
		return err
	}
	return os.WriteFile(zoneChatPath(zoneId, chatId, "chatConfig.json"), bs, 0644)
}

// zoneChatHasMember reports whether id is present in members.
func zoneChatHasMember(members []string, id string) bool {
	for _, member := range members {
		if member == id {
			return true
		}
	}
	return false
}

// NewZoneChatsHandler loads every chat stored under the zone's chats directory
// and returns a handler for them. When the directory does not exist yet it is
// created together with a public GENERAL chat owned by owner.
func NewZoneChatsHandler(hostId, zoneId, zoneName, owner string, authorizedMembers []string, dataChannels map[string]*DataChannel, flag *uint32) (zoneChatsHandler *ZoneChatsHandler[*ChatFSInstance], err error) {
	chatsDir := zoneChatPath(zoneId)
	var dirs []fs.DirEntry
	dirs, err = os.ReadDir(chatsDir)
	if err != nil {
		if !os.IsNotExist(err) {
			return nil, err
		}
		logger.Printf("creating chat directory for zone %s...\n", zoneId)
		if err = os.MkdirAll(zoneChatPath(zoneId, GENERAL), 0700); err != nil {
			return nil, err
		}
		baseConfig := &ChatConfig{
			ChatId:   GENERAL,
			Owner:    owner,
			ChatType: PUBLIC,
			Members:  []string{},
		}
		// os.WriteFile closes the file on every path, unlike the previous
		// os.Create + WriteString sequence which leaked it on error.
		if err = storeZoneChatConfig(zoneId, GENERAL, baseConfig); err != nil {
			return nil, err
		}
		if dirs, err = os.ReadDir(chatsDir); err != nil {
			return nil, err
		}
	}

	chats := make(map[string]*Chat, len(dirs))
	for _, entry := range dirs {
		if strings.HasPrefix(entry.Name(), ".") {
			continue
		}
		db, dbErr := NewZoneChatDBHandler(zoneId, entry.Name())
		if dbErr != nil {
			return nil, dbErr
		}
		bs, readErr := os.ReadFile(zoneChatPath(zoneId, entry.Name(), "chatConfig.json"))
		if readErr != nil {
			return nil, readErr
		}
		logger.Println(string(bs))
		var c Chat
		if jsonErr := json.Unmarshal(bs, &c); jsonErr != nil {
			return nil, jsonErr
		}
		tracking, trackErr := NewZoneChatTracking(zoneId, c.ChatId, c.Members...)
		if trackErr != nil {
			return nil, trackErr
		}
		logger.Println("chats data :", c.ChatId, c.ChatType, c.Owner, c.Members)
		c.DB = db
		c.Tracking = tracking
		chats[c.ChatId] = &c
	}

	chatFlag := uint32(0)
	chatFSFlag := uint32(0)
	chatDCFlag := uint32(0)
	zoneChatsHandler = &ZoneChatsHandler[*ChatFSInstance]{
		HostId:               hostId,
		ZoneName:             zoneName,
		ChatFSInstance:       NewChatFSInstance(zoneId, owner, authorizedMembers),
		ChatFSInstanceFlag:   &chatFSFlag,
		ZoneId:               zoneId,
		ZoneMembersId:        authorizedMembers,
		DataChannels:         dataChannels,
		ChatDataChannels:     make(map[string]*DataChannel),
		ChatDataChannelsFlag: &chatDCFlag,
		Flag:                 flag,
		Chats:                chats,
		ChatFlag:             &chatFlag,
		init:                 false,
	}
	return zoneChatsHandler, nil
}

// sendZoneRequest publishes a request to every channel registered through
// Subscribe. The sends happen in a separate goroutine so the caller never
// blocks.
func (zch *ZoneChatsHandler[T]) sendZoneRequest(reqType string, from string, payload map[string]interface{}) {
	go func() {
		for _, rc := range zch.reqChans {
			rc <- &ZoneRequest{
				ReqType: reqType,
				From:    from,
				Payload: payload,
			}
		}
	}()
}

// sendDataChannelMessage sends a ZoneResponse to the chat data channel
// registered for user `to`. Exactly one of the returned channels receives a
// value: the first on success, the second on failure (including when `to` has
// no chat data channel).
func (zch *ZoneChatsHandler[T]) sendDataChannelMessage(reqType string, from string, to string, payload map[string]interface{}) (<-chan struct{}, <-chan error) {
	// Capacity 1 lets the goroutine finish even when the caller discards the
	// channels; with unbuffered channels it would block forever.
	done, errCh := make(chan struct{}, 1), make(chan error, 1)
	go func() {
		err := atomicallyExecute(zch.ChatDataChannelsFlag, func() error {
			dc, ok := zch.ChatDataChannels[to]
			if !ok {
				return fmt.Errorf("no corresponding dataChannel")
			}
			bs, jsonErr := json.Marshal(&ZoneResponse{
				Type:    reqType,
				From:    from,
				To:      to,
				Payload: payload,
			})
			if jsonErr != nil {
				return jsonErr
			}
			return dc.DataChannel.SendText(string(bs))
		})
		if err != nil {
			errCh <- err
			return
		}
		done <- struct{}{}
	}()
	return done, errCh
}

// Init adds every authorized member to all public and broadcast chats.
// Failures are logged and do not abort initialisation.
func (zch *ZoneChatsHandler[T]) Init(ctx context.Context, authorizedMembers []string) (err error) {
	for _, member := range authorizedMembers {
		if serr := zch.SetAllPublicChatForUser(member); serr != nil {
			logger.Println(serr)
		}
	}
	zch.init = true
	return
}

// Subscribe registers a new outgoing request channel and starts a goroutine
// that feeds every request received on publisher to handleZoneRequest.
// Handling errors are delivered on errCh. done receives one value when the
// goroutine stops, either because ctx is cancelled or publisher is closed.
func (zch *ZoneChatsHandler[T]) Subscribe(ctx context.Context, publisher <-chan *ZoneRequest) (reqChan chan *ZoneRequest, done chan struct{}, errCh chan error) {
	// done is buffered so the goroutine can exit even if nobody waits on it.
	reqChan, done, errCh = make(chan *ZoneRequest), make(chan struct{}, 1), make(chan error)
	zch.reqChans = append(zch.reqChans, reqChan)
	go func() {
		defer func() { done <- struct{}{} }()
		for {
			select {
			case <-ctx.Done():
				return
			case req, ok := <-publisher:
				if !ok {
					// A closed publisher would otherwise yield nil requests
					// forever and crash handleZoneRequest.
					return
				}
				if err := zch.handleZoneRequest(ctx, req); err != nil {
					select {
					case errCh <- err:
					case <-ctx.Done():
						return
					}
				}
			}
		}
	}()
	return
}

// func (zch *ZoneChatsHandler[T]) signalCandidate(from string, to string, candidate *webrtc.ICECandidate) (err error) {
// 	d, e := zch.sendDataChannelMessage(string(ZONE_CHAT_WEBRTC_CANDIDATE), from, to, map[string]interface{}{
// 		"from":          from,
// 		"to":            to,
// 		"candidate":     candidate.ToJSON().Candidate,
// 		"sdpMid":        *candidate.ToJSON().SDPMid,
// 		"sdpMLineIndex": strconv.Itoa(int(*candidate.ToJSON().SDPMLineIndex)),
// 	})
// 	select {
// 	case <-d:
// 	case <-e:
// 	}
// 	return
// }

// GetChats sends userId a GET_CHATS_RESPONSE holding the requested chats, each
// annotated with that user's last read index and unread count. Unknown ids are
// skipped; an id that is not a string is an error.
func (zch *ZoneChatsHandler[T]) GetChats(userId string, chatsId ...interface{}) (err error) {
	chats := make([]*Chat, 0, len(chatsId))
	for _, rawID := range chatsId {
		id, ok := rawID.(string)
		if !ok {
			return fmt.Errorf("id of wrong type")
		}
		_ = atomicallyExecute(zch.ChatFlag, func() (err error) {
			if chat, ok := zch.Chats[id]; ok {
				logger.Println(chat)
				// Work on a copy: the per-user counters must not be written
				// into the Chat shared by every user of the zone.
				view := *chat
				chats = append(chats, &view)
			}
			return
		})
	}
	for _, chat := range chats {
		index, indexErr := chat.Tracking.GetUserLastIndex(userId)
		if indexErr != nil {
			return indexErr
		}
		unread, countErr := chat.DB.calculateNewChatCount(uint64(index))
		if countErr != nil {
			return countErr
		}
		chat.LastReadIndex = index
		chat.Unread = unread
	}
	done, e := zch.sendDataChannelMessage(GET_CHATS_RESPONSE, "node", userId, map[string]interface{}{
		"chats": chats,
	})
	select {
	case <-done:
	case err = <-e:
	}
	return
}

// ListChats returns every chat currently known to the handler.
func (zch *ZoneChatsHandler[T]) ListChats() (chats []*Chat, err error) {
	chats = make([]*Chat, 0)
	_ = atomicallyExecute(zch.ChatFlag, func() (err error) {
		for _, chat := range zch.Chats {
			chats = append(chats, chat)
		}
		return
	})
	return
}

// AddNewChat creates a chat on disk and in memory, then announces it to its
// audience. For a PRIVATE chat the members are the string entries of members
// plus owner; for any other type they are the current zone members.
func (zch *ZoneChatsHandler[T]) AddNewChat(chatName string, owner string, chatType string, members []interface{}) (err error) {
	// chatName becomes a directory name, so reject anything that would nest,
	// escape the chats directory, or be ignored on the next load.
	if !validZoneChatName(chatName) {
		return fmt.Errorf("not a valid chat name provided")
	}
	if _, ok := zch.Chats[chatName]; ok {
		return fmt.Errorf("a chat with this name already exist")
	}
	if err = os.MkdirAll(zoneChatPath(zch.ZoneId, chatName), 0700); err != nil {
		return err
	}
	if err = os.Mkdir(zoneChatPath(zch.ZoneId, chatName, "__tracking__"), 0700); err != nil {
		return err
	}

	var chatMembers []string
	if chatType == PRIVATE {
		chatMembers = make([]string, 0, len(members)+1)
		for _, member := range members {
			if mbr, ok := member.(string); ok && mbr != owner {
				chatMembers = append(chatMembers, mbr)
			}
		}
		chatMembers = append(chatMembers, owner)
	} else {
		// Copy so the chat never shares a backing array with ZoneMembersId.
		chatMembers = append([]string{}, zch.ZoneMembersId...)
	}

	baseConfig := &ChatConfig{
		ChatId:   chatName,
		Owner:    owner,
		ChatType: chatType,
		Members:  chatMembers,
	}
	if err = storeZoneChatConfig(zch.ZoneId, chatName, baseConfig); err != nil {
		return err
	}

	zoneChatDBHandler, err := NewZoneChatDBHandler(zch.ZoneId, chatName)
	if err != nil {
		return err
	}
	zoneChatTrackingDB, err := NewZoneChatTracking(zch.ZoneId, chatName)
	if err != nil {
		return err
	}
	if err = zoneChatTrackingDB.Initialize(1, baseConfig.Members...); err != nil {
		return err
	}

	c := &Chat{
		ChatId:   baseConfig.ChatId,
		ChatType: baseConfig.ChatType,
		Owner:    baseConfig.Owner,
		Members:  baseConfig.Members,
		DB:       zoneChatDBHandler,
		Tracking: zoneChatTrackingDB,
	}
	_ = atomicallyExecute(zch.ChatFlag, func() (err error) {
		zch.Chats[c.ChatId] = c
		return
	})

	newChatForMembers := func(recipients []string) error {
		for _, member := range recipients {
			zch.sendZoneRequest(ADD_KNOWN_CHAT, "node", map[string]interface{}{
				"userId": member,
				"chatId": chatName,
			})
			done, e := zch.sendDataChannelMessage(GET_CHATS_RESPONSE, "node", member, map[string]interface{}{
				"chats": []*Chat{c},
			})
			// Delivery is best effort: offline members are simply skipped.
			select {
			case <-done:
			case <-e:
			}
		}
		bs, jsonErr := json.Marshal(map[string]any{
			"zoneId": zch.ZoneId,
			"chatId": chatName,
		})
		if jsonErr != nil {
			return jsonErr
		}
		zch.sendZoneRequest(CREATE_NOTIFICATION, "node", map[string]interface{}{
			"type":       "added_in_chat",
			"title":      "Added in zone text channel 💬",
			"body":       fmt.Sprintf("Added in channel %s in zone %s", c.ChatId, zch.ZoneName),
			"isPushed":   true,
			"payload":    string(bs),
			"recipients": recipients,
		})
		return nil
	}

	switch c.ChatType {
	case BROADCAST, PUBLIC:
		err = atomicallyExecute(zch.ChatFlag, func() error {
			return newChatForMembers(zch.ZoneMembersId)
		})
	case PRIVATE:
		err = atomicallyExecute(zch.ChatFlag, func() error {
			return newChatForMembers(c.Members)
		})
	}
	return
}

// DeleteChat removes a chat from memory, tells its audience they lost access
// to it, and deletes its directory.
// NOTE(review): the chat's DB and Tracking handles are not closed here; confirm
// whether they need to be before their files are removed.
func (zch *ZoneChatsHandler[T]) DeleteChat(chatId string) (err error) {
	if err = atomicallyExecute(zch.ChatFlag, func() (err error) {
		chat, ok := zch.Chats[chatId]
		if !ok {
			return fmt.Errorf("no corresponding chat")
		}
		defer delete(zch.Chats, chatId)

		removeKnownChats := func(members []string) error {
			for _, member := range members {
				zch.sendZoneRequest(REMOVE_KNOWN_CHAT, "node", map[string]interface{}{
					"userId": member,
					"chatId": chatId,
				})
				done, e := zch.sendDataChannelMessage(REMOVED_FROM_CHAT, "node", member, map[string]interface{}{
					"chatId": chatId,
					"userId": member,
				})
				select {
				case <-done:
				case <-e:
				}
			}
			bs, jsonErr := json.Marshal(map[string]any{
				"zoneId": zch.ZoneId,
				"chatId": chatId,
			})
			if jsonErr != nil {
				return jsonErr
			}
			zch.sendZoneRequest(CREATE_NOTIFICATION, "node", map[string]any{
				"type":       "removed_from_chat",
				"title":      "Removed from zone text channel ⛔️",
				"body":       fmt.Sprintf("You have no longer access to the channel %s in zone %s", chatId, zch.ZoneName),
				"isPushed":   true,
				"payload":    string(bs),
				"recipients": members,
			})
			return nil
		}

		switch chat.ChatType {
		case BROADCAST, PUBLIC:
			err = removeKnownChats(zch.ZoneMembersId)
		case PRIVATE:
			err = removeKnownChats(chat.Members)
		}
		return
	}); err != nil {
		return
	}
	return os.RemoveAll(zoneChatPath(zch.ZoneId, chatId))
}

// EditChatName renames a chat: its directory is moved, its configuration is
// rewritten with the new id, and its audience receives CHAT_NAME_EDITED.
func (zch *ZoneChatsHandler[T]) EditChatName(chatId string, newChatId string) (err error) {
	current, ok := zch.Chats[chatId]
	if !ok {
		return fmt.Errorf("no coresponding chat")
	}
	if !validZoneChatName(newChatId) {
		return fmt.Errorf("not a valid chat name provided")
	}
	// Also covers newChatId == chatId, which previously removed the chat from
	// the map right after re-inserting it.
	if _, taken := zch.Chats[newChatId]; taken {
		return fmt.Errorf("a chat with this name already exist")
	}

	var chatConfig *ChatConfig
	if chatConfig, err = loadZoneChatConfig(zch.ZoneId, chatId); err != nil {
		return
	}
	chatConfig.ChatId = newChatId
	// Encode before touching the filesystem so a failure leaves it untouched.
	var bs []byte
	if bs, err = json.Marshal(chatConfig); err != nil {
		return
	}
	if err = os.Rename(zoneChatPath(zch.ZoneId, chatId), zoneChatPath(zch.ZoneId, newChatId)); err != nil {
		return
	}
	if err = os.WriteFile(zoneChatPath(zch.ZoneId, newChatId, "chatConfig.json"), bs, 0644); err != nil {
		return
	}

	chat := &Chat{
		Owner:    chatConfig.Owner,
		ChatId:   chatConfig.ChatId,
		ChatType: chatConfig.ChatType,
		Members:  chatConfig.Members,
		DB:       current.DB,
		Tracking: current.Tracking,
	}
	chat.DB.updateDbCallbackFolder(newChatId)
	chat.Tracking.updateDBCallbackFolder(newChatId)

	_ = atomicallyExecute(zch.ChatFlag, func() (err error) {
		zch.Chats[newChatId] = chat
		delete(zch.Chats, chatId)

		updateKnownChats := func(members []string) {
			for _, member := range members {
				zch.sendZoneRequest(ADD_KNOWN_CHAT, "node", map[string]interface{}{
					"userId": member,
					"chatId": newChatId,
				})
				zch.sendZoneRequest(REMOVE_KNOWN_CHAT, "node", map[string]interface{}{
					"userId": member,
					"chatId": chatId,
				})
				done, e := zch.sendDataChannelMessage(CHAT_NAME_EDITED, "node", member, map[string]interface{}{
					"formerChatId": chatId,
					"newChatId":    newChatId,
				})
				select {
				case <-done:
				case channelErr := <-e:
					logger.Println(channelErr)
				}
			}
		}

		switch chat.ChatType {
		case BROADCAST, PUBLIC:
			updateKnownChats(zch.ZoneMembersId)
		case PRIVATE:
			updateKnownChats(chat.Members)
		}
		return
	})
	return
}

// EditChatType changes the type of a chat. Switching to PUBLIC replaces the
// member list with the zone members; switching to PUBLIC or BROADCAST also
// adds every zone member to the chat and sends each of them CHAT_TYPE_EDITED.
func (zch *ZoneChatsHandler[T]) EditChatType(chatId string, chatType string) (err error) {
	current, ok := zch.Chats[chatId]
	if !ok {
		return fmt.Errorf("no coresponding chat")
	}
	var chatConfig *ChatConfig
	if chatConfig, err = loadZoneChatConfig(zch.ZoneId, chatId); err != nil {
		return
	}
	chatConfig.ChatType = chatType
	if chatType == PUBLIC {
		// Copy: ZoneMembersId is later edited in place when a member leaves
		// the zone, which would corrupt an aliased member list.
		chatConfig.Members = append([]string{}, zch.ZoneMembersId...)
	}
	if err = storeZoneChatConfig(zch.ZoneId, chatId, chatConfig); err != nil {
		return
	}

	chat := &Chat{
		Owner:    chatConfig.Owner,
		ChatId:   chatConfig.ChatId,
		ChatType: chatType,
		Members:  chatConfig.Members,
		DB:       current.DB,
		Tracking: current.Tracking,
	}
	// Publish the new type first: AddChatMembers below refreshes the map entry
	// from disk, and that refreshed entry must not be overwritten afterwards.
	_ = atomicallyExecute(zch.ChatFlag, func() (err error) {
		zch.Chats[chatId] = chat
		return
	})

	switch chatType {
	case BROADCAST, PUBLIC:
		for _, member := range zch.ZoneMembersId {
			if pubErr := zch.AddChatMembers(chatId, []any{member}); pubErr != nil {
				logger.Println(pubErr)
			}
			// Fire and forget; the result channels are buffered so the sender
			// goroutine does not leak.
			zch.sendDataChannelMessage(CHAT_TYPE_EDITED, "node", member, map[string]interface{}{
				"chatId":   chatId,
				"chatType": chatType,
			})
		}
	case PRIVATE:
	}
	return
}

// chatRecipients returns who is notified about events of chat: its members for
// a PRIVATE chat, every zone member otherwise.
func (zch *ZoneChatsHandler[T]) chatRecipients(chat *Chat) []string {
	if chat.ChatType == PRIVATE {
		return chat.Members
	}
	return zch.ZoneMembersId
}

// broadcastToChat sends the same event to every recipient of chat, waiting for
// each send to finish and logging the failures.
func (zch *ZoneChatsHandler[T]) broadcastToChat(chat *Chat, reqType string, payload map[string]interface{}) {
	for _, member := range zch.chatRecipients(chat) {
		d, e := zch.sendDataChannelMessage(reqType, "node", member, payload)
		select {
		case <-d:
		case sendErr := <-e:
			logger.Println(sendErr)
		}
	}
}

// AddChatMembers adds the given user ids to a chat, persists the new member
// list and notifies the chat's recipients. ADD_KNOWN_CHAT is emitted for every
// requested id, including ids that were already members.
func (zch *ZoneChatsHandler[T]) AddChatMembers(chatId string, chatMembers []any) (err error) {
	current, ok := zch.Chats[chatId]
	if !ok {
		return fmt.Errorf("no coresponding chat")
	}
	chatMembersStr, err := ToStringSlice(chatMembers)
	if err != nil {
		return
	}
	var chatConfig *ChatConfig
	if chatConfig, err = loadZoneChatConfig(zch.ZoneId, chatId); err != nil {
		return
	}
	logger.Printf("%s - %s - %s -%v\n", chatConfig.ChatId, chatConfig.ChatType, chatConfig.Owner, chatMembers)

	addedMembers := make([]string, 0, len(chatMembersStr))
	for _, chatMember := range chatMembersStr {
		if !zoneChatHasMember(chatConfig.Members, chatMember) {
			chatConfig.Members = append(chatConfig.Members, chatMember)
			addedMembers = append(addedMembers, chatMember)
		}
		zch.sendZoneRequest(ADD_KNOWN_CHAT, "node", map[string]interface{}{
			"userId": chatMember,
			"chatId": chatId,
		})
	}
	if err = storeZoneChatConfig(zch.ZoneId, chatId, chatConfig); err != nil {
		return
	}

	chat := &Chat{
		Owner:    chatConfig.Owner,
		ChatId:   chatConfig.ChatId,
		ChatType: chatConfig.ChatType,
		Members:  chatConfig.Members,
		DB:       current.DB,
		Tracking: current.Tracking,
	}
	_ = atomicallyExecute(zch.ChatFlag, func() (err error) {
		lastIndex := chat.DB.PreviousId
		zch.Chats[chatId] = chat

	broadcastLoop:
		for _, member := range zch.chatRecipients(chat) {
			for _, m := range addedMembers {
				if member == m {
					// Only a newly added member starts at the latest message;
					// existing members keep their own read position.
					if err = chat.Tracking.SetUserLastIndex(member, lastIndex); err != nil {
						return
					}
					if m != chat.Owner {
						payload, jsonErr := json.Marshal(map[string]any{
							"zoneId":   zch.ZoneId,
							"chatId":   chatId,
							"zoneHost": NodeID,
						})
						if jsonErr != nil {
							logger.Println(jsonErr)
						} else {
							// sendZoneRequest is already asynchronous.
							zch.sendZoneRequest(CREATE_NOTIFICATION, "node", map[string]interface{}{
								"type":       "added_in_chat",
								"title":      "Added in zone text channel 💬",
								"body":       fmt.Sprintf("Added in channel %s in zone %s", chatId, zch.ZoneName),
								"isPushed":   true,
								"payload":    string(payload),
								"recipients": []string{m},
							})
						}
					}
					done, e := zch.sendDataChannelMessage(ADDED_IN_CHAT, "node", member, map[string]interface{}{
						"chat": chat,
					})
					select {
					case <-done:
					case sendErr := <-e:
						logger.Println(sendErr)
					}
					continue broadcastLoop
				}
				if _, online := zch.DataChannels[member]; online {
					done, e := zch.sendDataChannelMessage(CHAT_MEMBER_ADDED, "node", member, map[string]interface{}{
						"userId": m,
						"chatId": chat.ChatId,
					})
					select {
					case <-done:
					case sendErr := <-e:
						logger.Println(sendErr)
					}
				}
			}
		}
		return
	})
	return
}

// RemoveChatMember removes a user from a chat, persists the change, deletes the
// user's tracking entry and notifies both the user and the remaining
// recipients. The owner cannot be removed. The returned error is the last
// notification failure, if any, even when the removal itself succeeded.
func (zch *ZoneChatsHandler[T]) RemoveChatMember(chatId string, chatMember string) (err error) {
	if _, ok := zch.Chats[chatId]; !ok {
		return fmt.Errorf("no coresponding chat")
	}
	var chatConfig *ChatConfig
	if chatConfig, err = loadZoneChatConfig(zch.ZoneId, chatId); err != nil {
		return
	}
	if chatMember == chatConfig.Owner {
		return fmt.Errorf("you cannot remove the owner from the chat")
	}
	index := -1
	for i, member := range chatConfig.Members {
		if member == chatMember {
			index = i
			break
		}
	}
	if index < 0 {
		return fmt.Errorf("member %s not in the channel %s", chatMember, chatId)
	}
	chatConfig.Members = append(chatConfig.Members[:index], chatConfig.Members[index+1:]...)
	if err = storeZoneChatConfig(zch.ZoneId, chatId, chatConfig); err != nil {
		return
	}

	var chat *Chat
	if err = atomicallyExecute(zch.ChatFlag, func() error {
		existing, ok := zch.Chats[chatId]
		if !ok {
			// The chat may have been deleted since the check above.
			return fmt.Errorf("no coresponding chat")
		}
		chat = &Chat{
			Owner:    chatConfig.Owner,
			ChatId:   chatConfig.ChatId,
			ChatType: chatConfig.ChatType,
			Members:  chatConfig.Members,
			DB:       existing.DB,
			Tracking: existing.Tracking,
		}
		zch.Chats[chatId] = chat
		return nil
	}); err != nil {
		return
	}
	if err = chat.Tracking.DeleteUserTracking(chatMember); err != nil {
		return
	}

	zch.sendZoneRequest(REMOVE_KNOWN_CHAT, "node", map[string]interface{}{
		"userId": chatMember,
		"chatId": chatId,
	})
	// chatMember is never the owner at this point (rejected above).
	var bs []byte
	if bs, err = json.Marshal(map[string]any{
		"zoneId":   zch.ZoneId,
		"chatId":   chatId,
		"zoneHost": NodeID,
	}); err != nil {
		return
	}
	zch.sendZoneRequest(CREATE_NOTIFICATION, "node", map[string]interface{}{
		"type":       "removed_from_chat",
		"title":      "Removed from zone text channel ⛔️",
		"body":       fmt.Sprintf("You have no longer access to the channel %s in zone %s", chatId, zch.ZoneName),
		"isPushed":   true,
		"payload":    string(bs),
		"recipients": []string{chatMember},
	})

	done, e := zch.sendDataChannelMessage(REMOVED_FROM_CHAT, "node", chatMember, map[string]interface{}{
		"chatId": chatId,
		"userId": chatMember,
	})
	select {
	case <-done:
	case err = <-e:
		logger.Println(err)
	}

	for _, member := range zch.chatRecipients(chat) {
		if member == chatMember {
			continue
		}
		done, e := zch.sendDataChannelMessage(CHAT_MEMBER_REMOVED, "node", member, map[string]interface{}{
			"userId": chatMember,
			"chatId": chat.ChatId,
		})
		select {
		case <-done:
		case err = <-e:
			logger.Println(err)
		}
	}
	return
}

// ReadLastMessage marks the latest message of a chat as read by userId. An
// unknown chat id is silently ignored.
func (zch *ZoneChatsHandler[T]) ReadLastMessage(userId, chatId string) (err error) {
	return atomicallyExecute(zch.ChatFlag, func() error {
		chat, ok := zch.Chats[chatId]
		if !ok {
			return nil
		}
		return chat.Tracking.SetUserLastIndex(userId, chat.DB.PreviousId)
	})
}

// ListLatestChatMessages sends userId a CHAT_MESSAGE_LIST page of messages and
// marks the chat as read for that user. An unknown chat id yields an empty
// page.
func (zch *ZoneChatsHandler[T]) ListLatestChatMessages(userId, chatId string, lastIndex, limit float64) (err error) {
	var (
		list     []*ChatMessage
		next     int
		finished bool
	)
	if err = atomicallyExecute(zch.ChatFlag, func() (err error) {
		chat, ok := zch.Chats[chatId]
		if !ok {
			return
		}
		if list, next, finished, err = chat.DB.ListChatMessages(int(lastIndex), int(limit)); err != nil {
			return
		}
		return chat.Tracking.SetUserLastIndex(userId, chat.DB.PreviousId)
	}); err != nil {
		return
	}
	success, e := zch.sendDataChannelMessage(CHAT_MESSAGE_LIST, "node", userId, map[string]interface{}{
		"done":         finished || next <= 0,
		"lastIndex":    next - 1,
		"chatId":       chatId,
		"chatMessages": list,
	})
	select {
	case <-success:
		fmt.Println("done getting latest messages")
	case err = <-e:
	}
	return
}

// ListLatestChatFiles sends userId a CHAT_FILES_LIST page of the chat's files.
// An unknown chat id yields an empty page.
func (zch *ZoneChatsHandler[T]) ListLatestChatFiles(userId, chatId string, lastIndex, limit float64) (err error) {
	var (
		list []*ChatFile
		next int
	)
	if err = atomicallyExecute(zch.ChatFlag, func() (err error) {
		if chat, ok := zch.Chats[chatId]; ok {
			list, next, err = chat.DB.ListChatFiles(int(lastIndex), int(limit))
		}
		return
	}); err != nil {
		return
	}
	done, e := zch.sendDataChannelMessage(CHAT_FILES_LIST, "node", userId, map[string]interface{}{
		"done":         next <= 0,
		"lastIndex":    next,
		"chatId":       chatId,
		"chatMessages": list,
	})
	select {
	case <-done:
	case err = <-e:
	}
	return
}

// AddChatMessage stores a new message and sends NEW_CHAT_MESSAGE to the chat's
// audience. A recipient that cannot be reached gets an unread-messages
// notification when this message is their only unread one.
func (zch *ZoneChatsHandler[T]) AddChatMessage(userId, chatId, content string, isResponse bool, chatResponseId uint64, file *ChatFile) (err error) {
	chatMessage := &ChatMessage{
		Content:    content,
		From:       userId,
		IsResponse: isResponse,
		ResponseOf: nil,
		File:       file,
		Tags:       make([]string, 0),
		Date:       time.Now().Format("Mon, 02 Jan 2006 15:04:05 MST"),
	}
	var chatType string
	var chatMembers []string
	if err = atomicallyExecute(zch.ChatFlag, func() (err error) {
		chat, ok := zch.Chats[chatId]
		if !ok {
			return fmt.Errorf("no corresponding chats")
		}
		chatType = chat.ChatType
		chatMembers = chat.Members
		if isResponse {
			parentMessage, getErr := chat.DB.GetChatMessage(chatResponseId)
			switch {
			case getErr == nil:
				chatMessage.ResponseOf = parentMessage
			case getErr != badger.ErrKeyNotFound:
				// A missing parent is tolerated; any other lookup failure
				// aborts. The original tested the wrong error variable here.
				return getErr
			}
		}
		if err = chat.DB.AddNewChatMessage(chatMessage); err != nil {
			return
		}
		chatMessage.ID = chat.DB.PreviousId
		return
	}); err != nil {
		return
	}

	notifyActivity := func(done <-chan struct{}, e <-chan error, member string) {
		select {
		case <-done:
		case <-e:
			_ = atomicallyExecute(zch.ChatFlag, func() error {
				chat, ok := zch.Chats[chatId]
				if !ok {
					return nil
				}
				li, liErr := chat.Tracking.GetUserLastIndex(member)
				if liErr != nil {
					return liErr
				}
				count, countErr := chat.DB.calculateNewChatCount(uint64(li))
				if countErr != nil {
					return countErr
				}
				if count != 1 {
					return nil
				}
				bs, jsonErr := json.Marshal(map[string]any{
					"zoneId":   zch.ZoneId,
					"chatId":   chatId,
					"zoneHost": NodeID,
				})
				if jsonErr != nil {
					return jsonErr
				}
				zch.sendZoneRequest(CREATE_NOTIFICATION, "node", map[string]interface{}{
					"type":       "new_chat_activity",
					"title":      "Unread messages 👀",
					"body":       fmt.Sprintf("New messages in channel %s of %s", chatId, zch.ZoneName),
					"isPushed":   true,
					"payload":    string(bs),
					"recipients": []string{member},
				})
				return nil
			})
		}
	}

	var recipients []string
	switch chatType {
	case BROADCAST, PUBLIC:
		recipients = zch.ZoneMembersId
	case PRIVATE:
		recipients = chatMembers
	}
	for _, member := range recipients {
		done, e := zch.sendDataChannelMessage(NEW_CHAT_MESSAGE, "node", member, map[string]interface{}{
			"chatMessage": chatMessage,
			"chatId":      chatId,
		})
		go notifyActivity(done, e, member)
	}
	return
}

// SetAllPublicChatForUser adds userId to every PUBLIC and BROADCAST chat.
// Failures are logged; the returned error is the result of the last attempt.
func (zch *ZoneChatsHandler[T]) SetAllPublicChatForUser(userId string) (err error) {
	for _, chat := range zch.Chats {
		if chat.ChatType != PUBLIC && chat.ChatType != BROADCAST {
			continue
		}
		if err = zch.AddChatMembers(chat.ChatId, []interface{}{userId}); err != nil {
			logger.Println(err)
		}
	}
	return
}

// RemoveUserFromAllChats removes userId from every chat. Failures are logged;
// the returned error is the result of the last attempt.
func (zch *ZoneChatsHandler[T]) RemoveUserFromAllChats(userId string) (err error) {
	chats, err := zch.ListChats()
	if err != nil {
		logger.Println("an error occured -----------------*")
		return
	}
	for _, chat := range chats {
		if err = zch.RemoveChatMember(chat.ChatId, userId); err != nil {
			logger.Println("-------------------*", err)
		}
	}
	return
}

// RemoveAllUserChat deletes every chat owned by userId. Deletion failures are
// logged and not returned.
func (zch *ZoneChatsHandler[T]) RemoveAllUserChat(userId string) (err error) {
	chats, err := zch.ListChats()
	if err != nil {
		return
	}
	for _, chat := range chats {
		logger.Println("$$$$$$$$$$ chat owner :", chat.Owner, userId)
		if chat.Owner != userId {
			continue
		}
		if derr := zch.DeleteChat(chat.ChatId); derr != nil {
			logger.Println("**************", derr)
		}
		logger.Println("deleted chat", chat.ChatId)
	}
	return
}

// func (zch *ZoneChatsHandler[T]) ConnectToChatFSInstance(channelId string, userId string, sdp string) (err error) {
// 	err = atomicallyExecute(zch.ChatFSInstanceFlag, func() (err error) {
// 		d, e := zch.ChatFSInstance.HandleOffer(context.Background(), channelId, userId, sdp, zch.HostId, zch.sendDataChannelMessage, zch.signalCandidate)
// 		select {
// 		case <-d:
// 		case <-e:
// 		}
// 		return
// 	})
// 	return
// }

// func (zch *ZoneChatsHandler[T]) LeaveChatFSInstance(
// 	userId string) (err error) {
// 	zch.ChatFSInstance.HandleLeavingMember(userId)
// 	return
// }

// DeleteChatMessage deletes a message and sends CHAT_MESSAGE_DELETED to the
// chat's recipients. When the deleted message was the latest one, the chat's
// previous id and the tracking indexes are reverted.
func (zch *ZoneChatsHandler[T]) DeleteChatMessage(key uint64, chatId string) (err error) {
	return atomicallyExecute(zch.ChatFlag, func() (err error) {
		chat, ok := zch.Chats[chatId]
		if !ok {
			return fmt.Errorf("no chat corresponding to id %s", chatId)
		}
		if err = chat.DB.DeleteChatMessage(key); err != nil {
			return
		}
		zch.broadcastToChat(chat, CHAT_MESSAGE_DELETED, map[string]interface{}{
			"chatId":    chatId,
			"messageId": key,
		})
		if chat.DB.PreviousId != key {
			return
		}
		if err = chat.DB.revertPreviousId(); err != nil {
			return
		}
		previousId := chat.DB.PreviousId
		if previousId == 1 {
			previousId = 0
		}
		return chat.Tracking.RevertTrackingLastIndex(previousId)
	})
}

// UpdateChatMessage replaces the content of a message and sends
// CHAT_MESSAGE_EDITED to the chat's recipients.
func (zch *ZoneChatsHandler[T]) UpdateChatMessage(key uint64, chatId, newContent string) (err error) {
	return atomicallyExecute(zch.ChatFlag, func() (err error) {
		chat, ok := zch.Chats[chatId]
		if !ok {
			return fmt.Errorf("no chat corresponding to id %s", chatId)
		}
		if err = chat.DB.ModifyChatMessage(key, newContent); err != nil {
			return
		}
		zch.broadcastToChat(chat, CHAT_MESSAGE_EDITED, map[string]interface{}{
			"chatId":     chatId,
			"messageId":  key,
			"newContent": newContent,
		})
		return
	})
}

// DeleteChatFile deletes a file of a chat and sends CHAT_FILE_DELETED to the
// chat's recipients.
func (zch *ZoneChatsHandler[T]) DeleteChatFile(key uint64, fileName, chatId string) (err error) {
	return atomicallyExecute(zch.ChatFlag, func() (err error) {
		chat, ok := zch.Chats[chatId]
		if !ok {
			return fmt.Errorf("no chat corresponding to id %s", chatId)
		}
		if err = chat.DB.DeleteChatFile(fileName, key); err != nil {
			return
		}
		zch.broadcastToChat(chat, CHAT_FILE_DELETED, map[string]interface{}{
			"chatId":   chatId,
			"fileId":   key,
			"fileName": fileName,
		})
		return
	})
}

// chatFileUpload wires the callbacks of a data channel dedicated to uploading
// one file. The peer sends "init_upload", then binary chunks, then
// "upload_done"; an error or a close before completion reports the upload as
// failed to the chat file-system instance.
func (zch *ZoneChatsHandler[T]) chatFileUpload(chatId string, filename string, userId string, dc *DataChannel) {
	var writePipe chan<- []byte
	var done bool
	uploadDone := func() {
		if writePipe != nil {
			close(writePipe)
			writePipe = nil
		}
		done = true
	}
	reportFailure := func() {
		if fufErr := zch.ChatFSInstance.FileUploadFailed(chatId, filename, userId); fufErr != nil {
			logger.Println(fufErr)
		}
	}

	dc.DataChannel.OnError(func(err error) {
		logger.Println(err)
		logger.Println("abort...")
		if !done {
			uploadDone()
		}
		reportFailure()
	})
	dc.DataChannel.OnClose(func() {
		if done {
			logger.Println("closing gracefully...")
			return
		}
		logger.Println("abort...")
		uploadDone()
		reportFailure()
	})
	dc.DataChannel.OnMessage(func(msg webrtc.DataChannelMessage) {
		if !msg.IsString {
			// Sending on a nil channel blocks forever, so drop chunks that
			// arrive before "init_upload" or after "upload_done".
			if writePipe == nil {
				logger.Println("dropping file chunk received outside of an upload")
				return
			}
			writePipe <- msg.Data
			return
		}
		switch string(msg.Data) {
		case "init_upload":
			logger.Println("init upload....")
			var initErr error
			if writePipe, initErr = zch.ChatFSInstance.SetupFileUpload(chatId, filename, userId, dc.DataChannel); initErr != nil {
				_ = dc.DataChannel.SendText("abort")
				_ = dc.DataChannel.Close()
				return
			}
			logger.Println("upload ready !")
			_ = dc.DataChannel.SendText("upload_ready")
		case "upload_done":
			uploadDone()
		}
	})
	dc.DataChannel.OnOpen(func() {
		_ = dc.DataChannel.SendText("channel_ready")
	})
}

// chatFileDownload wires the callbacks of a data channel dedicated to
// downloading one file. The peer sends "init_download" to start and
// "download_done" when finished; an error or a close before that reports the
// download as failed to the chat file-system instance.
func (zch *ZoneChatsHandler[T]) chatFileDownload(chatId string, filename string, userId string, dc *DataChannel) {
	var done bool
	abortIfUnfinished := func() {
		if done {
			return
		}
		logger.Println("abort...")
		if fdf := zch.ChatFSInstance.FileDownloadFailed(chatId, filename, userId); fdf != nil {
			logger.Println(fdf)
		}
	}

	dc.DataChannel.OnError(func(err error) {
		abortIfUnfinished()
	})
	dc.DataChannel.OnClose(func() {
		abortIfUnfinished()
	})
	dc.DataChannel.OnMessage(func(msg webrtc.DataChannelMessage) {
		if !msg.IsString {
			return
		}
		switch string(msg.Data) {
		case "init_download":
			logger.Println("init download....")
			if initErr := zch.ChatFSInstance.SetupFileDownload(chatId, filename, userId, dc.DataChannel); initErr != nil {
				_ = dc.DataChannel.SendText("abort")
				_ = dc.DataChannel.Close()
				return
			}
			logger.Println("download started !")
		case "download_done":
			done = true
		}
	})
	dc.DataChannel.OnOpen(func() {
		_ = dc.DataChannel.SendText("channel_ready")
	})
}

// handleDataChannel claims data channels whose label targets the chat
// subsystem and reports whether it took ownership of dc. Labels are
// "|"-separated: "chat_upload|chatId|filename|userId",
// "chat_download|chatId|filename|userId" and "chat_data|userId". A label with
// too few fields is logged and left unclaimed.
func (zch *ZoneChatsHandler[T]) handleDataChannel(ctx context.Context, dc *DataChannel) (catched bool) {
	var label string
	_ = atomicallyExecute(dc.l, func() (err error) {
		label = dc.DataChannel.Label()
		return
	})
	command := strings.Split(label, "|")
	switch {
	case strings.Contains(label, "chat_upload"):
		if len(command) == 4 {
			catched = true
			go zch.chatFileUpload(command[1], command[2], command[3], dc)
		}
		logger.Println(command)
	case strings.Contains(label, "chat_download"):
		// The label comes from the remote peer: check its shape before
		// indexing instead of panicking on a malformed one.
		if len(command) >= 4 {
			catched = true
			go zch.chatFileDownload(command[1], command[2], command[3], dc)
		}
		logger.Println(command)
	case strings.Contains(label, "chat_data"):
		if len(command) < 2 {
			logger.Println(command)
			return
		}
		catched = true
		_ = atomicallyExecute(zch.ChatDataChannelsFlag, func() (err error) {
			zch.ChatDataChannels[command[1]] = dc
			return
		})
		dc.DataChannel.OnClose(func() {
			fmt.Println("closing gratefully chat dc...")
			_ = atomicallyExecute(zch.ChatDataChannelsFlag, func() (err error) {
				delete(zch.ChatDataChannels, command[1])
				return
			})
		})
		dc.DataChannel.OnError(func(err error) {
			fmt.Println("error in chat dc...")
			_ = atomicallyExecute(zch.ChatDataChannelsFlag, func() (err error) {
				delete(zch.ChatDataChannels, command[1])
				return
			})
		})
	}
	return
}

func (zch *ZoneChatsHandler[T]) handleZoneRequest(ctx context.Context, req *ZoneRequest) (err error) {
	logger.Println("got request in zone chat handler", req)
	switch req.ReqType {
	case LEAVE_ZONE:
		if err = verifyFieldsString(req.Payload, "userId"); err != nil {
			return
		}
		// err = zch.LeaveChatFSInstance(req.Payload["userId"].(string))
	case string(REMOVED_ZONE_AUTHORIZED_MEMBER):
		if err = verifyFieldsString(req.Payload, "userId"); err != nil {
			return
		}
		var index int
		var found bool
		for i, m := range zch.ZoneMembersId {
			if m == req.Payload["userId"].(string) {
				index = i
				found = true
				break
			}
		}
		if !found {
			err = fmt.Errorf("no such user in zone")
			return
		}
		zch.ZoneMembersId = append(zch.ZoneMembersId[:index], zch.ZoneMembersId[index+1:]...)
if err = zch.RemoveAllUserChat(req.Payload["userId"].(string)); err != nil { logger.Println("****______________", err) } err = zch.RemoveUserFromAllChats(req.Payload["userId"].(string)) case string(NEW_AUTHORIZED_ZONE_MEMBER): if err = verifyFieldsString(req.Payload, "userId"); err != nil { return } var contain bool for _, m := range zch.ZoneMembersId { if m == req.Payload["userId"].(string) { contain = true break } } if !contain { zch.ZoneMembersId = append(zch.ZoneMembersId, req.Payload["userId"].(string)) } err = zch.SetAllPublicChatForUser(req.Payload["userId"].(string)) case ADD_CHAT_MEMBERS: if err = verifyFieldsString(req.Payload, "chatId"); err != nil { return } if err = verifyFieldsSliceInterface(req.Payload, "members"); err != nil { return } err = zch.AddChatMembers(req.Payload["chatId"].(string), req.Payload["members"].([]interface{})) case REMOVE_CHAT_MEMBER: if err = verifyFieldsString(req.Payload, "chatId", "userId"); err != nil { return } err = zch.RemoveChatMember(req.Payload["chatId"].(string), req.Payload["userId"].(string)) if err != nil { logger.Println("an error occured in add known chat", err) return } case EDIT_CHAT_NAME: if err = verifyFieldsString(req.Payload, "chatId", "newChatId"); err != nil { return } if err = zch.EditChatName(req.Payload["chatId"].(string), req.Payload["newChatId"].(string)); err != nil { return } case EDIT_CHAT_TYPE: if err = verifyFieldsString(req.Payload, "chatId", "chatType"); err != nil { return } if err = zch.EditChatType(req.Payload["chatId"].(string), req.Payload["chatType"].(string)); err != nil { return } case DELETE_CHAT: if err = verifyFieldsString(req.Payload, "chatId"); err != nil { return } err = zch.DeleteChat(req.Payload["chatId"].(string)) case CREATE_CHAT: if err = verifyFieldsString(req.Payload, "chatId", "owner", "chatType"); err != nil { return } if err = verifyFieldsSliceInterface(req.Payload, "members"); err != nil { return } err = zch.AddNewChat(req.Payload["chatId"].(string), 
req.Payload["owner"].(string), req.Payload["chatType"].(string), req.Payload["members"].([]interface{})) case GET_CHATS: if err = verifyFieldsSliceInterface(req.Payload, "chatsId"); err != nil { return } err = zch.GetChats(req.From, req.Payload["chatsId"].([]interface{})...) case LIST_LATEST_CHATS: if err = verifyFieldsString(req.Payload, "chatId"); err != nil { return } if err = verifyFieldsFloat64(req.Payload, "lastIndex", "limit"); err != nil { return } err = zch.ListLatestChatMessages(req.From, req.Payload["chatId"].(string), req.Payload["lastIndex"].(float64), req.Payload["limit"].(float64)) return case LIST_LATEST_FILES: if err = verifyFieldsString(req.Payload, "chatId"); err != nil { return } if err = verifyFieldsFloat64(req.Payload, "lastIndex", "limit"); err != nil { return } err = zch.ListLatestChatFiles(req.From, req.Payload["chatId"].(string), req.Payload["lastIndex"].(float64), req.Payload["limit"].(float64)) return case READ_LATEST_MESSAGE: if err = verifyFieldsString(req.Payload, "chatId"); err != nil { return } err = zch.ReadLastMessage(req.From, req.Payload["chatId"].(string)) case ADD_CHAT_MESSAGE: logger.Println("got request in zone chat handler", req) if err = verifyFieldsString(req.Payload, "chatId", "content"); err != nil { return } if err = verifyFieldsBool(req.Payload, "isResponse"); err != nil { return } var parentChatId uint64 if req.Payload["isResponse"].(bool) { if err = verifyFieldsFloat64(req.Payload, "parentChatId"); err != nil { return } parentChatId = uint64(req.Payload["parentChatId"].(float64)) } var file *ChatFile = nil if _, ok := req.Payload["file"]; ok { bs, jsonErr := json.Marshal(req.Payload["file"]) if jsonErr != nil { return jsonErr } var f ChatFile if err = json.Unmarshal(bs, &f); err != nil { err = fmt.Errorf("the file payload dont match ChatFile struct pattern : %v", err) return } file = &f } err = zch.AddChatMessage(req.From, req.Payload["chatId"].(string), req.Payload["content"].(string), 
req.Payload["isResponse"].(bool), parentChatId, file) case DELETE_CHAT_MESSAGE: if err = verifyFieldsString(req.Payload, "chatId"); err != nil { return } if err = verifyFieldsFloat64(req.Payload, "messageId"); err != nil { return } err = zch.DeleteChatMessage(uint64(req.Payload["messageId"].(float64)), req.Payload["chatId"].(string)) case EDIT_CHAT_MESSAGE: if err = verifyFieldsString(req.Payload, "chatId", "newContent"); err != nil { return } if err = verifyFieldsFloat64(req.Payload, "messageId"); err != nil { return } err = zch.UpdateChatMessage(uint64(req.Payload["messageId"].(float64)), req.Payload["chatId"].(string), req.Payload["newContent"].(string)) case DELETE_CHAT_FILE: if err = verifyFieldsString(req.Payload, "chatId", "fileName"); err != nil { return } if err = verifyFieldsFloat64(req.Payload, "fileId"); err != nil { return } err = zch.DeleteChatFile(uint64(req.Payload["fileId"].(float64)), req.Payload["fileName"].(string), req.Payload["chatId"].(string)) // case ZONE_UPLOAD_CHAT_FILE: // if err = verifyFieldsString(req.Payload, "chatId", "userId", "fileName"); err != nil { // return // } // err = zch.ChatFSInstance.SetupFileUpload(req.Payload["chatId"].(string), req.Payload["fileName"].(string), req.Payload["userId"].(string)) // case ZONE_DOWNLOAD_CHAT_FILE: // if err = verifyFieldsString(req.Payload, "chatId", "userId", "fileName"); err != nil { // return // } // err = zch.ChatFSInstance.SetupFileDownload(req.Payload["chatId"].(string), req.Payload["fileName"].(string), req.Payload["userId"].(string)) // case ZONE_UPLOAD_CHAT_FILE_DONE: // if err = verifyFieldsBool(req.Payload, "failed"); err != nil { // return // } // if err = verifyFieldsString(req.Payload, "path", "userId", "fileName", "type"); err != nil { // return // } // if err = verifyFieldsFloat64(req.Payload, "size"); err != nil { // return // } // if _, ok := req.Payload["chatFile"]; !ok { // err = fmt.Errorf("no field chatFile in request payload") // return // } // if 
req.Payload["failed"].(bool) { // logger.Println(zch.ChatFSInstance.FileUploadFailed(req.Payload["path"].(string), req.Payload["fileName"].(string), req.Payload["userId"].(string))) // } else { // defer func() { // logger.Println(zch.ChatFSInstance.FileUploadDone(req.Payload["path"].(string), req.Payload["fileName"].(string), req.Payload["userId"].(string))) // }() // } // case string(ZONE_CHAT_WEBRTC_OFFER): // if err = verifyFieldsString(req.Payload, "channelId", "userId", "sdp"); err != nil { // return // } // err = zch.ConnectToChatFSInstance(req.Payload["channelId"].(string), req.Payload["userId"].(string), req.Payload["sdp"].(string)) // case string(ZONE_CHAT_WEBRTC_COUNTER_OFFER): // logger.Println("handling fs instance counter offer") // if err = verifyFieldsString(req.Payload, "channelId", "userId"); err != nil { // return // } // err = atomicallyExecute(zch.ChatFSInstanceFlag, func() (err error) { // err = zch.ChatFSInstance.HandleCounterOffer(context.Background(), req.Payload["userId"].(string), zch.sendDataChannelMessage) // return // }) // case string(ZONE_CHAT_WEBRTC_RENNEGOTIATION_OFFER): // if err = verifyFieldsString(req.Payload, "channelId", "userId", "sdp"); err != nil { // return // } // err = atomicallyExecute(zch.ChatFSInstanceFlag, func() (err error) { // err = zch.ChatFSInstance.HandleRennegotiationOffer(req.Payload["userId"].(string), req.Payload["sdp"].(string), zch.sendDataChannelMessage) // return // }) // case string(ZONE_CHAT_WEBRTC_RENNEGOTIATION_ANSWER): // if err = verifyFieldsString(req.Payload, "channelId", "userId", "sdp"); err != nil { // return // } // err = atomicallyExecute(zch.ChatFSInstanceFlag, func() (err error) { // err = zch.ChatFSInstance.HandleRennegotiationAnswer(req.Payload["userId"].(string), req.Payload["sdp"].(string)) // return // }) // case string(ZONE_CHAT_WEBRTC_CANDIDATE): // logger.Println("handling fs instance webrtc candidate") // logger.Println(req.Payload) // if err = verifyFieldsString(req.Payload, 
FROM, "candidate", "sdpMLineIndex", "sdpMid", "channelId", "userId"); err != nil { // return // } // logger.Println(req.Payload) // i, convErr := strconv.Atoi(req.Payload["sdpMLineIndex"].(string)) // if convErr != nil { // return convErr // } // SDPMLineIndex := uint16(i) // sdpMid := req.Payload["sdpMid"].(string) // logger.Println(sdpMid, SDPMLineIndex) // err = atomicallyExecute(zch.ChatFSInstanceFlag, func() (err error) { // err = zch.ChatFSInstance.AddCandidate(&webrtc.ICECandidateInit{ // Candidate: req.Payload["candidate"].(string), // SDPMid: &sdpMid, // SDPMLineIndex: &SDPMLineIndex, // }, req.Payload["userId"].(string)) // return // }) } return }