package localserver import ( "bytes" "context" "encoding/json" "fmt" "io" "os" "path/filepath" "strconv" sync "sync" "github.com/google/uuid" "github.com/pion/webrtc/v3" ) const ( LIST_ZONES_BY_HOST = "list_zones_by_host" LEAVE_ZONE = "leave_zone" ) type ZoneManager struct { ID string Zones map[string]*Zone LocalSD map[string]*webrtc.SessionDescription RTCPeerConnections map[string]*RTCPeerConnection DataChannels map[string]*DataChannel PendingCandidates map[string][]*webrtc.ICECandidate stream SignalingService_LinkClient zoneFlag *uint32 peerConnectionFlag *uint32 localSDFlag *uint32 dataChannelFlag *uint32 candidateFlag *uint32 } type Zone struct { ID string Name string ImageURL string Owner string CreationDate string AuthorizedMembers []string DataChannels map[string]*DataChannel DataChannelsFlag *uint32 ZoneRequestScheduler *ZoneRequestScheduler Initialized bool } func NewZone(hostId string, zoneId string, zoneName string, imageUrl string, owner string, creationDate string, initialized bool, authorizedMembers []string) (zone *Zone, err error) { dataChannels, dataChannelFlag := make(map[string]*DataChannel), uint32(0) zoneChatHandler, err := NewZoneChatsHandler(hostId, zoneId, zoneName, owner, authorizedMembers, dataChannels, &dataChannelFlag) if err != nil { return nil, err } zoneUsersHandler, err := NewZoneUsersHandler(zoneId, owner, authorizedMembers, dataChannels, &dataChannelFlag) if err != nil { return } zoneAudioChannelsHandler, err := NewZoneAudioChannelsHandler(hostId, zoneId, owner, authorizedMembers, dataChannels, &dataChannelFlag) if err != nil { return } zoneVideoChannelsHandler, err := NewZoneVideoChannelsHandler(hostId, zoneId, owner, authorizedMembers, dataChannels, &dataChannelFlag) if err != nil { return } zoneNotificationsHandler, err := NewZoneNotificationsHandler(hostId, zoneId, owner, authorizedMembers, dataChannels, &dataChannelFlag) if err != nil { return } zoneFileHandler, err := NewZoneFileHandler(hostId, zoneId, owner, authorizedMembers, dataChannels, &dataChannelFlag) if err != nil { return } zoneScheduler, e := NewZoneRequestScheduler(authorizedMembers, zoneUsersHandler, zoneAudioChannelsHandler, zoneVideoChannelsHandler, zoneFileHandler, zoneChatHandler, zoneNotificationsHandler) go func() { for schedErr := range e { logger.Println("from scheduler :", schedErr) } }() zone = &Zone{ ID: zoneId, Name: zoneName, ImageURL: imageUrl, Owner: owner, CreationDate: creationDate, Initialized: initialized, ZoneRequestScheduler: zoneScheduler, DataChannels: dataChannels, DataChannelsFlag: &dataChannelFlag, } return } func NewZoneManager(id string, token string) (zoneManager *ZoneManager, err error) { zoneFlag := uint32(0) peerConnectionFlag := uint32(0) localSDFlag := uint32(0) dataChannelFlag := uint32(0) candidateFlag := uint32(0) dataChannels := make(map[string]*DataChannel) zoneMap := make(map[string]*Zone) zones, err := zoneManager.fetchZones(id, token) if err != nil { return } for _, zone := range zones { z, err := NewZone(id, zone.ID, zone.Name, zone.ImageURL, zone.Owner, zone.CreationDate, true, zone.AuthorizedMembers) if err != nil { return nil, err } zoneMap[zone.ID] = z } zonesFolder, err := os.ReadDir(filepath.Join(dataPath, "data", "zones")) if err != nil { if os.IsNotExist(err) { if err = os.MkdirAll(filepath.Join(dataPath, "data", "zones"), 0770); err != nil { return } } else { return nil, err } } logger.Println(zoneMap) zoneManager = &ZoneManager{ ID: id, Zones: zoneMap, LocalSD: make(map[string]*webrtc.SessionDescription), RTCPeerConnections: make(map[string]*RTCPeerConnection), DataChannels: dataChannels, PendingCandidates: make(map[string][]*webrtc.ICECandidate), zoneFlag: &zoneFlag, peerConnectionFlag: &peerConnectionFlag, localSDFlag: &localSDFlag, dataChannelFlag: &dataChannelFlag, candidateFlag: &candidateFlag, } for _, z := range zonesFolder { if _, ok := zoneMap[z.Name()]; !ok { logger.Println(zoneManager.DeleteZone(z.Name())) } } return } func (zm *ZoneManager) sendSignalingMessage(messageType, from, to string, payload map[string]interface{}) (err error) { bs, err := json.Marshal(payload) if err != nil { return } err = zm.stream.Send(&SignalingMessage{ Type: messageType, From: from, To: to, Payload: bs, }) return } func (zm *ZoneManager) DeleteZone(zoneId string) error { return os.RemoveAll(filepath.Join(dataPath, "data", "zones", zoneId)) } func (zm *ZoneManager) fetchZones(nodeId string, token string) (zones []*Zone, err error) { em := NewEncryptionManager() sig := em.SignRequestHMAC(nodeId) body, err := json.Marshal(map[string]interface{}{ "type": LIST_ZONES_BY_HOST, "mac": sig, "from": nodeId, "peerType": "node", "payload": map[string]string{ "host": nodeId, "lastIndex": "0", }, }) if err != nil { return } res, err := HTTPClient.Post("https://dev.zippytal.com/req", "application/json", bytes.NewReader(body)) if err != nil { logger.Println("error come from there inn zone manager") return } bs, err := io.ReadAll(res.Body) if err != nil { return } var payload map[string]any if err = json.Unmarshal(bs, &payload); err != nil { return } b, err := json.Marshal(payload["zones"]) if err != nil { return } err = json.Unmarshal(b, &zones) return } func (zm *ZoneManager) CreateOffer(ctx context.Context, target string, from string, zoneId string, cb OnICECandidateFunc) (err error) { peerConnection, err := zm.createPeerConnection(target, from, zoneId, webrtc.SDPTypeOffer, cb) if err != nil { return } logger.Println("connection created") rawOffer, err := peerConnection.CreateOffer(nil) if err != nil { return } if err = peerConnection.SetLocalDescription(rawOffer); err != nil { return } _ = atomicallyExecute(zm.peerConnectionFlag, func() (err error) { id := uuid.New().String() logger.Println("adding for target", target) zm.RTCPeerConnections[target] = &RTCPeerConnection{ id: id, PeerConnection: peerConnection, makingOffer: true, makingOfferLock: &sync.Mutex{}, negotiate: zm.negotiate, } return }) err = zm.sendSignalingMessage(string(ZONE_OFFER), zm.ID, target, map[string]any{ "to": target, "from": zm.ID, "sdp": rawOffer.SDP, }) return } func (zm *ZoneManager) HandleOffer(ctx context.Context, from string, to string, req map[string]string, cb OnICECandidateFunc) (err error) { done, errCh := make(chan struct{}), make(chan error) go func() { if _, ok := zm.Zones[req["zoneId"]]; !ok { err = fmt.Errorf("no corresponding zone") errCh <- err return } logger.Println("handling zone offer") _ = atomicallyExecute(zm.zoneFlag, func() (err error) { return }) if _, ok := zm.RTCPeerConnections[from]; ok { if e := zm.HandleLeavingMember(from, req["zoneId"], false); e != nil { logger.Println(e) } } peerConnection, err := zm.createPeerConnection(from, to, req["zoneId"], webrtc.SDPTypeAnswer, cb) if err != nil { errCh <- err return } logger.Println("peer connection created") _ = atomicallyExecute(zm.peerConnectionFlag, func() (err error) { id := uuid.New().String() zm.RTCPeerConnections[from] = &RTCPeerConnection{ PeerConnection: peerConnection, id: id, makingOffer: false, makingOfferLock: &sync.Mutex{}, negotiate: zm.negotiate, } logger.Println("peer connection added to map") offer := webrtc.SessionDescription{ Type: webrtc.SDPTypeOffer, SDP: req[SDP], } if err = peerConnection.SetRemoteDescription(offer); err != nil { errCh <- err return } rawAnswer, err := peerConnection.CreateAnswer(nil) if err != nil { errCh <- err return } _ = atomicallyExecute(zm.localSDFlag, func() (err error) { zm.LocalSD[from] = &rawAnswer return }) if err = peerConnection.SetLocalDescription(rawAnswer); err != nil { errCh <- err return } if err = zm.sendSignalingMessage(string(ZONE_ANSWER), zm.ID, from, map[string]any{ "to": from, "from": zm.ID, "sdp": rawAnswer.SDP, }); err != nil { errCh <- err return } done <- struct{}{} return }) }() select { case <-done: return case err = <-errCh: return case <-ctx.Done(): err = ctx.Err() return } } func (zm *ZoneManager) HandleAnswer(ctx context.Context, from string, to string, req map[string]string) (err error) { defer func() { if r := recover(); err != nil { logger.Printf("recover from panic in handle answer : %v\n", r) } }() if err = atomicallyExecute(zm.peerConnectionFlag, func() (err error) { if _, ok := zm.RTCPeerConnections[from]; !ok { err = fmt.Errorf("no corresponding peer connection for id : %s", from) return } peerConnnection := zm.RTCPeerConnections[from] logger.Println("---------------------") logger.Println(req[SDP]) logger.Println("---------------------") if err = peerConnnection.SetRemoteDescription(webrtc.SessionDescription{ Type: webrtc.SDPTypeAnswer, SDP: req[SDP], }); err != nil { logger.Println("error occured while setting remote description in handle answer") return } return }); err != nil { return } if err = zm.sendSignalingMessage(string(ZONE_COUNTER_OFFER), zm.ID, from, map[string]any{ "from": zm.ID, "to": from, }); err != nil { return } _ = atomicallyExecute(zm.candidateFlag, func() (err error) { for _, candidate := range zm.PendingCandidates[from] { logger.Println("sending candidate from answer to", from) if err = zm.sendSignalingMessage(string(ZONE_WEBRTC_CANDIDATE), zm.ID, from, map[string]any{ "from": zm.ID, "to": from, "candidate": candidate.ToJSON().Candidate, "sdpMid": *candidate.ToJSON().SDPMid, "sdpMLineIndex": strconv.Itoa(int(*candidate.ToJSON().SDPMLineIndex)), }); err != nil { logger.Println(err) continue } } delete(zm.PendingCandidates, from) return }) _ = atomicallyExecute(zm.localSDFlag, func() (err error) { delete(zm.LocalSD, from) return }) return } func (zm *ZoneManager) HandleCounterOffer(ctx context.Context, from string, to string, req map[string]string) (err error) { _ = atomicallyExecute(zm.candidateFlag, func() (err error) { for _, candidate := range zm.PendingCandidates[from] { logger.Println("sending candidate to", from) if err = zm.sendSignalingMessage(string(ZONE_WEBRTC_CANDIDATE), zm.ID, from, map[string]any{ "from": zm.ID, "to": from, "candidate": candidate.ToJSON().Candidate, "sdpMid": *candidate.ToJSON().SDPMid, "sdpMLineIndex": strconv.Itoa(int(*candidate.ToJSON().SDPMLineIndex)), }); err != nil { return } } delete(zm.PendingCandidates, from) return }) _ = atomicallyExecute(zm.localSDFlag, func() (err error) { delete(zm.LocalSD, from) return }) return } func (zm *ZoneManager) createPeerConnection(target string, from string, zoneId string, peerType webrtc.SDPType, cb OnICECandidateFunc) (peerConnection *webrtc.PeerConnection, err error) { defer func() { if r := recover(); err != nil { logger.Printf("recover from panic : %v\n", r) } }() config := webrtc.Configuration{ ICEServers: []webrtc.ICEServer{ { URLs: []string{"stun:stun.l.google.com:19302"}, }, }, SDPSemantics: webrtc.SDPSemanticsUnifiedPlan, } peerConnection, err = webrtc.NewPeerConnection(config) if err != nil { return } logger.Println("---------------------------------------------------") if peerType == webrtc.SDPTypeOffer { channel, err := peerConnection.CreateDataChannel("data", &webrtc.DataChannelInit{}) if err != nil { return nil, err } reqChan := make(chan *ZoneRequest) channel.OnOpen(func() { logger.Println(zoneId) if _, ok := zm.Zones[zoneId]; ok { logger.Println("this zone exist") _ = atomicallyExecute(zm.Zones[zoneId].DataChannelsFlag, func() (err error) { x := uint32(0) zm.Zones[zoneId].DataChannels[target] = &DataChannel{DataChannel: channel, bufferedAmountLowThresholdReached: make(<-chan struct{}), l: &x} return }) if _, ok := zm.Zones[zoneId]; !ok { err = fmt.Errorf("no corresponding zones") return } done, err := zm.Zones[zoneId].ZoneRequestScheduler.Schedule(reqChan) bs, jsonErr := json.Marshal(&ZoneResponse{ Type: "user_zone_init", From: zoneId, To: target, Payload: map[string]interface{}{}, }) if jsonErr != nil { logger.Println("error in open channel", jsonErr) return } if sendErr := channel.SendText(string(bs)); sendErr != nil { logger.Println("error in open channel send", sendErr) return } go func() { for { select { case <-done: return case e := <-err: logger.Println("----- error from scheduler:", e) } } }() } }) channel.OnClose(func() { close(reqChan) //_ = zm.HandleLeavingMember(target, zoneId, true) }) channel.OnError(func(err error) { close(reqChan) //_ = zm.HandleLeavingMember(target, zoneId, true) }) channel.OnMessage(func(msg webrtc.DataChannelMessage) { var req ZoneRequest if err := json.Unmarshal(msg.Data, &req); err != nil { logger.Println(err) return } logger.Println("incoming request", req) reqChan <- &req }) logger.Println("new channel for target : ", target) logger.Println(target) _ = atomicallyExecute(zm.dataChannelFlag, func() (err error) { l := uint32(0) zm.DataChannels[target] = &DataChannel{ DataChannel: channel, bufferedAmountLowThresholdReached: make(<-chan struct{}), l: &l, } return }) } else { peerConnection.OnDataChannel(func(dc *webrtc.DataChannel) { _ = atomicallyExecute(zm.dataChannelFlag, func() (err error) { l := uint32(0) zm.DataChannels[target] = &DataChannel{ DataChannel: dc, l: &l, } return }) reqChan := make(chan *ZoneRequest, 100) if dc.Label() == "data" { dc.OnOpen(func() { logger.Println(zoneId) if _, ok := zm.Zones[zoneId]; ok { logger.Println("this zone exist") _ = atomicallyExecute(zm.Zones[zoneId].DataChannelsFlag, func() (err error) { logger.Println("adding dc to dc map") x := uint32(0) zm.Zones[zoneId].DataChannels[target] = &DataChannel{DataChannel: dc, bufferedAmountLowThresholdReached: make(<-chan struct{}), l: &x} return }) if _, ok := zm.Zones[zoneId]; !ok { err = fmt.Errorf("no corresponding zones") return } done, err := zm.Zones[zoneId].ZoneRequestScheduler.Schedule(reqChan) go func() { for { select { case <-done: return case <-err: } } }() } }) dc.OnClose(func() { fmt.Println("closing gracefully event dc...") close(reqChan) }) dc.OnError(func(err error) { logger.Println("--------------- error in dc:", err) close(reqChan) }) dc.OnMessage(func(msg webrtc.DataChannelMessage) { var req ZoneRequest if err := json.Unmarshal(msg.Data, &req); err != nil { logger.Println(err) return } logger.Println("incoming request", req) go func() { reqChan <- &req }() }) _ = atomicallyExecute(zm.dataChannelFlag, func() (err error) { l := uint32(0) zm.DataChannels[target] = &DataChannel{ DataChannel: dc, bufferedAmountLowThresholdReached: make(<-chan struct{}), l: &l, } return }) } else { if _, ok := zm.Zones[zoneId]; ok { fmt.Println("got new mtfking datachannel") fmt.Println(dc.Label()) scheduler := zm.Zones[zoneId].ZoneRequestScheduler l := uint32(0) datachannel := &DataChannel{ DataChannel: dc, bufferedAmountLowThresholdReached: make(<-chan struct{}), l: &l, } catched := scheduler.DispatchDatachannel(context.Background(), datachannel) if !catched { if closeErr := datachannel.DataChannel.Close(); closeErr != nil { logger.Println(closeErr) } } } } }) } peerConnection.OnConnectionStateChange(func(pcs webrtc.PeerConnectionState) { if pcs == webrtc.PeerConnectionStateDisconnected || pcs == webrtc.PeerConnectionStateFailed { logger.Println(pcs) if err = zm.HandleLeavingMember(target, zoneId, true); err != nil { logger.Println(err) } } }) peerConnection.OnICEConnectionStateChange(func(is webrtc.ICEConnectionState) { logger.Printf("ICE connection state has changed %s\n", is.String()) }) peerConnection.OnICECandidate(func(i *webrtc.ICECandidate) { if i == nil { return } _ = atomicallyExecute(zm.candidateFlag, func() (err error) { desc := peerConnection.RemoteDescription() if desc == nil { logger.Println("generated candidate appended to list : ", i) zm.PendingCandidates[target] = append(zm.PendingCandidates[target], i) } else { logger.Println("generated candidate : ", i) if iceCandidateErr := cb(target, i); iceCandidateErr != nil { logger.Println(iceCandidateErr) } } return }) }) return } func (zm *ZoneManager) HandleRennegotiationOffer(from, sdp string) (err error) { err = atomicallyExecute(zm.peerConnectionFlag, func() (err error) { if _, ok := zm.RTCPeerConnections[from]; !ok { err = fmt.Errorf("no corresponding peer connection for id %s", from) return } if err = zm.RTCPeerConnections[from].SetRemoteDescription(webrtc.SessionDescription{SDP: sdp, Type: webrtc.SDPTypeOffer}); err != nil { return } localSd, err := zm.RTCPeerConnections[from].CreateAnswer(nil) if err != nil { return } if err = zm.RTCPeerConnections[from].SetLocalDescription(localSd); err != nil { return } if err = zm.sendSignalingMessage(string(ZONE_WEBRTC_RENNEGOTIATION_ANSWER), zm.ID, from, map[string]any{ "to": from, "sdp": localSd.SDP, }); err != nil { logger.Println(err) return } return }) return } func (zm *ZoneManager) HandleRennegotiationAnswer(from string, sdp string) (err error) { _ = atomicallyExecute(zm.peerConnectionFlag, func() (err error) { if _, ok := zm.RTCPeerConnections[from]; !ok { err = fmt.Errorf("no corresponding peer connection for id %s", from) return } err = zm.RTCPeerConnections[from].SetRemoteDescription(webrtc.SessionDescription{SDP: sdp, Type: webrtc.SDPTypeAnswer}) return }) return } func (zm *ZoneManager) AddCandidate(candidate *webrtc.ICECandidateInit, from string) (err error) { _ = atomicallyExecute(zm.candidateFlag, func() (err error) { if candidate != nil { if connection, ok := zm.RTCPeerConnections[from]; ok { err = connection.AddICECandidate(*candidate) } } return }) return } func (zm *ZoneManager) HandleLeavingMember(id, zoneId string, signalLeaving bool) (err error) { logger.Println("---------------- handling leaving member", id) if err = atomicallyExecute(zm.peerConnectionFlag, func() (err error) { if _, ok := zm.RTCPeerConnections[id]; !ok { err = fmt.Errorf("no correponding peerconnection for id %s", id) return } return }); err != nil { return } if signalLeaving { nerr := zm.notifyLeavingMember(id, zoneId, NodeID) fmt.Println(nerr) } err = atomicallyExecute(zm.zoneFlag, func() (err error) { logger.Println(err) logger.Println("---------------- cleaning zone handlers", id) if zone, ok := zm.Zones[zoneId]; ok { for _, handlersPublishers := range zone.ZoneRequestScheduler.handlersPublishers { go func(hp chan<- *ZoneRequest) { hp <- &ZoneRequest{ ReqType: LEAVE_ZONE, From: id, Payload: map[string]interface{}{ "userId": id, }, } }(handlersPublishers) } if err = atomicallyExecute(zone.DataChannelsFlag, func() (err error) { defer delete(zone.DataChannels, id) if dataChannel, ok := zone.DataChannels[id]; ok { if err = dataChannel.DataChannel.Close(); err != nil { return } } return }); err != nil { fmt.Println(err) } logger.Println("datachannels cleaned", id) } else { err = fmt.Errorf("no corresponding zone for zoneId %s", zoneId) } logger.Println(err) err = atomicallyExecute(zm.peerConnectionFlag, func() (err error) { if _, ok := zm.RTCPeerConnections[id]; ok { defer delete(zm.RTCPeerConnections, id) if err = zm.RTCPeerConnections[id].Close(); err != nil { return } } return }) fmt.Println(err) _ = atomicallyExecute(zm.candidateFlag, func() (err error) { delete(zm.PendingCandidates, id) return }) return }) return } func (zm *ZoneManager) notifyLeavingMember(userId, zoneId, hostId string) (err error) { em := NewEncryptionManager() sig := em.SignRequestHMAC(NodeID) body, err := json.Marshal(map[string]interface{}{ "type": DISCONNECT_ZONE_MEMBER, "mac": sig, "from": NodeID, "peerType": "node", "payload": map[string]string{ "zoneId": zoneId, "userId": userId, }, }) if err != nil { return } _, err = HTTPClient.Post("https://dev.zippytal.com/req", "application/json", bytes.NewReader(body)) if err != nil { logger.Println("error come from there in zone manager") return } return } func (zm *ZoneManager) negotiate(target string, zoneId string) { _ = atomicallyExecute(zm.peerConnectionFlag, func() (err error) { if _, ok := zm.RTCPeerConnections[target]; !ok { err = fmt.Errorf("no peerConnections") return } return }) }