From 5dcb013c1286c909064644902aff686e3d4091a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFs=20Bibehe?= Date: Sat, 10 Sep 2022 02:56:52 +0200 Subject: [PATCH] add notifications and chat tracking --- config.go | 19 +- webrtcCallManager.go | 8 +- zoneAudioChannelshandler.go | 2 +- zoneChatTracking.go | 80 ++++--- zoneChatsDBHandler.go | 43 ++-- zoneChatsHandler.go | 386 ++++++++++++++++++++-------------- zoneFSInstance.go | 2 +- zoneGrpcMiddleware.go | 4 +- zoneManager.go | 48 +++-- zoneNotificationsDBHandler.go | 41 ++++ zoneNotificationsHandler.go | 173 ++++++++++++++- zoneRequestScheduler.go | 2 +- zoneVideoChannelsHandler.go | 2 +- 13 files changed, 551 insertions(+), 259 deletions(-) create mode 100644 zoneNotificationsDBHandler.go diff --git a/config.go b/config.go index 79cba1c..bac365e 100644 --- a/config.go +++ b/config.go @@ -17,6 +17,7 @@ import ( ) var NodeToken string +var NodeID string type LocalServerConfig struct { configFilePath string @@ -78,11 +79,11 @@ func (l *LocalServerConfig) startup() (err error) { l.PrivateKeyPath = "" l.Token = "" bss, marshallErr := json.Marshal(map[string]interface{}{ - "type": "create_node", - "from": id, - "to": "serv", - "token": "", - "peerType":"node", + "type": "create_node", + "from": id, + "to": "serv", + "token": "", + "peerType": "node", "payload": map[string]string{ "nodeId": id, "nodeKey": string(key), @@ -114,6 +115,7 @@ func (l *LocalServerConfig) startup() (err error) { return } logger.Println(config) + NodeID = l.NodeId l.NodeId = config["nodeId"] l.PrivateKeyPath = config["privKeyPath"] l.Token = config["token"] @@ -136,9 +138,9 @@ func (l *LocalServerConfig) authenticate() (err error) { return } body, err := json.Marshal(map[string]interface{}{ - "type": "login_key_node", - "from": l.NodeId, - "peerType":"node", + "type": "login_key_node", + "from": l.NodeId, + "peerType": "node", "payload": map[string]string{ "id": l.NodeId, "signature": sig, @@ -160,6 +162,7 @@ func (l *LocalServerConfig) authenticate() (err error) { return } err = l.handleLoginResponse(payload) + NodeID = l.NodeId return } diff --git a/webrtcCallManager.go b/webrtcCallManager.go index e43b8e1..a39211f 100644 --- a/webrtcCallManager.go +++ b/webrtcCallManager.go @@ -135,10 +135,10 @@ func loadHostedSquads(token string, hostId string) (squads []*Squad, err error) em := NewEncryptionManager() sig := em.SignRequestHMAC(hostId) body, err := json.Marshal(map[string]interface{}{ - "type": LIST_HOSTED_SQUADS_BY_HOST, - "mac": sig, - "from": hostId, - "peerType":"node", + "type": LIST_HOSTED_SQUADS_BY_HOST, + "mac": sig, + "from": hostId, + "peerType": "node", "payload": map[string]string{ "host": hostId, "lastIndex": "0", diff --git a/zoneAudioChannelshandler.go b/zoneAudioChannelshandler.go index af7e38d..5f77e70 100644 --- a/zoneAudioChannelshandler.go +++ b/zoneAudioChannelshandler.go @@ -284,7 +284,7 @@ func (zach *ZoneAudioChannelsHandler) AddNewAudioChannel(channelName string, own ID: channelName, Owner: owner, ChannelType: channelType, - Members: append(m,owner), + Members: append(m, owner), } bs, jsonErr := json.Marshal(baseConfig) if jsonErr != nil { diff --git a/zoneChatTracking.go b/zoneChatTracking.go index 4ae7838..6c7f073 100644 --- a/zoneChatTracking.go +++ b/zoneChatTracking.go @@ -10,31 +10,32 @@ import ( ) type ZoneChatTrackingDB struct { - db func(cb func(*badger.DB) (err error)) (err error) - lock *sync.RWMutex + ZoneID string + db func(cb func(*badger.DB) (err error)) (err error) + lock *sync.RWMutex } -func NewZoneChatTracking(zoneId,chatId string,members ...string) (*ZoneChatTrackingDB,error) { - if _,dirErr := os.ReadDir(filepath.Join("data", "zones", zoneId, "chats", chatId,"__tracking__")); os.IsNotExist(dirErr) { - _ = os.MkdirAll(filepath.Join("data", "zones", zoneId, "chats", chatId,"__tracking__"),0700) +func NewZoneChatTracking(zoneId, chatId string, members ...string) (*ZoneChatTrackingDB, error) { + if _, dirErr := os.ReadDir(filepath.Join("data", "zones", zoneId, "chats", chatId, "__tracking__")); os.IsNotExist(dirErr) { + _ = os.MkdirAll(filepath.Join("data", "zones", zoneId, "chats", chatId, "__tracking__"), 0700) } db := func(f func(*badger.DB) (err error)) (err error) { - db, err := badger.Open(badger.DefaultOptions(filepath.Join("data", "zones", zoneId, "chats", chatId,"__tracking__")).WithLogger(dbLogger)) - if err != nil { - return - } - defer db.Close() - err = f(db) + db, err := badger.Open(badger.DefaultOptions(filepath.Join("data", "zones", zoneId, "chats", chatId, "__tracking__")).WithLogger(dbLogger)) + if err != nil { return } - lock := new(sync.RWMutex) + defer db.Close() + err = f(db) + return + } + lock := new(sync.RWMutex) if err := db(func(d *badger.DB) (err error) { err = d.Update(func(txn *badger.Txn) error { - b := make([]byte,bufferSize) - binary.BigEndian.PutUint64(b,0) + b := make([]byte, bufferSize) + binary.BigEndian.PutUint64(b, 0) for _, member := range members { - if _,rerr := txn.Get([]byte(member)); rerr == badger.ErrKeyNotFound { - txn.Set([]byte(member),b) + if _, rerr := txn.Get([]byte(member)); rerr == badger.ErrKeyNotFound { + _ = txn.Set([]byte(member), b) } else if rerr != nil { return rerr } @@ -45,21 +46,34 @@ func NewZoneChatTracking(zoneId,chatId string,members ...string) (*ZoneChatTrack return } return - });err != nil { - return nil,err + }); err != nil { + return nil, err } - return &ZoneChatTrackingDB{db,lock},nil + return &ZoneChatTrackingDB{zoneId, db, lock}, nil } -func (zctdb *ZoneChatTrackingDB) Initialize(lastIndex uint64,users ...string) (err error) { +func (zctdb *ZoneChatTrackingDB) Initialize(lastIndex uint64, users ...string) (err error) { for _, user := range users { - if err = zctdb.SetUserLastIndex(user,lastIndex);err != nil { + if err = zctdb.SetUserLastIndex(user, lastIndex); err != nil { return } } return } +func (zctdb *ZoneChatTrackingDB) updateDBCallbackFolder(newChatId string) { + db := func(f func(*badger.DB) (err error)) (err error) { + db, err := badger.Open(badger.DefaultOptions(filepath.Join("data", "zones", zctdb.ZoneID, "chats", newChatId, "__tracking__")).WithLogger(dbLogger)) + if err != nil { + return + } + defer db.Close() + err = f(db) + return + } + zctdb.db = db +} + func (zctdb *ZoneChatTrackingDB) RevertTrackingLastIndex(lastIndex uint64) (err error) { zctdb.lock.Lock() defer zctdb.lock.Unlock() @@ -69,7 +83,7 @@ func (zctdb *ZoneChatTrackingDB) RevertTrackingLastIndex(lastIndex uint64) (err opt := badger.DefaultIteratorOptions it := txn.NewIterator(opt) defer it.Close() - for it.Rewind();it.Valid();it.Next() { + for it.Rewind(); it.Valid(); it.Next() { item := it.Item() if err = item.Value(func(val []byte) error { li := binary.BigEndian.Uint64(val) @@ -88,9 +102,9 @@ func (zctdb *ZoneChatTrackingDB) RevertTrackingLastIndex(lastIndex uint64) (err } err = d.Update(func(txn *badger.Txn) error { for _, key := range keys { - b := make([]byte,bufferSize) - binary.BigEndian.PutUint64(b,lastIndex) - if updateErr := txn.Set(key,b); updateErr != nil { + b := make([]byte, bufferSize) + binary.BigEndian.PutUint64(b, lastIndex) + if updateErr := txn.Set(key, b); updateErr != nil { return updateErr } } @@ -101,14 +115,14 @@ func (zctdb *ZoneChatTrackingDB) RevertTrackingLastIndex(lastIndex uint64) (err return } -func (zctdb *ZoneChatTrackingDB) SetUserLastIndex(userId string,lastIndex uint64) (err error) { +func (zctdb *ZoneChatTrackingDB) SetUserLastIndex(userId string, lastIndex uint64) (err error) { zctdb.lock.Lock() defer zctdb.lock.Unlock() err = zctdb.db(func(d *badger.DB) (err error) { err = d.Update(func(txn *badger.Txn) error { - b := make([]byte,bufferSize) - binary.BigEndian.PutUint64(b,lastIndex) - updateErr := txn.Set([]byte(userId),b) + b := make([]byte, bufferSize) + binary.BigEndian.PutUint64(b, lastIndex) + updateErr := txn.Set([]byte(userId), b) return updateErr }) return @@ -116,16 +130,16 @@ func (zctdb *ZoneChatTrackingDB) SetUserLastIndex(userId string,lastIndex uint64 return } -func (zctdb *ZoneChatTrackingDB) GetUserLastIndex(userId string) (index uint,err error) { +func (zctdb *ZoneChatTrackingDB) GetUserLastIndex(userId string) (index uint, err error) { zctdb.lock.Lock() defer zctdb.lock.Unlock() err = zctdb.db(func(d *badger.DB) (err error) { err = d.Update(func(txn *badger.Txn) error { - item,rerr := txn.Get([]byte(userId)) + item, rerr := txn.Get([]byte(userId)) if rerr != nil { return rerr } - item.Value(func(val []byte) error { + _ = item.Value(func(val []byte) error { index = uint(binary.BigEndian.Uint64(val)) return nil }) @@ -147,4 +161,4 @@ func (zctdb *ZoneChatTrackingDB) DeleteUserTracking(userId string) (err error) { return }) return -} \ No newline at end of file +} diff --git a/zoneChatsDBHandler.go b/zoneChatsDBHandler.go index 77127be..ee0aeb4 100644 --- a/zoneChatsDBHandler.go +++ b/zoneChatsDBHandler.go @@ -35,7 +35,7 @@ type ZoneChatDBHandler struct { PreviousId uint64 ItemCount uint64 db func(func(*badger.DB) (err error)) (err error) - lock *sync.RWMutex + lock *sync.RWMutex } const bufferSize = 8 @@ -53,7 +53,7 @@ func NewZoneChatDBHandler(zoneId string, chatID string) (zoneChatDBHandler *Zone }, ChatID: chatID, ZoneID: zoneId, - lock: new(sync.RWMutex), + lock: new(sync.RWMutex), } err = zoneChatDBHandler.db(func(d *badger.DB) (err error) { err = d.View(func(txn *badger.Txn) error { @@ -85,26 +85,23 @@ func NewZoneChatDBHandler(zoneId string, chatID string) (zoneChatDBHandler *Zone return } -func (zcdbh *ZoneChatDBHandler) calculateNewChatCount(previousId uint64) ( count uint,err error) { +func (zcdbh *ZoneChatDBHandler) calculateNewChatCount(previousId uint64) (count uint, err error) { err = zcdbh.db(func(d *badger.DB) (err error) { err = d.View(func(txn *badger.Txn) error { opt := badger.DefaultIteratorOptions it := txn.NewIterator(opt) defer it.Close() count = 0 - b := make([]byte,bufferSize) - binary.BigEndian.PutUint64(b,previousId) - for it.Seek(b); it.Valid(); it.Next() { - item := it.Item() - if err = item.Value(func(val []byte) (err error) { - var chatMessage *ChatMessage - if err = json.Unmarshal(val, &chatMessage); err != nil { - return err - } + b := make([]byte, bufferSize) + binary.BigEndian.PutUint64(b, previousId) + if previousId == 0 || previousId == 1 { + count++ + for it.Rewind(); it.Valid(); it.Next() { + count++ + } + } else { + for it.Seek(b); it.Valid(); it.Next() { count++ - return - }); err != nil { - return err } } if count > 0 { @@ -118,8 +115,6 @@ func (zcdbh *ZoneChatDBHandler) calculateNewChatCount(previousId uint64) ( count } func (zcdbh *ZoneChatDBHandler) revertPreviousId() (err error) { - zcdbh.lock.Lock() - defer zcdbh.lock.Unlock() err = zcdbh.db(func(d *badger.DB) (err error) { err = d.View(func(txn *badger.Txn) error { opt := badger.DefaultIteratorOptions @@ -162,8 +157,6 @@ func (zcdbh *ZoneChatDBHandler) updateDbCallbackFolder(newChatId string) { } func (zcdbh *ZoneChatDBHandler) AddNewChatMessage(chatMessage *ChatMessage) (err error) { - zcdbh.lock.Lock() - defer zcdbh.lock.Unlock() b := make([]byte, bufferSize) zcdbh.PreviousId++ binary.BigEndian.PutUint64(b, zcdbh.PreviousId) @@ -188,8 +181,6 @@ func (zcdbh *ZoneChatDBHandler) AddNewChatMessage(chatMessage *ChatMessage) (err func (zcdbh *ZoneChatDBHandler) DeleteChatMessage(key uint64) (err error) { if err = zcdbh.db(func(d *badger.DB) (err error) { - zcdbh.lock.Lock() - defer zcdbh.lock.Unlock() err = d.Update(func(txn *badger.Txn) (err error) { b := make([]byte, bufferSize) binary.BigEndian.PutUint64(b, key) @@ -207,8 +198,6 @@ func (zcdbh *ZoneChatDBHandler) DeleteChatMessage(key uint64) (err error) { } func (zcdbh *ZoneChatDBHandler) DeleteChatFile(filename string, key uint64) (err error) { - zcdbh.lock.Lock() - defer zcdbh.lock.Unlock() if err = zcdbh.DeleteChatMessage(key); err != nil { return } @@ -217,8 +206,6 @@ func (zcdbh *ZoneChatDBHandler) DeleteChatFile(filename string, key uint64) (err } func (zcdbh *ZoneChatDBHandler) ListChatMessages(lastIndex int, limit int) (chatMessages []*ChatMessage, l int, done bool, err error) { - zcdbh.lock.RLock() - defer zcdbh.lock.RUnlock() err = zcdbh.db(func(d *badger.DB) (err error) { err = d.View(func(txn *badger.Txn) (err error) { opt := badger.DefaultIteratorOptions @@ -264,8 +251,6 @@ func (zcdbh *ZoneChatDBHandler) ListChatMessages(lastIndex int, limit int) (chat } func (zcdbh *ZoneChatDBHandler) ListChatFiles(lastIndex int, limit int) (chatMessages []*ChatFile, l int, err error) { - zcdbh.lock.RLock() - defer zcdbh.lock.RUnlock() err = zcdbh.db(func(d *badger.DB) (err error) { err = d.View(func(txn *badger.Txn) (err error) { opt := badger.DefaultIteratorOptions @@ -326,8 +311,6 @@ func (zcdbh *ZoneChatDBHandler) ListChatFiles(lastIndex int, limit int) (chatMes } func (zcdbh *ZoneChatDBHandler) GetChatMessage(index uint64) (chatMessage *ChatMessage, err error) { - zcdbh.lock.RLock() - defer zcdbh.lock.RUnlock() err = zcdbh.db(func(d *badger.DB) (err error) { err = d.View(func(txn *badger.Txn) (err error) { b := make([]byte, bufferSize) @@ -347,8 +330,6 @@ func (zcdbh *ZoneChatDBHandler) GetChatMessage(index uint64) (chatMessage *ChatM } func (zcdbh *ZoneChatDBHandler) ModifyChatMessage(key uint64, newContent string) (err error) { - zcdbh.lock.Lock() - defer zcdbh.lock.Unlock() b := make([]byte, bufferSize) binary.BigEndian.PutUint64(b, key) chatMessage, err := zcdbh.GetChatMessage(key) diff --git a/zoneChatsHandler.go b/zoneChatsHandler.go index 8e2bfe2..73492a9 100644 --- a/zoneChatsHandler.go +++ b/zoneChatsHandler.go @@ -16,7 +16,7 @@ import ( const ( LIST_LATEST_CHATS = "list_latest_chats" - READ_LATEST_MESSAGE = "read_latest_message" + READ_LATEST_MESSAGE = "read_latest_message" LIST_LATEST_FILES = "list_latest_files" LIST_CHATS = "list_chats" GET_CHATS = "get_chats" @@ -39,7 +39,7 @@ const ( const ( CHAT_NAME_EDITED = "chat_name_edited" CHAT_TYPE_EDITED = "chat_type_edited" - CHAT_MESSAGE_LIST = "chat_messages_list" + CHAT_MESSAGE_LIST = "chat_messages_list" CHAT_FILES_LIST = "chat_files_list" CHAT_MEMBER_ADDED = "chat_member_added" CHAT_MEMBER_REMOVED = "chat_member_removed" @@ -68,17 +68,18 @@ type ChatConfig struct { } type Chat struct { - ChatId string `json:"chatId"` - ChatType string `json:"chatType"` - Owner string `json:"owner"` - Members []string `json:"members"` - LastReadIndex uint `json:"lastReadIndex"` - Unread uint `json:"unread"` - DB *ZoneChatDBHandler `json:"-"` - Tracking *ZoneChatTrackingDB `json:"-"` + ChatId string `json:"chatId"` + ChatType string `json:"chatType"` + Owner string `json:"owner"` + Members []string `json:"members"` + LastReadIndex uint `json:"lastReadIndex"` + Unread uint `json:"unread"` + DB *ZoneChatDBHandler `json:"-"` + Tracking *ZoneChatTrackingDB `json:"-"` } type ZoneChatsHandler struct { + ZoneName string ZoneId string HostId string ChatFSInstance *ChatFSInstance @@ -92,17 +93,17 @@ type ZoneChatsHandler struct { init bool } -func NewZoneChatsHandler(hostId, zoneId, owner string, authorizedMembers []string, dataChannels map[string]*DataChannel, flag *uint32) (zoneChatsHandler *ZoneChatsHandler, err error) { +func NewZoneChatsHandler(hostId, zoneId, zoneName, owner string, authorizedMembers []string, dataChannels map[string]*DataChannel, flag *uint32) (zoneChatsHandler *ZoneChatsHandler, err error) { var dirs []fs.DirEntry dirs, err = os.ReadDir(filepath.Join("data", "zones", zoneId, "chats")) if err != nil { if os.IsNotExist(err) { logger.Printf("creating chat directory for zone %s...\n", zoneId) - mkdirErr := os.MkdirAll(filepath.Join("data", "zones", zoneId, "chats", "general"), 0700) + mkdirErr := os.MkdirAll(filepath.Join("data", "zones", zoneId, "chats", GENERAL), 0700) if mkdirErr != nil { return nil, mkdirErr } - file, ferr := os.Create(filepath.Join("data", "zones", zoneId, "chats", "general", "chatConfig.json")) + file, ferr := os.Create(filepath.Join("data", "zones", zoneId, "chats", GENERAL, "chatConfig.json")) if ferr != nil { return nil, ferr } @@ -110,7 +111,7 @@ func NewZoneChatsHandler(hostId, zoneId, owner string, authorizedMembers []strin ChatId: GENERAL, Owner: owner, ChatType: "public", - Members: authorizedMembers, + Members: []string{}, } bs, jsonErr := json.Marshal(baseConfig) if jsonErr != nil { @@ -134,10 +135,6 @@ func NewZoneChatsHandler(hostId, zoneId, owner string, authorizedMembers []strin if err != nil { return nil, err } - zoneChatTracking,err := NewZoneChatTracking(zoneId, chat.Name()) - if err != nil { - return nil, err - } var bs []byte bs, err = os.ReadFile(filepath.Join("data", "zones", zoneId, "chats", chat.Name(), "chatConfig.json")) if err != nil { @@ -148,8 +145,9 @@ func NewZoneChatsHandler(hostId, zoneId, owner string, authorizedMembers []strin if err = json.Unmarshal(bs, &c); err != nil { return nil, err } - if err = zoneChatTracking.Initialize(0,c.Members...); err != nil { - return nil,err + zoneChatTracking, err := NewZoneChatTracking(zoneId, c.ChatId, c.Members...) + if err != nil { + return nil, err } logger.Println("chats data :", c.ChatId, c.ChatType, c.Owner, c.Members) c.DB = zoneChatDBHandler @@ -160,6 +158,7 @@ func NewZoneChatsHandler(hostId, zoneId, owner string, authorizedMembers []strin chatFSFlag := uint32(0) zoneChatsHandler = &ZoneChatsHandler{ HostId: hostId, + ZoneName: zoneName, ChatFSInstance: NewChatFSInstance(zoneId, owner, authorizedMembers), ChatFSInstanceFlag: &chatFSFlag, ZoneId: zoneId, @@ -200,6 +199,8 @@ func (zch *ZoneChatsHandler) sendDataChannelMessage(reqType string, from string, return jsonErr } err = zch.DataChannels[to].DataChannel.SendText(string(bs)) + } else { + err = fmt.Errorf("no corresponding dataChannel") } return }); err != nil { @@ -250,7 +251,7 @@ func (zch *ZoneChatsHandler) signalCandidate(from string, to string, candidate * }) select { case <-d: - case err = <-e: + case <-e: } return } @@ -262,7 +263,6 @@ func (zch *ZoneChatsHandler) GetChats(userId string, chatsId ...interface{}) (er err = fmt.Errorf("id of wrong type") return } - fmt.Println("chat from get chats", id.(string)) _ = atomicallyExecute(zch.ChatFlag, func() (err error) { if _, ok := zch.Chats[id.(string)]; ok { logger.Println(zch.Chats[id.(string)]) @@ -271,36 +271,25 @@ func (zch *ZoneChatsHandler) GetChats(userId string, chatsId ...interface{}) (er return }) } - fmt.Println("first loop done") for _, chat := range chats { - fmt.Println(chat.ChatId) - index,err := chat.Tracking.GetUserLastIndex(userId) + index, err := chat.Tracking.GetUserLastIndex(userId) if err != nil { - fmt.Println("there") - fmt.Println(err) return err } - chat.DB.lock.RLock() - defer chat.DB.lock.RUnlock() - unread,err := chat.DB.calculateNewChatCount(uint64(index)) + unread, err := chat.DB.calculateNewChatCount(uint64(index)) if err != nil { - fmt.Println("over there") - fmt.Println(err) return err } chat.LastReadIndex = index chat.Unread = unread } - fmt.Println(chats) - done,e := zch.sendDataChannelMessage(GET_CHATS_RESPONSE,"node",userId,map[string]interface{}{ - "chats": chats, - }) - select { - case <-done: - fmt.Println("done") - case terr :=<-e: - fmt.Println(terr) - } + done, e := zch.sendDataChannelMessage(GET_CHATS_RESPONSE, "node", userId, map[string]interface{}{ + "chats": chats, + }) + select { + case <-done: + case err = <-e: + } return } @@ -324,7 +313,7 @@ func (zch *ZoneChatsHandler) AddNewChat(chatName string, owner string, chatType if mkdirErr != nil { return mkdirErr } - mkdirErr = os.Mkdir(filepath.Join("data", "zones", zch.ZoneId, "chats", chatName,"__tracking__"), 0700) + mkdirErr = os.Mkdir(filepath.Join("data", "zones", zch.ZoneId, "chats", chatName, "__tracking__"), 0700) if mkdirErr != nil { return mkdirErr } @@ -333,16 +322,21 @@ func (zch *ZoneChatsHandler) AddNewChat(chatName string, owner string, chatType return ferr } m := make([]string, 0, len(members)) - for _, member := range members { - if mbr, ok := member.(string); ok && mbr != owner { - m = append(m, mbr) + if chatType == PRIVATE { + for _, member := range members { + if mbr, ok := member.(string); ok && mbr != owner { + m = append(m, mbr) + } } + m = append(m, owner) + } else { + m = zch.ZoneMembersId } baseConfig := &ChatConfig{ ChatId: chatName, Owner: owner, ChatType: chatType, - Members: append(m,owner), + Members: m, } bs, jsonErr := json.Marshal(baseConfig) if jsonErr != nil { @@ -359,11 +353,11 @@ func (zch *ZoneChatsHandler) AddNewChat(chatName string, owner string, chatType if err != nil { return err } - zoneChatTrackingDB,err := NewZoneChatTracking(zch.ZoneId,chatName) + zoneChatTrackingDB, err := NewZoneChatTracking(zch.ZoneId, chatName) if err != nil { return err } - if err = zoneChatTrackingDB.Initialize(1,baseConfig.Members...); err != nil { + if err = zoneChatTrackingDB.Initialize(1, baseConfig.Members...); err != nil { return err } var c Chat @@ -387,10 +381,24 @@ func (zch *ZoneChatsHandler) AddNewChat(chatName string, owner string, chatType }) select { case <-done: - case err = <-e: - return + case <-e: } } + bs, err := json.Marshal(map[string]any{ + "zoneId": zch.ZoneId, + "chatId": chatName, + }) + if err != nil { + return err + } + zch.sendZoneRequest(CREATE_NOTIFICATION, "node", map[string]interface{}{ + "type": "added_in_chat", + "title": "Added in zone text channel 💬", + "body": fmt.Sprintf("Added in channel %s in zone %s", c.ChatId, zch.ZoneName), + "isPushed": true, + "payload": string(bs), + "recipients": members, + }) return } switch c.ChatType { @@ -402,7 +410,10 @@ func (zch *ZoneChatsHandler) AddNewChat(chatName string, owner string, chatType return }) case PRIVATE: - err = newChatForMembers(c.Members) + err = atomicallyExecute(zch.ChatFlag, func() (err error) { + err = newChatForMembers(c.Members) + return + }) } return } @@ -426,11 +437,24 @@ func (zch *ZoneChatsHandler) DeleteChat(chatId string) (err error) { }) select { case <-done: - continue - case err = <-e: - return + case <-e: } } + bs, err := json.Marshal(map[string]any{ + "zoneId": zch.ZoneId, + "chatId": chatId, + }) + if err != nil { + return err + } + zch.sendZoneRequest(CREATE_NOTIFICATION, "node", map[string]any{ + "type": "removed_from_chat", + "title": "Removed from zone text channel ⛔️", + "body": fmt.Sprintf("You have no longer access to the channel %s in zone %s", chatId, zch.ZoneName), + "isPushed": true, + "payload": string(bs), + "recipients": members, + }) return } switch zch.Chats[chatId].ChatType { @@ -485,9 +509,10 @@ func (zch *ZoneChatsHandler) EditChatName(chatId string, newChatId string) (err ChatType: chatConfig.ChatType, Members: chatConfig.Members, DB: zch.Chats[chatId].DB, - Tracking:zch.Chats[chatId].Tracking, + Tracking: zch.Chats[chatId].Tracking, } chat.DB.updateDbCallbackFolder(newChatId) + chat.Tracking.updateDBCallbackFolder(newChatId) _ = atomicallyExecute(zch.ChatFlag, func() (err error) { defer delete(zch.Chats, chatId) zch.Chats[newChatId] = chat @@ -540,6 +565,9 @@ func (zch *ZoneChatsHandler) EditChatType(chatId string, chatType string) (err e return } chatConfig.ChatType = chatType + if chatType == PUBLIC { + chatConfig.Members = zch.ZoneMembersId + } bs, err = json.Marshal(&chatConfig) f, err := os.OpenFile(filepath.Join("data", "zones", zch.ZoneId, "chats", chatId, "chatConfig.json"), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0755) defer func() { @@ -557,19 +585,14 @@ func (zch *ZoneChatsHandler) EditChatType(chatId string, chatType string) (err e ChatType: chatType, Members: chatConfig.Members, DB: zch.Chats[chatId].DB, - Tracking:zch.Chats[chatId].Tracking, + Tracking: zch.Chats[chatId].Tracking, } switch chatType { case BROADCAST: fallthrough case PUBLIC: - var members = []string{} - _ = atomicallyExecute(zch.ChatFlag, func() (err error) { - members = append(members, zch.ZoneMembersId...) - return - }) for _, member := range zch.ZoneMembersId { - if pubErr := zch.SetChatPublicForUser(chatId, member); pubErr != nil { + if pubErr := zch.AddChatMembers(chatId, []any{member}); pubErr != nil { logger.Println(pubErr) } zch.sendDataChannelMessage(CHAT_TYPE_EDITED, "node", member, map[string]interface{}{ @@ -578,11 +601,6 @@ func (zch *ZoneChatsHandler) EditChatType(chatId string, chatType string) (err e }) } case PRIVATE: - var members = []string{} - _ = atomicallyExecute(zch.ChatFlag, func() (err error) { - members = append(members, zch.ZoneMembersId...) - return - }) for _, member := range zch.ZoneMembersId { if pubErr := zch.SetChatPrivateForUser(chatId, member); pubErr != nil { logger.Println(pubErr) @@ -636,6 +654,21 @@ memberLoop: "chatId": chatId, }) } + bs, err = json.Marshal(map[string]any{ + "zoneId": zch.ZoneId, + "chatId": chatId, + }) + if err != nil { + return err + } + zch.sendZoneRequest(CREATE_NOTIFICATION, "node", map[string]interface{}{ + "type": "added_in_chat", + "title": "Added in zone text channel 💬", + "body": fmt.Sprintf("Added in channel %s in zone %s", chatId, zch.ZoneName), + "isPushed": true, + "payload": string(bs), + "recipients": chatMembers, + }) bs, err = json.Marshal(&chatConfig) f, err := os.OpenFile(filepath.Join("data", "zones", zch.ZoneId, "chats", chatId, "chatConfig.json"), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0755) defer func() { @@ -655,49 +688,48 @@ memberLoop: DB: zch.Chats[chatId].DB, Tracking: zch.Chats[chatId].Tracking, } - chat.DB.lock.Lock() - lastIndex := chat.DB.PreviousId - chat.DB.lock.Unlock() _ = atomicallyExecute(zch.ChatFlag, func() (err error) { + lastIndex := chat.DB.PreviousId zch.Chats[chatId] = chat - return - }) - var chmb []string - if chat.ChatType == PRIVATE { - chmb = chat.Members - } else { - chmb = zch.ZoneMembersId - } -broadcastLoop: - for _, member := range chmb { - for _, m := range addedMembers { - if err = chat.Tracking.SetUserLastIndex(member,lastIndex); err != nil { - return - } - if member == m { - done, e := zch.sendDataChannelMessage(ADDED_IN_CHAT, "node", member, map[string]interface{}{ - "chat": chat, - }) - select { - case <-done: - case err = <-e: - logger.Println(err) + + var chmb []string + if chat.ChatType == PRIVATE { + chmb = chat.Members + } else { + chmb = zch.ZoneMembersId + } + broadcastLoop: + for _, member := range chmb { + for _, m := range addedMembers { + if err = chat.Tracking.SetUserLastIndex(member, lastIndex); err != nil { + return } - continue broadcastLoop - } - if _, ok := zch.DataChannels[member]; ok { - done, e := zch.sendDataChannelMessage(CHAT_MEMBER_ADDED, "node", member, map[string]interface{}{ - "userId": m, - "chatId": chat.ChatId, - }) - select { - case <-done: - case err = <-e: - logger.Println(err) + if member == m { + done, e := zch.sendDataChannelMessage(ADDED_IN_CHAT, "node", member, map[string]interface{}{ + "chat": chat, + }) + select { + case <-done: + case err = <-e: + logger.Println(err) + } + continue broadcastLoop + } + if _, ok := zch.DataChannels[member]; ok { + done, e := zch.sendDataChannelMessage(CHAT_MEMBER_ADDED, "node", member, map[string]interface{}{ + "userId": m, + "chatId": chat.ChatId, + }) + select { + case <-done: + case err = <-e: + logger.Println(err) + } } } } - } + return + }) return } @@ -770,6 +802,21 @@ func (zch *ZoneChatsHandler) RemoveChatMember(chatId string, chatMember string) "userId": chatMember, "chatId": chatId, }) + bs, err = json.Marshal(map[string]any{ + "zoneId": zch.ZoneId, + "chatId": chatId, + }) + if err != nil { + return + } + zch.sendZoneRequest(CREATE_NOTIFICATION, "node", map[string]interface{}{ + "type": "removed_from_chat", + "title": "Removed from zone text channel ⛔️", + "body": fmt.Sprintf("You have no longer access to the channel %s in zone %s", chatId, zch.ZoneName), + "isPushed": true, + "payload": string(bs), + "recipients": []string{chatMember}, + }) done, e := zch.sendDataChannelMessage(REMOVED_FROM_CHAT, "node", chatMember, map[string]interface{}{ "chatId": chatId, "userId": chatMember, @@ -803,10 +850,10 @@ broadcastLoop: return } -func(zch *ZoneChatsHandler) ReadLastMessage(userId,chatId string) (err error) { +func (zch *ZoneChatsHandler) ReadLastMessage(userId, chatId string) (err error) { err = atomicallyExecute(zch.ChatFlag, func() (err error) { if chat, ok := zch.Chats[chatId]; ok { - err = chat.Tracking.SetUserLastIndex(userId,chat.DB.PreviousId) + err = chat.Tracking.SetUserLastIndex(userId, chat.DB.PreviousId) } return }) @@ -823,7 +870,7 @@ func (zch *ZoneChatsHandler) ListLatestChatMessages(userId, chatId string, lastI if err != nil { return } - err = chat.Tracking.SetUserLastIndex(userId,chat.DB.PreviousId) + err = chat.Tracking.SetUserLastIndex(userId, chat.DB.PreviousId) } return }); err != nil { @@ -876,9 +923,12 @@ func (zch *ZoneChatsHandler) AddChatMessage(userId, chatId, content string, isRe Tags: make([]string, 0), Date: time.Now().Format("Mon, 02 Jan 2006 15:04:05 MST"), } - + var chatType string + var chatMembers []string if err = atomicallyExecute(zch.ChatFlag, func() (err error) { if chat, ok := zch.Chats[chatId]; ok { + chatType = chat.ChatType + chatMembers = chat.Members if isResponse { parentMessage, getErr := chat.DB.GetChatMessage(chatResponseId) if err != nil { @@ -900,37 +950,63 @@ func (zch *ZoneChatsHandler) AddChatMessage(userId, chatId, content string, isRe }); err != nil { return } - _ = atomicallyExecute(zch.ChatFlag, func() (err error) { - chat := zch.Chats[chatId] - logger.Println(chat.ChatType) - switch chat.ChatType { - case BROADCAST: - fallthrough - case PUBLIC: - for _, v := range zch.ZoneMembersId { - done, e := zch.sendDataChannelMessage(NEW_CHAT_MESSAGE, "node", v, map[string]interface{}{ - "chatMessage": chatMessage, - "chatId": chatId, - }) - select { - case <-done: - case err = <-e: + + notifyActivity := func(done <-chan struct{}, e <-chan error, member string) { + select { + case <-done: + case <-e: + _ = atomicallyExecute(zch.ChatFlag, func() (err error) { + if chat, ok := zch.Chats[chatId]; ok { + li, err := chat.Tracking.GetUserLastIndex(member) + if err != nil { + return err + } + count, err := chat.DB.calculateNewChatCount(uint64(li)) + if err != nil { + return err + } + if count == 1 { + bs, err := json.Marshal(map[string]any{ + "zoneId": zch.ZoneId, + "chatId": chatId, + }) + if err != nil { + return err + } + zch.sendZoneRequest(CREATE_NOTIFICATION, "node", map[string]interface{}{ + "type": "new_chat_activity", + "title": "Unread messages 👀", + "body": fmt.Sprintf("New messages in channel %s of %s", chatId, zch.ZoneName), + "isPushed": true, + "payload": string(bs), + "recipients": []string{member}, + }) + } } - } - case PRIVATE: - for _, v := range chat.Members { - done, e := zch.sendDataChannelMessage(NEW_CHAT_MESSAGE, "node", v, map[string]interface{}{ - "chatMessage": chatMessage, - "chatId": chatId, - }) - select { - case <-done: - case err = <-e: - } - } + return + }) } - return - }) + } + switch chatType { + case BROADCAST: + fallthrough + case PUBLIC: + for _, v := range zch.ZoneMembersId { + done, e := zch.sendDataChannelMessage(NEW_CHAT_MESSAGE, "node", v, map[string]interface{}{ + "chatMessage": chatMessage, + "chatId": chatId, + }) + go notifyActivity(done, e, v) + } + case PRIVATE: + for _, v := range chatMembers { + done, e := zch.sendDataChannelMessage(NEW_CHAT_MESSAGE, "node", v, map[string]interface{}{ + "chatMessage": chatMessage, + "chatId": chatId, + }) + go notifyActivity(done, e, v) + } + } return } @@ -955,7 +1031,7 @@ func (zch *ZoneChatsHandler) SetChatPrivateForUser(chatId string, member string) }) select { case <-done: - case err = <-e: + case <-e: } } } @@ -984,7 +1060,7 @@ func (zch *ZoneChatsHandler) SetChatPublicForUser(chatId string, member string) }) select { case <-done: - case err = <-e: + case <-e: } } } @@ -997,15 +1073,22 @@ func (zch *ZoneChatsHandler) SetAllPublicChatForUser(userId string) (err error) for _, chat := range zch.Chats { logger.Println("--------------- public chat for all : ", chat) if chat.ChatType == PUBLIC || chat.ChatType == BROADCAST { - if chat.ChatId == GENERAL { - if err = zch.AddChatMembers(chat.ChatId, []interface{}{userId}); err != nil { - logger.Println(err) + var contains bool + for _, m := range chat.Members { + if m == userId { + contains = true + break + } + } + if !contains { + if addErr := zch.AddChatMembers(chat.ChatId, []any{userId}); addErr != nil { + logger.Println(addErr) } continue } - if err = zch.SetChatPublicForUser(chat.ChatId, userId); err != nil { - continue - } + // if addErr := zch.SetChatPublicForUser(chat.ChatId, userId); addErr != nil { + // continue + // } } } return @@ -1048,7 +1131,7 @@ func (zch *ZoneChatsHandler) ConnectToChatFSInstance(channelId string, userId st d, e := zch.ChatFSInstance.HandleOffer(context.Background(), channelId, userId, sdp, zch.HostId, zch.sendDataChannelMessage, zch.signalCandidate) select { case <-d: - case err = <-e: + case <-e: } return }) @@ -1116,7 +1199,7 @@ func (zch *ZoneChatsHandler) DeleteChatMessage(key uint64, chatId string) (err e func (zch *ZoneChatsHandler) UpdateChatMessage(key uint64, chatId, newContent string) (err error) { err = atomicallyExecute(zch.ChatFlag, func() (err error) { if _, ok := zch.Chats[chatId]; !ok { - err = fmt.Errorf("no file corresponding to id %s", chatId) + err = fmt.Errorf("no chat corresponding to id %s", chatId) return } if err = zch.Chats[chatId].DB.ModifyChatMessage(key, newContent); err != nil { @@ -1279,13 +1362,10 @@ func (zch *ZoneChatsHandler) handleZoneRequest(ctx context.Context, req *ZoneReq } err = zch.AddNewChat(req.Payload["chatId"].(string), req.Payload["owner"].(string), req.Payload["chatType"].(string), req.Payload["members"].([]interface{})) case GET_CHATS: - fmt.Println("got a get chat req") if err = verifyFieldsSliceInterface(req.Payload, "chatsId"); err != nil { return } - fmt.Println("calling get chat") err = zch.GetChats(req.From, req.Payload["chatsId"].([]interface{})...) - fmt.Println("get chat done") case LIST_LATEST_CHATS: if err = verifyFieldsString(req.Payload, "chatId"); err != nil { return @@ -1308,7 +1388,7 @@ func (zch *ZoneChatsHandler) handleZoneRequest(ctx context.Context, req *ZoneReq if err = verifyFieldsString(req.Payload, "chatId"); err != nil { return } - err = zch.ReadLastMessage(req.From,req.Payload["chatId"].(string)) + err = zch.ReadLastMessage(req.From, req.Payload["chatId"].(string)) case ADD_CHAT_MESSAGE: logger.Println("got request in zone chat handler", req) if err = verifyFieldsString(req.Payload, "chatId", "content"); err != nil { @@ -1447,4 +1527,4 @@ func (zch *ZoneChatsHandler) handleZoneRequest(ctx context.Context, req *ZoneReq }) } return -} \ No newline at end of file +} diff --git a/zoneFSInstance.go b/zoneFSInstance.go index c030a44..9880a87 100644 --- a/zoneFSInstance.go +++ b/zoneFSInstance.go @@ -264,7 +264,7 @@ func (fs *FSInstance) SetupFileDownload(path, filename, userId string) (err erro } } logger.Println("done") - <-time.After(4*time.Second) + <-time.After(4 * time.Second) _ = dc.SendText("done") <-time.After(time.Second) _ = dc.Close() diff --git a/zoneGrpcMiddleware.go b/zoneGrpcMiddleware.go index 6de6a93..0e5dd45 100644 --- a/zoneGrpcMiddleware.go +++ b/zoneGrpcMiddleware.go @@ -24,8 +24,8 @@ const ( NEW_ZONE ReqType = "new_zone" NEW_AUTHORIZED_ZONE_MEMBER ReqType = "new_authorized_zone_member" REMOVED_ZONE_AUTHORIZED_MEMBER ReqType = "removed_zone_authorized_member" - DELETE_ZONE = "delete_zone" - DISCONNECT_ZONE_MEMBER = "disconnect_zone_member" + DELETE_ZONE = "delete_zone" + DISCONNECT_ZONE_MEMBER = "disconnect_zone_member" ) type ZoneGrpcMiddleware struct { diff --git a/zoneManager.go b/zoneManager.go index 29a7e60..ac21642 100644 --- a/zoneManager.go +++ b/zoneManager.go @@ -52,7 +52,7 @@ type Zone struct { func NewZone(hostId string, zoneId string, zoneName string, imageUrl string, owner string, creationDate string, initialized bool, authorizedMembers []string) (zone *Zone, err error) { dataChannels, dataChannelFlag := make(map[string]*DataChannel), uint32(0) - zoneChatHandler, err := NewZoneChatsHandler(hostId, zoneId, owner, authorizedMembers, dataChannels, &dataChannelFlag) + zoneChatHandler, err := NewZoneChatsHandler(hostId, zoneId, zoneName, owner, authorizedMembers, dataChannels, &dataChannelFlag) if err != nil { return nil, err } @@ -68,11 +68,15 @@ func NewZone(hostId string, zoneId string, zoneName string, imageUrl string, own if err != nil { return } + zoneNotificationsHandler, err := NewZoneNotificationsHandler(hostId, zoneId, owner, authorizedMembers, dataChannels, &dataChannelFlag) + if err != nil { + return + } zoneFileHandler, err := NewZoneFileHandler(hostId, zoneId, owner, authorizedMembers, dataChannels, &dataChannelFlag) if err != nil { return } - zoneScheduler, e := NewZoneRequestScheduler(authorizedMembers, zoneUsersHandler, zoneAudioChannelsHandler, zoneVideoChannelsHandler, zoneFileHandler, zoneChatHandler) + zoneScheduler, e := NewZoneRequestScheduler(authorizedMembers, zoneUsersHandler, zoneAudioChannelsHandler, zoneVideoChannelsHandler, zoneFileHandler, zoneChatHandler, zoneNotificationsHandler) go func() { for schedErr := range e { logger.Println("from scheduler :", schedErr) @@ -111,7 +115,7 @@ func NewZoneManager(id string, token string) (zoneManager *ZoneManager, err erro } zoneMap[zone.ID] = z } - zonesFolder,err := os.ReadDir(filepath.Join("data", "zones")) + zonesFolder, err := os.ReadDir(filepath.Join("data", "zones")) if err != nil { return nil, err } @@ -130,7 +134,7 @@ func NewZoneManager(id string, token string) (zoneManager *ZoneManager, err erro candidateFlag: &candidateFlag, } for _, z := range zonesFolder { - if _,ok := zoneMap[z.Name()]; !ok { + if _, ok := zoneMap[z.Name()]; !ok { logger.Println(zoneManager.DeleteZone(z.Name())) } } @@ -163,17 +167,17 @@ func (zm *ZoneManager) sendSignalingMessage(messageType, from, to string, payloa } func (zm *ZoneManager) DeleteZone(zoneId string) error { - return os.RemoveAll(filepath.Join("data", "zones",zoneId)) + return os.RemoveAll(filepath.Join("data", "zones", zoneId)) } func (zm *ZoneManager) fetchZones(nodeId string, token string) (zones []*Zone, err error) { em := NewEncryptionManager() sig := em.SignRequestHMAC(nodeId) body, err := json.Marshal(map[string]interface{}{ - "type": LIST_ZONES_BY_HOST, - "mac": sig, - "from": nodeId, - "peerType":"node", + "type": LIST_ZONES_BY_HOST, + "mac": sig, + "from": nodeId, + "peerType": "node", "payload": map[string]string{ "host": nodeId, "lastIndex": "0", @@ -245,12 +249,12 @@ func (zm *ZoneManager) HandleOffer(ctx context.Context, from string, to string, return } logger.Println("handling zone offer") - - if _,ok := zm.RTCPeerConnections[from]; ok { - if e := zm.HandleLeavingMember(from,req["zoneId"]); e != nil { - logger.Println(e) - } + + if _, ok := zm.RTCPeerConnections[from]; ok { + if e := zm.HandleLeavingMember(from, req["zoneId"]); e != nil { + logger.Println(e) } + } peerConnection, err := zm.createPeerConnection(from, to, req["zoneId"], webrtc.SDPTypeAnswer, cb) if err != nil { @@ -708,8 +712,8 @@ func (zm *ZoneManager) AddCandidate(candidate *webrtc.ICECandidateInit, from str } func (zm *ZoneManager) HandleLeavingMember(id string, zoneId string) (err error) { - defer func () { - logger.Println(zm.notifyLeavingMember(id,zoneId,zm.ID)) + defer func() { + logger.Println(zm.notifyLeavingMember(id, zoneId, zm.ID)) }() logger.Println("---------------- handling leaving member", id) if err = atomicallyExecute(zm.peerConnectionFlag, func() (err error) { @@ -771,16 +775,16 @@ func (zm *ZoneManager) HandleLeavingMember(id string, zoneId string) (err error) return } -func (zm *ZoneManager) notifyLeavingMember(userId,zoneId,hostId string) (err error) { +func (zm *ZoneManager) notifyLeavingMember(userId, zoneId, hostId string) (err error) { em := NewEncryptionManager() sig := em.SignRequestHMAC(hostId) body, err := json.Marshal(map[string]interface{}{ - "type": DISCONNECT_ZONE_MEMBER, - "mac": sig, - "from": hostId, - "peerType":"node", + "type": DISCONNECT_ZONE_MEMBER, + "mac": sig, + "from": hostId, + "peerType": "node", "payload": map[string]string{ - "zoneId": zoneId, + "zoneId": zoneId, "userId": userId, }, }) diff --git a/zoneNotificationsDBHandler.go b/zoneNotificationsDBHandler.go new file mode 100644 index 0000000..59bba28 --- /dev/null +++ b/zoneNotificationsDBHandler.go @@ -0,0 +1,41 @@ +package localserver + +import ( + "path/filepath" + sync "sync" + + "github.com/dgraph-io/badger/v3" +) + +type ZoneNotification struct { + ID string + Type string + Description string + Recipients []string +} + +type ZoneNotificationDBHandler struct { + ZoneID string + db func(func(*badger.DB) (err error)) (err error) + lock *sync.RWMutex +} + +func NewZoneNotificationDBHandler(zoneId string) (zoneFilesDBHandler *ZoneNotificationDBHandler, err error) { + zoneFilesDBHandler = &ZoneNotificationDBHandler{ + db: func(f func(*badger.DB) (err error)) (err error) { + path := filepath.Join("data", "zones", zoneId, "notifications") + db, err := badger.Open(badger.DefaultOptions(path).WithLogger(dbLogger)) + if err != nil { + return + } + defer db.Close() + err = f(db) + return + }, + ZoneID: zoneId, + lock: new(sync.RWMutex), + } + return +} + +//Todo: implement pull notification module for beta only push notification are enabled for simplicity diff --git a/zoneNotificationsHandler.go b/zoneNotificationsHandler.go index ba5ee6b..b40bacc 100644 --- a/zoneNotificationsHandler.go +++ b/zoneNotificationsHandler.go @@ -1,10 +1,179 @@ package localserver +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" + "os" + "path/filepath" +) + +const ( + CREATE_NOTIFICATION = "create_notification" +) + +const ( + NOTIFY = "notify" +) + type ZoneNotificationsHandler struct { - ZoneId string + ZoneId string ZoneMembersId []string DataChannels map[string]*DataChannel Flag *uint32 + DB *ZoneNotificationDBHandler Publishers []<-chan *ZoneRequest reqChans []chan<- *ZoneRequest -} \ No newline at end of file +} + +func NewZoneNotificationsHandler(_ string, zoneId string, owner string, authorizedMembers []string, dataChannels map[string]*DataChannel, flag *uint32) (zoneNotificationsHandler *ZoneNotificationsHandler, err error) { + db, err := NewZoneNotificationDBHandler(zoneId) + if err != nil { + return + } + if _, dirErr := os.ReadDir(filepath.Join("data", "zones", zoneId, "notifications")); os.IsNotExist(dirErr) { + dirErr := os.MkdirAll(filepath.Join("data", "zones", zoneId, "notifications"), 0700) + if dirErr != nil { + return + } + } + zoneNotificationsHandler = &ZoneNotificationsHandler{ + ZoneId: zoneId, + ZoneMembersId: authorizedMembers, + DataChannels: dataChannels, + DB: db, + Flag: flag, + } + return +} + +func (znh *ZoneNotificationsHandler) sendZoneRequest(reqType string, from string, payload map[string]interface{}) { + go func() { + for _, rc := range znh.reqChans { + rc <- &ZoneRequest{ + ReqType: reqType, + From: from, + Payload: payload, + } + } + }() +} + +func (znh *ZoneNotificationsHandler) sendDataChannelMessage(reqType string, from string, to string, payload map[string]interface{}) (<-chan struct{}, <-chan error) { + done, errCh := make(chan struct{}), make(chan error) + go func() { + if err := atomicallyExecute(znh.Flag, func() (err error) { + if _, ok := znh.DataChannels[to]; ok { + bs, jsonErr := json.Marshal(&ZoneResponse{ + Type: reqType, + From: from, + To: to, + Payload: payload, + }) + if jsonErr != nil { + return jsonErr + } + err = znh.DataChannels[to].DataChannel.SendText(string(bs)) + } + return + }); err != nil { + errCh <- err + return + } + done <- struct{}{} + }() + return done, errCh +} + +func (znh *ZoneNotificationsHandler) Init(ctx context.Context, authorizedMembers []string) (err error) { + //? initialization code here + return +} + +func (znh *ZoneNotificationsHandler) Subscribe(ctx context.Context, publisher <-chan *ZoneRequest) (reqChan chan *ZoneRequest, done chan struct{}, errCh chan error) { + reqChan, done, errCh = make(chan *ZoneRequest), make(chan struct{}), make(chan error) + znh.reqChans = append(znh.reqChans, reqChan) + go func() { + for { + select { + case <-ctx.Done(): + done <- struct{}{} + return + case req := <-publisher: + if err := znh.handleZoneRequest(ctx, req); err != nil { + errCh <- err + } + } + } + }() + return +} + +func (zng *ZoneNotificationsHandler) ListNotifications(userId string) {} + +func (zng *ZoneNotificationsHandler) CreateNotification(notificationType, title, body, payload string, isPushed bool, recipients ...string) (err error) { + if isPushed { + err = zng.PushNotification(notificationType, title, body, payload, recipients...) + } + return +} + +func (zng *ZoneNotificationsHandler) DeleteNotification() {} + +func (zng *ZoneNotificationsHandler) PushNotification(notificationType, title, body ,payload string, recipients ...string) (err error) { + em := NewEncryptionManager() + sig := em.SignRequestHMAC(NodeID) + b, err := json.Marshal(map[string]interface{}{ + "type": NOTIFY, + "mac": sig, + "from": NodeID, + "peerType": "node", + "payload": map[string]interface{}{ + "type": notificationType, + "title": title, + "body": body, + "recipients": recipients, + "payload": payload, + }, + }) + if err != nil { + return + } + _, err = http.Post("https://app.zippytal.com/req", "application/json", bytes.NewBuffer(b)) + if err != nil { + logger.Println("error come from there in zone manager") + return + } + return +} + +func (zng *ZoneNotificationsHandler) handleZoneRequest(ctx context.Context, req *ZoneRequest) (err error) { + switch req.ReqType { + case CREATE_NOTIFICATION: + if err = verifyFieldsString(req.Payload, "type", "title", "body","payload"); err != nil { + return + } + if err = verifyFieldsBool(req.Payload, "isPushed"); err != nil { + return + } + if _, ok := req.Payload["recipients"]; !ok { + err = fmt.Errorf("no field recipient in payload") + return + } + if _, ok := req.Payload["recipients"].([]string); !ok { + err = fmt.Errorf(" field recipient in payload is wrong type") + return + } + // recipients := []string{} + // for _, recipient := range req.Payload["recipients"].([]any) { + // if r, ok := recipient.(string); ok { + // recipients = append(recipients, r) + // } + // } + + err = zng.CreateNotification(req.Payload["type"].(string), req.Payload["title"].(string), req.Payload["body"].(string), req.Payload["payload"].(string),req.Payload["isPushed"].(bool),req.Payload["recipients"].([]string)...) + } + return +} diff --git a/zoneRequestScheduler.go b/zoneRequestScheduler.go index 9f8bf7b..c3ca1ce 100644 --- a/zoneRequestScheduler.go +++ b/zoneRequestScheduler.go @@ -72,7 +72,7 @@ func verifyFieldsSliceInterface(payload map[string]interface{}, fields ...string if _, ok := payload[field]; !ok { err = fmt.Errorf("no field %s in payload", field) return - } else if _, ok := payload[field].([]interface{}); !ok { + } else if _, ok := payload[field].([]any); !ok { err = fmt.Errorf("field %s in payload is not a []interface{}", field) return } diff --git a/zoneVideoChannelsHandler.go b/zoneVideoChannelsHandler.go index fbebd33..763f03b 100644 --- a/zoneVideoChannelsHandler.go +++ b/zoneVideoChannelsHandler.go @@ -282,7 +282,7 @@ func (zvch *ZoneVideoChannelsHandler) AddNewVideoChannel(channelName string, own ID: channelName, Owner: owner, ChannelType: channelType, - Members: append(m,owner), + Members: append(m, owner), } bs, jsonErr := json.Marshal(baseConfig) if jsonErr != nil {