package localserver import ( "bytes" "context" "encoding/json" "fmt" "io" "os" "path/filepath" "strconv" "sync" "github.com/google/uuid" "github.com/pion/webrtc/v3" ) type ChatsManager struct { ID string Chats map[string]*NodeChat LocalSD map[string]*webrtc.SessionDescription RTCPeerConnections map[string]*RTCPeerConnection DataChannels map[string]*DataChannel PendingCandidates map[string][]*webrtc.ICECandidate stream SignalingService_LinkClient chatsFlag *uint32 peerConnectionFlag *uint32 localSDFlag *uint32 dataChannelFlag *uint32 candidateFlag *uint32 } type NodeChat struct { ID string `json:"id"` Name string `json:"name"` Initiator string `json:"initiator"` Target string `json:"target"` InitiatorHost string `json:"initiatorHost"` TargetHost string `json:"targetHost"` DataChannels map[string]*DataChannel `json:"-"` DataChannelsFlag *uint32 `json:"-"` ChatRequestScheduler *ChatRequestScheduler `json:"-"` Initialized bool `json:"-"` } func NewNodeChat(id, name, initiator, target, initiatorHost, targetHost string, initialized bool) (chat *NodeChat, err error) { dataChannels, dataChannelFlag := make(map[string]*DataChannel), uint32(0) nodeChatMessageHandler, err := NewNodeChatChannelsHandler(chat.ID, chat.ID, chat.Initiator, chat.Target, chat.InitiatorHost, chat.TargetHost, dataChannels, &dataChannelFlag) chatScheduler, e := NewChatRequestScheduler(initiator, target, initiatorHost, targetHost, nodeChatMessageHandler) go func() { for schedErr := range e { logger.Println("chat error:", schedErr) } }() chat = &NodeChat{ ID: id, Name: name, Initiator: initiator, Target: target, InitiatorHost: initiatorHost, TargetHost: targetHost, DataChannels: dataChannels, DataChannelsFlag: &dataChannelFlag, ChatRequestScheduler: chatScheduler, } return } func NewChatManager(id, token string) (chatManager *ChatsManager, err error) { chatsFlag := uint32(0) peerConnectionFlag := uint32(0) localSDFlag := uint32(0) dataChannelFlag := uint32(0) candidateFlag := uint32(0) dataChannels := make(map[string]*DataChannel) chatsMap := make(map[string]*NodeChat) chats, err := chatManager.fetchChats(id, token) if err != nil { return } for _, chat := range chats { c, err := NewNodeChat(chat.ID, chat.Name, chat.Initiator, chat.Target, chat.InitiatorHost, chat.TargetHost, false) if err != nil { return nil, err } _ = atomicallyExecute(&chatsFlag, func() (err error) { chatsMap[chat.ID] = c return }) } chatsFolder, err := os.ReadDir(filepath.Join(dataPath, "data", "chats")) if err != nil { if os.IsNotExist(err) { if err = os.MkdirAll(filepath.Join(dataPath, "data", "chats"), 0770); err != nil { return } } else { return nil, err } } chatManager = &ChatsManager{ ID: id, Chats: chatsMap, LocalSD: make(map[string]*webrtc.SessionDescription), RTCPeerConnections: make(map[string]*RTCPeerConnection), PendingCandidates: make(map[string][]*webrtc.ICECandidate), DataChannels: dataChannels, chatsFlag: &chatsFlag, peerConnectionFlag: &peerConnectionFlag, localSDFlag: &localSDFlag, dataChannelFlag: &dataChannelFlag, candidateFlag: &candidateFlag, } for _, c := range chatsFolder { if _, ok := chatsMap[c.Name()]; !ok { logger.Println(chatManager.DeleteChat(c.Name())) } } return } func (cm *ChatsManager) fetchChats(nodeId string, token string) (chats []*NodeChat, err error) { em := NewEncryptionManager() sig := em.SignRequestHMAC(nodeId) body, err := json.Marshal(map[string]interface{}{ "type": GET_NODE_CHATS, "mac": sig, "from": nodeId, "peerType": "node", "payload": map[string]string{ "host": nodeId, "lastIndex": "0", }, }) if err != nil { return } res, err := HTTPClient.Post("https://dev.zippytal.com/req", "application/json", bytes.NewReader(body)) if err != nil { logger.Println("error come from there inn chat manager") return } bs, err := io.ReadAll(res.Body) if err != nil { return } var payload map[string]any if err = json.Unmarshal(bs, &payload); err != nil { return } b, err := json.Marshal(payload["Chats"]) if err != nil { return } err = json.Unmarshal(b, &chats) return } func (cm *ChatsManager) sendSignalingMessage(messageType, from, to string, payload map[string]interface{}) (err error) { bs, err := json.Marshal(payload) if err != nil { return } err = cm.stream.Send(&SignalingMessage{ Type: messageType, From: from, To: to, Payload: bs, }) return } func (cm *ChatsManager) DeleteChat(chatId string) error { return os.RemoveAll(filepath.Join(dataPath, "data", "chats", chatId)) } func (cm *ChatsManager) CreateOffer(ctx context.Context, target string, from string, chatId string, cb OnICECandidateFunc) (err error) { peerConnection, err := cm.createPeerConnection(target, from, chatId, webrtc.SDPTypeOffer, cb) if err != nil { return } logger.Println("connection created") rawOffer, err := peerConnection.CreateOffer(nil) if err != nil { return } if err = peerConnection.SetLocalDescription(rawOffer); err != nil { return } _ = atomicallyExecute(cm.peerConnectionFlag, func() (err error) { id := uuid.New().String() logger.Println("adding for target", target) cm.RTCPeerConnections[target] = &RTCPeerConnection{ id: id, PeerConnection: peerConnection, makingOffer: true, makingOfferLock: &sync.Mutex{}, negotiate: cm.negotiate, } return }) err = cm.sendSignalingMessage(string(CHAT_WEBRTC_OFFER), cm.ID, target, map[string]any{ "to": target, "from": cm.ID, "sdp": rawOffer.SDP, }) return } func (cm *ChatsManager) HandleOffer(ctx context.Context, from string, to string, req map[string]string, cb OnICECandidateFunc) (err error) { done, errCh := make(chan struct{}), make(chan error) go func() { if _, ok := cm.Chats[req["chatId"]]; !ok { err = fmt.Errorf("no corresponding chat") errCh <- err return } logger.Println("handling chat offer") _ = atomicallyExecute(cm.chatsFlag, func() (err error) { return }) if _, ok := cm.RTCPeerConnections[from]; ok { if e := cm.HandleLeavingMember(from, req["chatId"], false); e != nil { logger.Println(e) } } peerConnection, err := cm.createPeerConnection(from, to, req["chatId"], webrtc.SDPTypeAnswer, cb) if err != nil { errCh <- err return } logger.Println("peer connection created") _ = atomicallyExecute(cm.peerConnectionFlag, func() (err error) { id := uuid.New().String() cm.RTCPeerConnections[from] = &RTCPeerConnection{ PeerConnection: peerConnection, id: id, makingOffer: false, makingOfferLock: &sync.Mutex{}, negotiate: cm.negotiate, } logger.Println("peer connection added to map") offer := webrtc.SessionDescription{ Type: webrtc.SDPTypeOffer, SDP: req[SDP], } if err = peerConnection.SetRemoteDescription(offer); err != nil { errCh <- err return } rawAnswer, err := peerConnection.CreateAnswer(nil) if err != nil { errCh <- err return } _ = atomicallyExecute(cm.localSDFlag, func() (err error) { cm.LocalSD[from] = &rawAnswer return }) if err = peerConnection.SetLocalDescription(rawAnswer); err != nil { errCh <- err return } if err = cm.sendSignalingMessage(string(CHAT_WEBRTC_ANSWER), cm.ID, from, map[string]any{ "to": from, "from": cm.ID, "sdp": rawAnswer.SDP, }); err != nil { errCh <- err return } done <- struct{}{} return }) }() select { case <-done: return case err = <-errCh: return case <-ctx.Done(): err = ctx.Err() return } } func (cm *ChatsManager) HandleAnswer(ctx context.Context, from string, to string, req map[string]string) (err error) { defer func() { if r := recover(); err != nil { logger.Printf("recover from panic in handle answer : %v\n", r) } }() if err = atomicallyExecute(cm.peerConnectionFlag, func() (err error) { if _, ok := cm.RTCPeerConnections[from]; !ok { err = fmt.Errorf("no corresponding peer connection for id : %s", from) return } peerConnnection := cm.RTCPeerConnections[from] logger.Println("---------------------") logger.Println(req[SDP]) logger.Println("---------------------") if err = peerConnnection.SetRemoteDescription(webrtc.SessionDescription{ Type: webrtc.SDPTypeAnswer, SDP: req[SDP], }); err != nil { logger.Println("error occured while setting remote description in handle answer") return } return }); err != nil { return } if err = cm.sendSignalingMessage(string(CHAT_WEBRTC_COUNTER_OFFER), cm.ID, from, map[string]any{ "from": cm.ID, "to": from, }); err != nil { return } _ = atomicallyExecute(cm.candidateFlag, func() (err error) { for _, candidate := range cm.PendingCandidates[from] { logger.Println("sending candidate from answer to", from) if err = cm.sendSignalingMessage(string(CHAT_WEBRTC_CANDIDATE), cm.ID, from, map[string]any{ "from": cm.ID, "to": from, "candidate": candidate.ToJSON().Candidate, "sdpMid": *candidate.ToJSON().SDPMid, "sdpMLineIndex": strconv.Itoa(int(*candidate.ToJSON().SDPMLineIndex)), }); err != nil { logger.Println(err) continue } } delete(cm.PendingCandidates, from) return }) _ = atomicallyExecute(cm.localSDFlag, func() (err error) { delete(cm.LocalSD, from) return }) return } func (cm *ChatsManager) HandleCounterOffer(ctx context.Context, from string, to string, req map[string]string) (err error) { _ = atomicallyExecute(cm.candidateFlag, func() (err error) { for _, candidate := range cm.PendingCandidates[from] { logger.Println("sending candidate to", from) if err = cm.sendSignalingMessage(string(CHAT_WEBRTC_CANDIDATE), cm.ID, from, map[string]any{ "from": cm.ID, "to": from, "candidate": candidate.ToJSON().Candidate, "sdpMid": *candidate.ToJSON().SDPMid, "sdpMLineIndex": strconv.Itoa(int(*candidate.ToJSON().SDPMLineIndex)), }); err != nil { return } } delete(cm.PendingCandidates, from) return }) _ = atomicallyExecute(cm.localSDFlag, func() (err error) { delete(cm.LocalSD, from) return }) return } func (cm *ChatsManager) createPeerConnection(target string, from string, chatId string, peerType webrtc.SDPType, cb OnICECandidateFunc) (peerConnection *webrtc.PeerConnection, err error) { defer func() { if r := recover(); err != nil { logger.Printf("recover from panic : %v\n", r) } }() config := webrtc.Configuration{ ICEServers: []webrtc.ICEServer{ { URLs: []string{"stun:stun.l.google.com:19302", "stun:stunserver.org:3478"}, }, }, SDPSemantics: webrtc.SDPSemanticsUnifiedPlan, } peerConnection, err = webrtc.NewPeerConnection(config) if err != nil { return } logger.Println("---------------------------------------------------") if peerType == webrtc.SDPTypeOffer { channel, err := peerConnection.CreateDataChannel("data", &webrtc.DataChannelInit{}) if err != nil { return nil, err } reqChan := make(chan *ChatRequest) channel.OnOpen(func() { logger.Println(chatId) if _, ok := cm.Chats[chatId]; ok { logger.Println("this chat exist") _ = atomicallyExecute(cm.Chats[chatId].DataChannelsFlag, func() (err error) { x := uint32(0) cm.Chats[chatId].DataChannels[target] = &DataChannel{DataChannel: channel, bufferedAmountLowThresholdReached: make(<-chan struct{}), l: &x} return }) if _, ok := cm.Chats[chatId]; !ok { err = fmt.Errorf("no corresponding Chats") return } done, err := cm.Chats[chatId].ChatRequestScheduler.Schedule(reqChan) bs, jsonErr := json.Marshal(&ZoneResponse{ Type: "user_chat_init", From: chatId, To: target, Payload: map[string]interface{}{}, }) if jsonErr != nil { logger.Println("error in open channel", jsonErr) return } if sendErr := channel.SendText(string(bs)); sendErr != nil { logger.Println("error in open channel send", sendErr) return } go func() { for { select { case <-done: return case e := <-err: logger.Println("----- error from scheduler:", e) } } }() } }) channel.OnClose(func() { close(reqChan) //_ = cm.HandleLeavingMember(target, chatId, true) }) channel.OnError(func(err error) { close(reqChan) //_ = cm.HandleLeavingMember(target, chatId, true) }) channel.OnMessage(func(msg webrtc.DataChannelMessage) { var req ChatRequest if err := json.Unmarshal(msg.Data, &req); err != nil { logger.Println(err) return } logger.Println("incoming request", req) reqChan <- &req }) logger.Println("new channel for target : ", target) logger.Println(target) _ = atomicallyExecute(cm.dataChannelFlag, func() (err error) { l := uint32(0) cm.DataChannels[target] = &DataChannel{ DataChannel: channel, bufferedAmountLowThresholdReached: make(<-chan struct{}), l: &l, } return }) } else { peerConnection.OnDataChannel(func(dc *webrtc.DataChannel) { _ = atomicallyExecute(cm.dataChannelFlag, func() (err error) { l := uint32(0) cm.DataChannels[target] = &DataChannel{ DataChannel: dc, l: &l, } return }) reqChan := make(chan *ChatRequest, 100) if dc.Label() == "data" { dc.OnOpen(func() { logger.Println(chatId) if _, ok := cm.Chats[chatId]; ok { logger.Println("this chat exist") _ = atomicallyExecute(cm.Chats[chatId].DataChannelsFlag, func() (err error) { logger.Println("adding dc to dc map") x := uint32(0) cm.Chats[chatId].DataChannels[target] = &DataChannel{DataChannel: dc, bufferedAmountLowThresholdReached: make(<-chan struct{}), l: &x} return }) if _, ok := cm.Chats[chatId]; !ok { err = fmt.Errorf("no corresponding Chats") return } done, err := cm.Chats[chatId].ChatRequestScheduler.Schedule(reqChan) go func() { for { select { case <-done: return case <-err: } } }() } }) dc.OnClose(func() { fmt.Println("closing gracefully event dc...") close(reqChan) }) dc.OnError(func(err error) { logger.Println("--------------- error in dc:", err) close(reqChan) }) dc.OnMessage(func(msg webrtc.DataChannelMessage) { var req ChatRequest if err := json.Unmarshal(msg.Data, &req); err != nil { logger.Println(err) return } logger.Println("incoming request", req) go func() { reqChan <- &req }() }) _ = atomicallyExecute(cm.dataChannelFlag, func() (err error) { l := uint32(0) cm.DataChannels[target] = &DataChannel{ DataChannel: dc, bufferedAmountLowThresholdReached: make(<-chan struct{}), l: &l, } return }) } else { if _, ok := cm.Chats[chatId]; ok { fmt.Println("got new mtfking datachannel") fmt.Println(dc.Label()) scheduler := cm.Chats[chatId].ChatRequestScheduler l := uint32(0) datachannel := &DataChannel{ DataChannel: dc, bufferedAmountLowThresholdReached: make(<-chan struct{}), l: &l, } catched := scheduler.DispatchDataChannel(context.Background(), datachannel) if !catched { if closeErr := datachannel.DataChannel.Close(); closeErr != nil { logger.Println(closeErr) } } } } }) } peerConnection.OnConnectionStateChange(func(pcs webrtc.PeerConnectionState) { if pcs == webrtc.PeerConnectionStateDisconnected || pcs == webrtc.PeerConnectionStateFailed { logger.Println(pcs) if err = cm.HandleLeavingMember(target, chatId, true); err != nil { logger.Println(err) } } }) peerConnection.OnICEConnectionStateChange(func(is webrtc.ICEConnectionState) { logger.Printf("ICE connection state has changed %s\n", is.String()) }) peerConnection.OnICECandidate(func(i *webrtc.ICECandidate) { if i == nil { return } _ = atomicallyExecute(cm.candidateFlag, func() (err error) { desc := peerConnection.RemoteDescription() if desc == nil { logger.Println("generated candidate appended to list : ", i) cm.PendingCandidates[target] = append(cm.PendingCandidates[target], i) } else { logger.Println("generated candidate : ", i) if iceCandidateErr := cb(target, i); iceCandidateErr != nil { logger.Println(iceCandidateErr) } } return }) }) return } func (cm *ChatsManager) HandleRennegotiationOffer(from, sdp string) (err error) { err = atomicallyExecute(cm.peerConnectionFlag, func() (err error) { if _, ok := cm.RTCPeerConnections[from]; !ok { err = fmt.Errorf("no corresponding peer connection for id %s", from) return } if err = cm.RTCPeerConnections[from].SetRemoteDescription(webrtc.SessionDescription{SDP: sdp, Type: webrtc.SDPTypeOffer}); err != nil { return } localSd, err := cm.RTCPeerConnections[from].CreateAnswer(nil) if err != nil { return } if err = cm.RTCPeerConnections[from].SetLocalDescription(localSd); err != nil { return } if err = cm.sendSignalingMessage(string(CHAT_WEBRTC_RENNEGOTIATION_ANSWER), cm.ID, from, map[string]any{ "to": from, "sdp": localSd.SDP, }); err != nil { logger.Println(err) return } return }) return } func (cm *ChatsManager) HandleRennegotiationAnswer(from string, sdp string) (err error) { _ = atomicallyExecute(cm.peerConnectionFlag, func() (err error) { if _, ok := cm.RTCPeerConnections[from]; !ok { err = fmt.Errorf("no corresponding peer connection for id %s", from) return } err = cm.RTCPeerConnections[from].SetRemoteDescription(webrtc.SessionDescription{SDP: sdp, Type: webrtc.SDPTypeAnswer}) return }) return } func (cm *ChatsManager) AddCandidate(candidate *webrtc.ICECandidateInit, from string) (err error) { _ = atomicallyExecute(cm.candidateFlag, func() (err error) { if candidate != nil { if connection, ok := cm.RTCPeerConnections[from]; ok { err = connection.AddICECandidate(*candidate) } } return }) return } func (cm *ChatsManager) HandleLeavingMember(id, chatId string, signalLeaving bool) (err error) { logger.Println("---------------- handling leaving member", id) if err = atomicallyExecute(cm.peerConnectionFlag, func() (err error) { if _, ok := cm.RTCPeerConnections[id]; !ok { err = fmt.Errorf("no correponding peerconnection for id %s", id) return } return }); err != nil { return } if signalLeaving { nerr := cm.notifyLeavingMember(id, chatId, NodeID) fmt.Println(nerr) } err = atomicallyExecute(cm.chatsFlag, func() (err error) { logger.Println(err) logger.Println("---------------- cleaning chat handlers", id) if chat, ok := cm.Chats[chatId]; ok { for _, handlersPublishers := range chat.ChatRequestScheduler.handlersPublishers { go func(hp chan<- *ChatRequest) { hp <- &ChatRequest{ ReqType: LEAVE_CHAT, From: id, Payload: map[string]interface{}{ "userId": id, }, } }(handlersPublishers) } if err = atomicallyExecute(chat.DataChannelsFlag, func() (err error) { defer delete(chat.DataChannels, id) if dataChannel, ok := chat.DataChannels[id]; ok { if err = dataChannel.DataChannel.Close(); err != nil { return } } return }); err != nil { fmt.Println(err) } logger.Println("datachannels cleaned", id) } else { err = fmt.Errorf("no corresponding chat for chatId %s", chatId) } logger.Println(err) err = atomicallyExecute(cm.peerConnectionFlag, func() (err error) { if _, ok := cm.RTCPeerConnections[id]; ok { defer delete(cm.RTCPeerConnections, id) if err = cm.RTCPeerConnections[id].Close(); err != nil { return } } return }) fmt.Println(err) _ = atomicallyExecute(cm.candidateFlag, func() (err error) { delete(cm.PendingCandidates, id) return }) return }) return } func (cm *ChatsManager) notifyLeavingMember(userId, chatId, hostId string) (err error) { em := NewEncryptionManager() sig := em.SignRequestHMAC(NodeID) body, err := json.Marshal(map[string]interface{}{ "type": "DISCONNECT_CHAT_MEMBER", "mac": sig, "from": NodeID, "peerType": "node", "payload": map[string]string{ "chatId": chatId, "userId": userId, }, }) if err != nil { return } _, err = HTTPClient.Post("https://dev.zippytal.com/req", "application/json", bytes.NewReader(body)) if err != nil { logger.Println("error come from there in chat manager") return } return } func (cm *ChatsManager) negotiate(target string, chatId string) { _ = atomicallyExecute(cm.peerConnectionFlag, func() (err error) { if _, ok := cm.RTCPeerConnections[target]; !ok { err = fmt.Errorf("no peerConnections") return } return }) }