Zippytal-Node/chatManager.go

757 lines
22 KiB
Go

package localserver
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"os"
"path/filepath"
"strconv"
"sync"
"github.com/google/uuid"
"github.com/pion/webrtc/v3"
)
const (
UPDATE_UNREAD_COUNT = "update_unread_count"
UPDATE_LAST_INTERACTION_TIME = "update_last_interaction_time"
DISCONNECT_USER_FROM_CHAT = "disconnect_user_from_chat"
)
type ChatsManager struct {
ID string
Chats map[string]*NodeChat
LocalSD map[string]*webrtc.SessionDescription
RTCPeerConnections map[string]*RTCPeerConnection
DataChannels map[string]*DataChannel
PendingCandidates map[string][]*webrtc.ICECandidate
stream SignalingService_LinkClient
chatsFlag *uint32
peerConnectionFlag *uint32
localSDFlag *uint32
dataChannelFlag *uint32
candidateFlag *uint32
}
type NodeChat struct {
ID string `json:"id"`
Name string `json:"name"`
Initiator string `json:"initiator"`
Target string `json:"target"`
InitiatorHost string `json:"initiatorHost"`
TargetHost string `json:"targetHost"`
DataChannels map[string]*DataChannel `json:"-"`
DataChannelsFlag *uint32 `json:"-"`
ChatRequestScheduler *ChatRequestScheduler `json:"-"`
Initialized bool `json:"-"`
}
func NewNodeChat(id, name, initiator, target, initiatorHost, targetHost string, initialized bool) (chat *NodeChat, err error) {
dataChannels, dataChannelFlag := make(map[string]*DataChannel), uint32(0)
nodeChatMessageHandler, err := NewNodeChatChannelsHandler(id, id, initiator, target, initiatorHost, targetHost, dataChannels, &dataChannelFlag)
if err != nil {
fmt.Println("error occured there")
fmt.Println(err)
return
}
nodeChatNotificationsHandler, err := NewChatNotificationsHandler(id, initiator, target, initiatorHost, targetHost, dataChannels, &dataChannelFlag)
if err != nil {
return
}
chatScheduler, e := NewChatRequestScheduler(initiator, target, initiatorHost, targetHost, nodeChatMessageHandler, nodeChatNotificationsHandler)
go func() {
for range e {
}
}()
chat = &NodeChat{
ID: id,
Name: name,
Initiator: initiator,
Target: target,
InitiatorHost: initiatorHost,
TargetHost: targetHost,
DataChannels: dataChannels,
DataChannelsFlag: &dataChannelFlag,
ChatRequestScheduler: chatScheduler,
}
return
}
func NewChatManager(id, token string) (chatManager *ChatsManager, err error) {
chatsFlag := uint32(0)
peerConnectionFlag := uint32(0)
localSDFlag := uint32(0)
dataChannelFlag := uint32(0)
candidateFlag := uint32(0)
dataChannels := make(map[string]*DataChannel)
chatsMap := make(map[string]*NodeChat)
chats, err := chatManager.fetchChats(id, token)
if err != nil {
return
}
for _, chat := range chats {
c, err := NewNodeChat(chat.ID, chat.Name, chat.Initiator, chat.Target, chat.InitiatorHost, chat.TargetHost, false)
if err != nil {
return nil, err
}
_ = atomicallyExecute(&chatsFlag, func() (err error) {
chatsMap[chat.ID] = c
return
})
}
chatsFolder, err := os.ReadDir(filepath.Join(dataPath, "data", "chats"))
if err != nil {
if os.IsNotExist(err) {
if err = os.MkdirAll(filepath.Join(dataPath, "data", "chats"), 0770); err != nil {
return
}
} else {
return nil, err
}
}
chatManager = &ChatsManager{
ID: id,
Chats: chatsMap,
LocalSD: make(map[string]*webrtc.SessionDescription),
RTCPeerConnections: make(map[string]*RTCPeerConnection),
PendingCandidates: make(map[string][]*webrtc.ICECandidate),
DataChannels: dataChannels,
chatsFlag: &chatsFlag,
peerConnectionFlag: &peerConnectionFlag,
localSDFlag: &localSDFlag,
dataChannelFlag: &dataChannelFlag,
candidateFlag: &candidateFlag,
}
for _, c := range chatsFolder {
if _, ok := chatsMap[c.Name()]; !ok {
// logger.Println(chatManager.DeleteChat(c.Name()))
}
}
return
}
func (cm *ChatsManager) fetchChats(nodeId string, token string) (chats []*NodeChat, err error) {
em := NewEncryptionManager()
sig := em.SignRequestHMAC(nodeId)
body, err := json.Marshal(map[string]interface{}{
"type": GET_NODE_CHATS,
"mac": sig,
"from": nodeId,
"peerType": "node",
"payload": map[string]string{
"host": nodeId,
"lastIndex": "0",
},
})
if err != nil {
return
}
res, err := HTTPClient.Post("https://dev.zippytal.com/req", "application/json", bytes.NewReader(body))
if err != nil {
// logger.Println("error come from there inn chat manager")
return
}
bs, err := io.ReadAll(res.Body)
if err != nil {
return
}
var payload map[string]any
if err = json.Unmarshal(bs, &payload); err != nil {
return
}
fmt.Println(payload)
b, err := json.Marshal(payload["chats"])
if err != nil {
return
}
err = json.Unmarshal(b, &chats)
return
}
func (cm *ChatsManager) sendSignalingMessage(messageType, from, to string, payload map[string]interface{}) (err error) {
bs, err := json.Marshal(payload)
if err != nil {
return
}
err = cm.stream.Send(&SignalingMessage{
Type: messageType,
From: from,
To: to,
Payload: bs,
})
return
}
func (cm *ChatsManager) DeleteChat(chatId string) error {
return os.RemoveAll(filepath.Join(dataPath, "data", "chats", chatId))
}
func (cm *ChatsManager) CreateOffer(ctx context.Context, target string, from string, chatId string, cb OnICECandidateFunc) (err error) {
peerConnection, err := cm.createPeerConnection(target, from, chatId, webrtc.SDPTypeOffer, cb)
if err != nil {
return
}
// logger.Println("connection created")
rawOffer, err := peerConnection.CreateOffer(nil)
if err != nil {
return
}
if err = peerConnection.SetLocalDescription(rawOffer); err != nil {
return
}
_ = atomicallyExecute(cm.peerConnectionFlag, func() (err error) {
id := uuid.New().String()
// logger.Println("adding for target", target)
cm.RTCPeerConnections[target] = &RTCPeerConnection{
id: id,
PeerConnection: peerConnection,
makingOffer: true,
makingOfferLock: &sync.Mutex{},
negotiate: cm.negotiate,
}
return
})
err = cm.sendSignalingMessage(string(CHAT_WEBRTC_OFFER), cm.ID, target, map[string]any{
"to": target,
"from": cm.ID,
"sdp": rawOffer.SDP,
})
return
}
func (cm *ChatsManager) HandleOffer(ctx context.Context, from string, to string, req map[string]string, cb OnICECandidateFunc) (err error) {
done, errCh := make(chan struct{}), make(chan error)
go func() {
if id, ok := cm.Chats[req["chatId"]]; !ok {
err = fmt.Errorf("no corresponding chat")
fmt.Println(req["chatId"])
fmt.Println(cm.Chats)
fmt.Println(id)
fmt.Println(err)
errCh <- err
return
}
// logger.Println("handling chat offer")
_ = atomicallyExecute(cm.chatsFlag, func() (err error) {
return
})
if _, ok := cm.RTCPeerConnections[from]; ok {
if e := cm.HandleLeavingMember(from, req["chatId"], false); e != nil {
// logger.Println(e)
}
}
peerConnection, err := cm.createPeerConnection(from, to, req["chatId"], webrtc.SDPTypeAnswer, cb)
if err != nil {
errCh <- err
return
}
// logger.Println("peer connection created")
_ = atomicallyExecute(cm.peerConnectionFlag, func() (err error) {
id := uuid.New().String()
cm.RTCPeerConnections[from] = &RTCPeerConnection{
PeerConnection: peerConnection,
id: id,
makingOffer: false,
makingOfferLock: &sync.Mutex{},
negotiate: cm.negotiate,
}
// logger.Println("peer connection added to map")
offer := webrtc.SessionDescription{
Type: webrtc.SDPTypeOffer,
SDP: req[SDP],
}
if err = peerConnection.SetRemoteDescription(offer); err != nil {
errCh <- err
return
}
rawAnswer, err := peerConnection.CreateAnswer(nil)
if err != nil {
errCh <- err
return
}
_ = atomicallyExecute(cm.localSDFlag, func() (err error) {
cm.LocalSD[from] = &rawAnswer
return
})
if err = peerConnection.SetLocalDescription(rawAnswer); err != nil {
errCh <- err
return
}
if err = cm.sendSignalingMessage(string(CHAT_WEBRTC_ANSWER), cm.ID, from, map[string]any{
"to": from,
"from": cm.ID,
"sdp": rawAnswer.SDP,
}); err != nil {
errCh <- err
return
}
done <- struct{}{}
return
})
}()
select {
case <-done:
return
case err = <-errCh:
return
case <-ctx.Done():
err = ctx.Err()
return
}
}
func (cm *ChatsManager) HandleAnswer(ctx context.Context, from string, to string, req map[string]string) (err error) {
defer func() {
if r := recover(); err != nil {
logger.Printf("recover from panic in handle answer : %v\n", r)
}
}()
if err = atomicallyExecute(cm.peerConnectionFlag, func() (err error) {
if _, ok := cm.RTCPeerConnections[from]; !ok {
err = fmt.Errorf("no corresponding peer connection for id : %s", from)
return
}
peerConnnection := cm.RTCPeerConnections[from]
// logger.Println("---------------------")
// logger.Println(req[SDP])
// logger.Println("---------------------")
if err = peerConnnection.SetRemoteDescription(webrtc.SessionDescription{
Type: webrtc.SDPTypeAnswer,
SDP: req[SDP],
}); err != nil {
// logger.Println("error occured while setting remote description in handle answer")
return
}
return
}); err != nil {
return
}
if err = cm.sendSignalingMessage(string(CHAT_WEBRTC_COUNTER_OFFER), cm.ID, from, map[string]any{
"from": cm.ID,
"to": from,
}); err != nil {
return
}
_ = atomicallyExecute(cm.candidateFlag, func() (err error) {
for _, candidate := range cm.PendingCandidates[from] {
// logger.Println("sending candidate from answer to", from)
if err = cm.sendSignalingMessage(string(CHAT_WEBRTC_CANDIDATE), cm.ID, from, map[string]any{
"from": cm.ID,
"to": from,
"candidate": candidate.ToJSON().Candidate,
"sdpMid": *candidate.ToJSON().SDPMid,
"sdpMLineIndex": strconv.Itoa(int(*candidate.ToJSON().SDPMLineIndex)),
}); err != nil {
// logger.Println(err)
continue
}
}
delete(cm.PendingCandidates, from)
return
})
_ = atomicallyExecute(cm.localSDFlag, func() (err error) {
delete(cm.LocalSD, from)
return
})
return
}
func (cm *ChatsManager) HandleCounterOffer(ctx context.Context, from string, to string, req map[string]string) (err error) {
_ = atomicallyExecute(cm.candidateFlag, func() (err error) {
for _, candidate := range cm.PendingCandidates[from] {
// logger.Println("sending candidate to", from)
if err = cm.sendSignalingMessage(string(CHAT_WEBRTC_CANDIDATE), cm.ID, from, map[string]any{
"from": cm.ID,
"to": from,
"candidate": candidate.ToJSON().Candidate,
"sdpMid": *candidate.ToJSON().SDPMid,
"sdpMLineIndex": strconv.Itoa(int(*candidate.ToJSON().SDPMLineIndex)),
}); err != nil {
return
}
}
delete(cm.PendingCandidates, from)
return
})
_ = atomicallyExecute(cm.localSDFlag, func() (err error) {
delete(cm.LocalSD, from)
return
})
return
}
func (cm *ChatsManager) createPeerConnection(target string, from string, chatId string, peerType webrtc.SDPType, cb OnICECandidateFunc) (peerConnection *webrtc.PeerConnection, err error) {
defer func() {
if r := recover(); err != nil {
logger.Printf("recover from panic : %v\n", r)
}
}()
config := webrtc.Configuration{
ICEServers: []webrtc.ICEServer{
{
URLs: []string{"stun:stun.l.google.com:19302"},
},
},
SDPSemantics: webrtc.SDPSemanticsUnifiedPlan,
}
peerConnection, err = webrtc.NewPeerConnection(config)
if err != nil {
return
}
// logger.Println("---------------------------------------------------")
if peerType == webrtc.SDPTypeOffer {
channel, err := peerConnection.CreateDataChannel("data", &webrtc.DataChannelInit{})
if err != nil {
return nil, err
}
reqChan := make(chan *ChatRequest)
channel.OnOpen(func() {
// logger.Println(chatId)
if _, ok := cm.Chats[chatId]; ok {
// logger.Println("this chat exist")
_ = atomicallyExecute(cm.Chats[chatId].DataChannelsFlag, func() (err error) {
x := uint32(0)
cm.Chats[chatId].DataChannels[target] = &DataChannel{DataChannel: channel, bufferedAmountLowThresholdReached: make(<-chan struct{}), l: &x}
return
})
if _, ok := cm.Chats[chatId]; !ok {
err = fmt.Errorf("no corresponding Chats")
return
}
done, err := cm.Chats[chatId].ChatRequestScheduler.Schedule(reqChan)
bs, jsonErr := json.Marshal(&ZoneResponse{
Type: "user_chat_init",
From: chatId,
To: target,
Payload: map[string]interface{}{},
})
if jsonErr != nil {
// logger.Println("error in open channel", jsonErr)
return
}
if sendErr := channel.SendText(string(bs)); sendErr != nil {
// logger.Println("error in open channel send", sendErr)
return
}
go func() {
for {
select {
case <-done:
return
case <-err:
// logger.Println("----- error from scheduler:", e)
}
}
}()
}
})
channel.OnClose(func() {
if _, closed := <-reqChan; !closed {
close(reqChan)
}
//_ = cm.HandleLeavingMember(target, chatId, true)
})
channel.OnError(func(err error) {
if _, closed := <-reqChan; !closed {
close(reqChan)
}
//_ = cm.HandleLeavingMember(target, chatId, true)
})
channel.OnMessage(func(msg webrtc.DataChannelMessage) {
var req ChatRequest
if err := json.Unmarshal(msg.Data, &req); err != nil {
// logger.Println(err)
return
}
// logger.Println("incoming request", req)
reqChan <- &req
})
// logger.Println("new channel for target : ", target)
// logger.Println(target)
_ = atomicallyExecute(cm.dataChannelFlag, func() (err error) {
l := uint32(0)
cm.DataChannels[target] = &DataChannel{
DataChannel: channel,
bufferedAmountLowThresholdReached: make(<-chan struct{}),
l: &l,
}
return
})
} else {
peerConnection.OnDataChannel(func(dc *webrtc.DataChannel) {
_ = atomicallyExecute(cm.dataChannelFlag, func() (err error) {
l := uint32(0)
cm.DataChannels[target] = &DataChannel{
DataChannel: dc,
l: &l,
}
return
})
reqChan := make(chan *ChatRequest, 100)
if dc.Label() == "data" {
dc.OnOpen(func() {
// logger.Println(chatId)
if _, ok := cm.Chats[chatId]; ok {
// logger.Println("this chat exist")
_ = atomicallyExecute(cm.Chats[chatId].DataChannelsFlag, func() (err error) {
// logger.Println("adding dc to dc map")
x := uint32(0)
cm.Chats[chatId].DataChannels[target] = &DataChannel{DataChannel: dc, bufferedAmountLowThresholdReached: make(<-chan struct{}), l: &x}
return
})
if _, ok := cm.Chats[chatId]; !ok {
err = fmt.Errorf("no corresponding Chats")
return
}
done, err := cm.Chats[chatId].ChatRequestScheduler.Schedule(reqChan)
go func() {
for {
select {
case <-done:
return
case <-err:
}
}
}()
}
})
dc.OnClose(func() {
fmt.Println("closing gracefully event dc...")
close(reqChan)
})
dc.OnError(func(err error) {
// logger.Println("--------------- error in dc:", err)
close(reqChan)
})
dc.OnMessage(func(msg webrtc.DataChannelMessage) {
var req ChatRequest
if err := json.Unmarshal(msg.Data, &req); err != nil {
// logger.Println(err)
return
}
fmt.Println("incoming req", req)
// logger.Println("incoming request", req)
go func() {
reqChan <- &req
}()
})
_ = atomicallyExecute(cm.dataChannelFlag, func() (err error) {
l := uint32(0)
cm.DataChannels[target] = &DataChannel{
DataChannel: dc,
bufferedAmountLowThresholdReached: make(<-chan struct{}),
l: &l,
}
return
})
} else {
if _, ok := cm.Chats[chatId]; ok {
fmt.Println("got new mtfking datachannel")
fmt.Println(dc.Label())
scheduler := cm.Chats[chatId].ChatRequestScheduler
l := uint32(0)
datachannel := &DataChannel{
DataChannel: dc,
bufferedAmountLowThresholdReached: make(<-chan struct{}),
l: &l,
}
catched := scheduler.DispatchDataChannel(context.Background(), datachannel)
if !catched {
if closeErr := datachannel.DataChannel.Close(); closeErr != nil {
// logger.Println(closeErr)
}
}
}
}
})
}
peerConnection.OnConnectionStateChange(func(pcs webrtc.PeerConnectionState) {
if pcs == webrtc.PeerConnectionStateDisconnected || pcs == webrtc.PeerConnectionStateFailed {
// logger.Println(pcs)
if err = cm.HandleLeavingMember(target, chatId, true); err != nil {
// logger.Println(err)
}
}
})
peerConnection.OnICEConnectionStateChange(func(is webrtc.ICEConnectionState) {
// logger.Printf("ICE connection state has changed %s\n", is.String())
})
peerConnection.OnICECandidate(func(i *webrtc.ICECandidate) {
if i == nil {
return
}
_ = atomicallyExecute(cm.candidateFlag, func() (err error) {
desc := peerConnection.RemoteDescription()
if desc == nil {
// logger.Println("generated candidate appended to list : ", i)
cm.PendingCandidates[target] = append(cm.PendingCandidates[target], i)
} else {
// logger.Println("generated candidate : ", i)
if iceCandidateErr := cb(target, i); iceCandidateErr != nil {
// logger.Println(iceCandidateErr)
}
}
return
})
})
return
}
func (cm *ChatsManager) HandleRennegotiationOffer(from, sdp string) (err error) {
err = atomicallyExecute(cm.peerConnectionFlag, func() (err error) {
if _, ok := cm.RTCPeerConnections[from]; !ok {
err = fmt.Errorf("no corresponding peer connection for id %s", from)
return
}
if err = cm.RTCPeerConnections[from].SetRemoteDescription(webrtc.SessionDescription{SDP: sdp, Type: webrtc.SDPTypeOffer}); err != nil {
return
}
localSd, err := cm.RTCPeerConnections[from].CreateAnswer(nil)
if err != nil {
return
}
if err = cm.RTCPeerConnections[from].SetLocalDescription(localSd); err != nil {
return
}
if err = cm.sendSignalingMessage(string(CHAT_WEBRTC_RENNEGOTIATION_ANSWER), cm.ID, from, map[string]any{
"to": from,
"sdp": localSd.SDP,
}); err != nil {
// logger.Println(err)
return
}
return
})
return
}
func (cm *ChatsManager) HandleRennegotiationAnswer(from string, sdp string) (err error) {
_ = atomicallyExecute(cm.peerConnectionFlag, func() (err error) {
if _, ok := cm.RTCPeerConnections[from]; !ok {
err = fmt.Errorf("no corresponding peer connection for id %s", from)
return
}
err = cm.RTCPeerConnections[from].SetRemoteDescription(webrtc.SessionDescription{SDP: sdp, Type: webrtc.SDPTypeAnswer})
return
})
return
}
func (cm *ChatsManager) AddCandidate(candidate *webrtc.ICECandidateInit, from string) (err error) {
_ = atomicallyExecute(cm.candidateFlag, func() (err error) {
if candidate != nil {
if connection, ok := cm.RTCPeerConnections[from]; ok {
err = connection.AddICECandidate(*candidate)
}
}
return
})
return
}
func (cm *ChatsManager) HandleLeavingMember(id, chatId string, signalLeaving bool) (err error) {
// logger.Println("---------------- handling leaving member", id)
if err = atomicallyExecute(cm.peerConnectionFlag, func() (err error) {
if _, ok := cm.RTCPeerConnections[id]; !ok {
err = fmt.Errorf("no correponding peerconnection for id %s", id)
return
}
return
}); err != nil {
return
}
if signalLeaving {
nerr := cm.notifyLeavingMember(id, chatId, NodeID)
fmt.Println(nerr)
}
err = atomicallyExecute(cm.chatsFlag, func() (err error) {
// logger.Println(err)
// logger.Println("---------------- cleaning chat handlers", id)
if chat, ok := cm.Chats[chatId]; ok {
for _, handlersPublishers := range chat.ChatRequestScheduler.handlersPublishers {
go func(hp chan<- *ChatRequest) {
hp <- &ChatRequest{
ReqType: LEAVE_CHAT,
From: id,
Payload: map[string]interface{}{
"userId": id,
},
}
}(handlersPublishers)
}
if err = atomicallyExecute(chat.DataChannelsFlag, func() (err error) {
defer delete(chat.DataChannels, id)
if dataChannel, ok := chat.DataChannels[id]; ok {
if err = dataChannel.DataChannel.Close(); err != nil {
return
}
}
return
}); err != nil {
fmt.Println(err)
}
// logger.Println("datachannels cleaned", id)
} else {
err = fmt.Errorf("no corresponding chat for chatId %s", chatId)
}
// logger.Println(err)
err = atomicallyExecute(cm.peerConnectionFlag, func() (err error) {
if _, ok := cm.RTCPeerConnections[id]; ok {
defer delete(cm.RTCPeerConnections, id)
if err = cm.RTCPeerConnections[id].Close(); err != nil {
return
}
}
return
})
fmt.Println(err)
_ = atomicallyExecute(cm.candidateFlag, func() (err error) {
delete(cm.PendingCandidates, id)
return
})
return
})
return
}
func (cm *ChatsManager) notifyLeavingMember(userId, chatId, hostId string) (err error) {
em := NewEncryptionManager()
sig := em.SignRequestHMAC(NodeID)
body, err := json.Marshal(map[string]interface{}{
"type": DISCONNECT_USER_FROM_CHAT,
"mac": sig,
"from": NodeID,
"peerType": "node",
"payload": map[string]string{
"chatId": chatId,
"userId": userId,
},
})
if err != nil {
return
}
_, err = HTTPClient.Post("https://dev.zippytal.com/req", "application/json", bytes.NewReader(body))
if err != nil {
// logger.Println("error come from there in chat manager")
return
}
return
}
func (cm *ChatsManager) negotiate(target string, chatId string) {
_ = atomicallyExecute(cm.peerConnectionFlag, func() (err error) {
if _, ok := cm.RTCPeerConnections[target]; !ok {
err = fmt.Errorf("no peerConnections")
return
}
return
})
}