package localserver import ( "bytes" "context" "encoding/json" "fmt" "io" "os" "path/filepath" "strconv" sync "sync" "github.com/google/uuid" "github.com/pion/webrtc/v3" ) type SquadManager struct { ID string Squads map[string]*Squad LocalSD map[string]*webrtc.SessionDescription RTCPeerConnections map[string]*RTCPeerConnection DataChannels map[string]*DataChannel PendingCandidates map[string][]*webrtc.ICECandidate stream SignalingService_LinkClient squadFlag *uint32 peerConnectionFlag *uint32 localSDFlag *uint32 dataChannelFlag *uint32 candidateFlag *uint32 } type Squad struct { ID string Name string ImageURL string Owner string CreationDate string AuthorizedMembers []string DataChannels map[string]*DataChannel DataChannelsFlag *uint32 SquadRequestScheduler *SquadRequestScheduler Initialized bool } func NewSquad(hostId string, squadId string, squadName string, imageUrl string, owner string, creationDate string, initialized bool, authorizedMembers []string) (squad *Squad, err error) { dataChannels, dataChannelFlag := make(map[string]*DataChannel), uint32(0) squadChatHandler, err := NewSquadChatsHandler(hostId, squadId, squadName, owner, authorizedMembers, dataChannels, &dataChannelFlag) if err != nil { return nil, err } squadVideoChannelsHandler, err := NewSquadVideoChannelsHandler(hostId, squadId, owner, authorizedMembers, dataChannels, &dataChannelFlag) if err != nil { return } squadNotificationsHandler, err := NewSquadNotificationsHandler(hostId, squadId, owner, authorizedMembers, dataChannels, &dataChannelFlag) if err != nil { return } squadScheduler, e := NewSquadRequestScheduler(authorizedMembers, squadVideoChannelsHandler, squadChatHandler, squadNotificationsHandler) go func() { for range e { // logger.Println("from scheduler :", schedErr) } }() squad = &Squad{ ID: squadId, Name: squadName, ImageURL: imageUrl, Owner: owner, CreationDate: creationDate, Initialized: initialized, SquadRequestScheduler: squadScheduler, DataChannels: dataChannels, DataChannelsFlag: &dataChannelFlag, } fmt.Println("squad fking creeaeted", squad.Name) return } func NewSquadManager(id string, token string) (squadManager *SquadManager, err error) { squadFlag := uint32(0) peerConnectionFlag := uint32(0) localSDFlag := uint32(0) dataChannelFlag := uint32(0) candidateFlag := uint32(0) dataChannels := make(map[string]*DataChannel) squadMap := make(map[string]*Squad) squads, err := squadManager.fetchSquads(id, token) if err != nil { return } for _, squad := range squads { z, err := NewSquad(id, squad.ID, squad.Name, squad.ImageURL, squad.Owner, squad.CreationDate, true, squad.AuthorizedMembers) if err != nil { return nil, err } squadMap[squad.ID] = z } squadsFolder, err := os.ReadDir(filepath.Join(dataPath, "data", "squads")) if err != nil { if os.IsNotExist(err) { if err = os.MkdirAll(filepath.Join(dataPath, "data", "squads"), 0770); err != nil { return } } else { return nil, err } } // logger.Println(squadMap) squadManager = &SquadManager{ ID: id, Squads: squadMap, LocalSD: make(map[string]*webrtc.SessionDescription), RTCPeerConnections: make(map[string]*RTCPeerConnection), DataChannels: dataChannels, PendingCandidates: make(map[string][]*webrtc.ICECandidate), squadFlag: &squadFlag, peerConnectionFlag: &peerConnectionFlag, localSDFlag: &localSDFlag, dataChannelFlag: &dataChannelFlag, candidateFlag: &candidateFlag, } for _, z := range squadsFolder { if _, ok := squadMap[z.Name()]; !ok { // logger.Println(squadManager.DeleteSquad(z.Name())) } } return } func (zm *SquadManager) sendSignalingMessage(messageType, from, to string, payload map[string]interface{}) (err error) { bs, err := json.Marshal(payload) if err != nil { return } err = zm.stream.Send(&SignalingMessage{ Type: messageType, From: from, To: to, Payload: bs, }) return } func (zm *SquadManager) DeleteSquad(squadId string) error { return os.RemoveAll(filepath.Join(dataPath, "data", "squads", squadId)) } func (zm *SquadManager) fetchSquads(nodeId string, token string) (squads []*Squad, err error) { em := NewEncryptionManager() sig := em.SignRequestHMAC(nodeId) body, err := json.Marshal(map[string]interface{}{ "type": LIST_HOSTED_SQUADS_BY_HOST, "mac": sig, "from": nodeId, "peerType": "node", "payload": map[string]string{ "host": nodeId, "lastIndex": "0", }, }) if err != nil { return } res, err := HTTPClient.Post("https://dev.zippytal.com/req", "application/json", bytes.NewReader(body)) if err != nil { // logger.Println("error come from there inn squad manager") return } bs, err := io.ReadAll(res.Body) if err != nil { return } var payload map[string]any if err = json.Unmarshal(bs, &payload); err != nil { return } b, err := json.Marshal(payload["squads"]) if err != nil { return } err = json.Unmarshal(b, &squads) return } func (zm *SquadManager) CreateOffer(ctx context.Context, target string, from string, squadId string, cb OnICECandidateFunc) (err error) { peerConnection, err := zm.createPeerConnection(target, from, squadId, webrtc.SDPTypeOffer, cb) if err != nil { return } // logger.Println("connection created") rawOffer, err := peerConnection.CreateOffer(nil) if err != nil { return } if err = peerConnection.SetLocalDescription(rawOffer); err != nil { return } _ = atomicallyExecute(zm.peerConnectionFlag, func() (err error) { id := uuid.New().String() // logger.Println("adding for target", target) zm.RTCPeerConnections[target] = &RTCPeerConnection{ id: id, PeerConnection: peerConnection, makingOffer: true, makingOfferLock: &sync.Mutex{}, negotiate: zm.negotiate, } return }) err = zm.sendSignalingMessage(string(HOSTED_SQUAD_OFFER), zm.ID, target, map[string]any{ "to": target, "from": zm.ID, "sdp": rawOffer.SDP, }) return } func (zm *SquadManager) HandleOffer(ctx context.Context, from string, to string, req map[string]string, cb OnICECandidateFunc) (err error) { done, errCh := make(chan struct{}), make(chan error) go func() { if _, ok := zm.Squads[req["squadId"]]; !ok { err = fmt.Errorf("no corresponding squad") errCh <- err return } // logger.Println("handling squad offer") _ = atomicallyExecute(zm.squadFlag, func() (err error) { return }) if _, ok := zm.RTCPeerConnections[from]; ok { if e := zm.HandleLeavingMember(from, req["squadId"], false); e != nil { // logger.Println(e) } } peerConnection, err := zm.createPeerConnection(from, to, req["squadId"], webrtc.SDPTypeAnswer, cb) if err != nil { errCh <- err return } // logger.Println("peer connection created") _ = atomicallyExecute(zm.peerConnectionFlag, func() (err error) { id := uuid.New().String() zm.RTCPeerConnections[from] = &RTCPeerConnection{ PeerConnection: peerConnection, id: id, makingOffer: false, makingOfferLock: &sync.Mutex{}, negotiate: zm.negotiate, } // logger.Println("peer connection added to map") offer := webrtc.SessionDescription{ Type: webrtc.SDPTypeOffer, SDP: req[SDP], } if err = peerConnection.SetRemoteDescription(offer); err != nil { errCh <- err return } rawAnswer, err := peerConnection.CreateAnswer(nil) if err != nil { errCh <- err return } _ = atomicallyExecute(zm.localSDFlag, func() (err error) { zm.LocalSD[from] = &rawAnswer return }) if err = peerConnection.SetLocalDescription(rawAnswer); err != nil { errCh <- err return } if err = zm.sendSignalingMessage(string(HOSTED_SQUAD_ANSWER), zm.ID, from, map[string]any{ "to": from, "from": zm.ID, "sdp": rawAnswer.SDP, }); err != nil { errCh <- err return } done <- struct{}{} return }) }() select { case <-done: return case err = <-errCh: return case <-ctx.Done(): err = ctx.Err() return } } func (zm *SquadManager) HandleAnswer(ctx context.Context, from string, to string, req map[string]string) (err error) { defer func() { if r := recover(); err != nil { logger.Printf("recover from panic in handle answer : %v\n", r) } }() if err = atomicallyExecute(zm.peerConnectionFlag, func() (err error) { if _, ok := zm.RTCPeerConnections[from]; !ok { err = fmt.Errorf("no corresponding peer connection for id : %s", from) return } peerConnnection := zm.RTCPeerConnections[from] // logger.Println("---------------------") // logger.Println(req[SDP]) // logger.Println("---------------------") if err = peerConnnection.SetRemoteDescription(webrtc.SessionDescription{ Type: webrtc.SDPTypeAnswer, SDP: req[SDP], }); err != nil { // logger.Println("error occured while setting remote description in handle answer") return } return }); err != nil { return } if err = zm.sendSignalingMessage(string(HOSTED_SQUAD_COUNTER_OFFER), zm.ID, from, map[string]any{ "from": zm.ID, "to": from, }); err != nil { return } _ = atomicallyExecute(zm.candidateFlag, func() (err error) { for _, candidate := range zm.PendingCandidates[from] { // logger.Println("sending candidate from answer to", from) if err = zm.sendSignalingMessage(string(HOSTED_SQUAD_WEBRTC_CANDIDATE), zm.ID, from, map[string]any{ "from": zm.ID, "to": from, "candidate": candidate.ToJSON().Candidate, "sdpMid": *candidate.ToJSON().SDPMid, "sdpMLineIndex": strconv.Itoa(int(*candidate.ToJSON().SDPMLineIndex)), }); err != nil { // logger.Println(err) continue } } delete(zm.PendingCandidates, from) return }) _ = atomicallyExecute(zm.localSDFlag, func() (err error) { delete(zm.LocalSD, from) return }) return } func (zm *SquadManager) HandleCounterOffer(ctx context.Context, from string, to string, req map[string]string) (err error) { _ = atomicallyExecute(zm.candidateFlag, func() (err error) { for _, candidate := range zm.PendingCandidates[from] { // logger.Println("sending candidate to", from) if err = zm.sendSignalingMessage(string(HOSTED_SQUAD_WEBRTC_CANDIDATE), zm.ID, from, map[string]any{ "from": zm.ID, "to": from, "candidate": candidate.ToJSON().Candidate, "sdpMid": *candidate.ToJSON().SDPMid, "sdpMLineIndex": strconv.Itoa(int(*candidate.ToJSON().SDPMLineIndex)), }); err != nil { return } } delete(zm.PendingCandidates, from) return }) _ = atomicallyExecute(zm.localSDFlag, func() (err error) { delete(zm.LocalSD, from) return }) return } func (zm *SquadManager) createPeerConnection(target string, from string, squadId string, peerType webrtc.SDPType, cb OnICECandidateFunc) (peerConnection *webrtc.PeerConnection, err error) { defer func() { if r := recover(); err != nil { logger.Printf("recover from panic : %v\n", r) } }() config := webrtc.Configuration{ ICEServers: []webrtc.ICEServer{ { URLs: []string{"stun:stun.l.google.com:19302"}, }, }, SDPSemantics: webrtc.SDPSemanticsUnifiedPlan, } peerConnection, err = webrtc.NewPeerConnection(config) if err != nil { return } // logger.Println("---------------------------------------------------") if peerType == webrtc.SDPTypeOffer { channel, err := peerConnection.CreateDataChannel("data", &webrtc.DataChannelInit{}) if err != nil { return nil, err } reqChan := make(chan *SquadRequest) channel.OnOpen(func() { // logger.Println(squadId) if _, ok := zm.Squads[squadId]; ok { // logger.Println("this squad exist") _ = atomicallyExecute(zm.Squads[squadId].DataChannelsFlag, func() (err error) { x := uint32(0) zm.Squads[squadId].DataChannels[target] = &DataChannel{DataChannel: channel, bufferedAmountLowThresholdReached: make(<-chan struct{}), l: &x} return }) if _, ok := zm.Squads[squadId]; !ok { err = fmt.Errorf("no corresponding squads") return } done, err := zm.Squads[squadId].SquadRequestScheduler.Schedule(reqChan) bs, jsonErr := json.Marshal(&SquadResponse{ Type: "user_squad_init", From: squadId, To: target, Payload: map[string]interface{}{}, }) if jsonErr != nil { // logger.Println("error in open channel", jsonErr) return } if sendErr := channel.SendText(string(bs)); sendErr != nil { // logger.Println("error in open channel send", sendErr) return } go func() { for { select { case <-done: return case <-err: // logger.Println("----- error from scheduler:", e) } } }() } }) channel.OnClose(func() { if _, closed := <-reqChan; !closed { close(reqChan) } //_ = zm.HandleLeavingMember(target, squadId, true) }) channel.OnError(func(err error) { if _, closed := <-reqChan; !closed { close(reqChan) } //_ = zm.HandleLeavingMember(target, squadId, true) }) channel.OnMessage(func(msg webrtc.DataChannelMessage) { var req SquadRequest if err := json.Unmarshal(msg.Data, &req); err != nil { // logger.Println(err) return } // logger.Println("incoming request", req) reqChan <- &req }) // logger.Println("new channel for target : ", target) // logger.Println(target) _ = atomicallyExecute(zm.dataChannelFlag, func() (err error) { l := uint32(0) zm.DataChannels[target] = &DataChannel{ DataChannel: channel, bufferedAmountLowThresholdReached: make(<-chan struct{}), l: &l, } return }) } else { peerConnection.OnDataChannel(func(dc *webrtc.DataChannel) { _ = atomicallyExecute(zm.dataChannelFlag, func() (err error) { l := uint32(0) zm.DataChannels[target] = &DataChannel{ DataChannel: dc, l: &l, } return }) reqChan := make(chan *SquadRequest, 100) if dc.Label() == "data" { dc.OnOpen(func() { // logger.Println(squadId) if _, ok := zm.Squads[squadId]; ok { // logger.Println("this squad exist") _ = atomicallyExecute(zm.Squads[squadId].DataChannelsFlag, func() (err error) { // logger.Println("adding dc to dc map") x := uint32(0) zm.Squads[squadId].DataChannels[target] = &DataChannel{DataChannel: dc, bufferedAmountLowThresholdReached: make(<-chan struct{}), l: &x} return }) if _, ok := zm.Squads[squadId]; !ok { err = fmt.Errorf("no corresponding squads") return } done, err := zm.Squads[squadId].SquadRequestScheduler.Schedule(reqChan) go func() { for { select { case <-done: return case <-err: } } }() } }) dc.OnClose(func() { fmt.Println("closing gracefully event dc...") if _, closed := <-reqChan; !closed { close(reqChan) } }) dc.OnError(func(err error) { // logger.Println("--------------- error in dc:", err) if _, closed := <-reqChan; !closed { close(reqChan) } }) dc.OnMessage(func(msg webrtc.DataChannelMessage) { var req SquadRequest if err := json.Unmarshal(msg.Data, &req); err != nil { // logger.Println(err) return } // logger.Println("incoming request", req) go func() { reqChan <- &req }() }) _ = atomicallyExecute(zm.dataChannelFlag, func() (err error) { l := uint32(0) zm.DataChannels[target] = &DataChannel{ DataChannel: dc, bufferedAmountLowThresholdReached: make(<-chan struct{}), l: &l, } return }) } else { if _, ok := zm.Squads[squadId]; ok { fmt.Println("got new mtfking datachannel") fmt.Println(dc.Label()) scheduler := zm.Squads[squadId].SquadRequestScheduler l := uint32(0) datachannel := &DataChannel{ DataChannel: dc, bufferedAmountLowThresholdReached: make(<-chan struct{}), l: &l, } catched := scheduler.DispatchDatachannel(context.Background(), datachannel) if !catched { if closeErr := datachannel.DataChannel.Close(); closeErr != nil { // logger.Println(closeErr) } } } } }) } peerConnection.OnConnectionStateChange(func(pcs webrtc.PeerConnectionState) { if pcs == webrtc.PeerConnectionStateDisconnected || pcs == webrtc.PeerConnectionStateFailed { // logger.Println(pcs) if err = zm.HandleLeavingMember(target, squadId, true); err != nil { // logger.Println(err) } } }) peerConnection.OnICEConnectionStateChange(func(is webrtc.ICEConnectionState) { // logger.Printf("ICE connection state has changed %s\n", is.String()) }) peerConnection.OnICECandidate(func(i *webrtc.ICECandidate) { if i == nil { return } _ = atomicallyExecute(zm.candidateFlag, func() (err error) { desc := peerConnection.RemoteDescription() if desc == nil { // logger.Println("generated candidate appended to list : ", i) zm.PendingCandidates[target] = append(zm.PendingCandidates[target], i) } else { // logger.Println("generated candidate : ", i) if iceCandidateErr := cb(target, i); iceCandidateErr != nil { // logger.Println(iceCandidateErr) } } return }) }) return } func (zm *SquadManager) HandleRennegotiationOffer(from, sdp string) (err error) { err = atomicallyExecute(zm.peerConnectionFlag, func() (err error) { if _, ok := zm.RTCPeerConnections[from]; !ok { err = fmt.Errorf("no corresponding peer connection for id %s", from) return } if err = zm.RTCPeerConnections[from].SetRemoteDescription(webrtc.SessionDescription{SDP: sdp, Type: webrtc.SDPTypeOffer}); err != nil { return } localSd, err := zm.RTCPeerConnections[from].CreateAnswer(nil) if err != nil { return } if err = zm.RTCPeerConnections[from].SetLocalDescription(localSd); err != nil { return } if err = zm.sendSignalingMessage(string(HOSTED_SQUAD_WEBRTC_RENNEGOTIATION_ANSWER), zm.ID, from, map[string]any{ "to": from, "sdp": localSd.SDP, }); err != nil { // logger.Println(err) return } return }) return } func (zm *SquadManager) HandleRennegotiationAnswer(from string, sdp string) (err error) { _ = atomicallyExecute(zm.peerConnectionFlag, func() (err error) { if _, ok := zm.RTCPeerConnections[from]; !ok { err = fmt.Errorf("no corresponding peer connection for id %s", from) return } err = zm.RTCPeerConnections[from].SetRemoteDescription(webrtc.SessionDescription{SDP: sdp, Type: webrtc.SDPTypeAnswer}) return }) return } func (zm *SquadManager) AddCandidate(candidate *webrtc.ICECandidateInit, from string) (err error) { _ = atomicallyExecute(zm.candidateFlag, func() (err error) { if candidate != nil { if connection, ok := zm.RTCPeerConnections[from]; ok { err = connection.AddICECandidate(*candidate) } } return }) return } func (zm *SquadManager) HandleLeavingMember(id, squadId string, signalLeaving bool) (err error) { fmt.Println("uwuwuwuuwuwuwuwuuwuwuuwuwuwuwuuwwuwu") fmt.Println("handling leaving member") // logger.Println("---------------- handling leaving member", id) if err = atomicallyExecute(zm.peerConnectionFlag, func() (err error) { if _, ok := zm.RTCPeerConnections[id]; !ok { err = fmt.Errorf("no correponding peerconnection for id %s", id) return } return }); err != nil { return } if signalLeaving { nerr := zm.notifyLeavingMember(id, squadId, NodeID) fmt.Println(nerr) } err = atomicallyExecute(zm.squadFlag, func() (err error) { // logger.Println(err) // logger.Println("---------------- cleaning squad handlers", id) if squad, ok := zm.Squads[squadId]; ok { for _, handlersPublishers := range squad.SquadRequestScheduler.handlersPublishers { go func(hp chan<- *SquadRequest) { hp <- &SquadRequest{ ReqType: LEAVE_SQUAD, From: id, Payload: map[string]interface{}{ "userId": id, }, } }(handlersPublishers) } if err = atomicallyExecute(squad.DataChannelsFlag, func() (err error) { defer delete(squad.DataChannels, id) if dataChannel, ok := squad.DataChannels[id]; ok { if err = dataChannel.DataChannel.Close(); err != nil { return } } return }); err != nil { fmt.Println(err) } // logger.Println("datachannels cleaned", id) } else { err = fmt.Errorf("no corresponding squad for squadId %s", squadId) } // logger.Println(err) err = atomicallyExecute(zm.peerConnectionFlag, func() (err error) { if _, ok := zm.RTCPeerConnections[id]; ok { defer delete(zm.RTCPeerConnections, id) if err = zm.RTCPeerConnections[id].Close(); err != nil { return } } return }) fmt.Println(err) _ = atomicallyExecute(zm.candidateFlag, func() (err error) { delete(zm.PendingCandidates, id) return }) return }) return } func (zm *SquadManager) notifyLeavingMember(userId, squadId, hostId string) (err error) { em := NewEncryptionManager() sig := em.SignRequestHMAC(NodeID) body, err := json.Marshal(map[string]interface{}{ "type": DISCONNECT_SQUAD_MEMBER, "mac": sig, "from": NodeID, "peerType": "node", "payload": map[string]string{ "squadId": squadId, "userId": userId, }, }) if err != nil { return } _, err = HTTPClient.Post("https://dev.zippytal.com/req", "application/json", bytes.NewReader(body)) if err != nil { // logger.Println("error come from there in squad manager") return } return } func (zm *SquadManager) negotiate(target string, squadId string) { _ = atomicallyExecute(zm.peerConnectionFlag, func() (err error) { if _, ok := zm.RTCPeerConnections[target]; !ok { err = fmt.Errorf("no peerConnections") return } return }) }