package localserver

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"runtime"
	"strconv"
	"sync"
	"sync/atomic"

	"github.com/pion/webrtc/v3"
)

const (
	// LIST_ZONES_BY_HOST is the request type sent to the HTTP endpoint by
	// fetchZones to list the zones hosted by a node.
	LIST_ZONES_BY_HOST = "list_zones_by_host"
	// LEAVE_ZONE is the request type published to every zone handler when a
	// member's connection goes away (see HandleLeavingMember).
	LEAVE_ZONE = "leave_zone"
)

// ZoneManager owns the zones hosted by this node together with the WebRTC
// peer connections, data channels and pending signaling state of the remote
// members, all keyed by remote peer id.
//
// Each *Flag field is a spin flag used with atomicallyExecute to serialize
// access to the map it is named after.
type ZoneManager struct {
	ID                 string
	Zones              map[string]*Zone
	LocalSD            map[string]*webrtc.SessionDescription
	RTCPeerConnections map[string]*RTCPeerConnection
	DataChannels       map[string]*DataChannel
	PendingCandidates  map[string][]*webrtc.ICECandidate
	stream             GrpcManager_LinkClient
	zoneFlag           *uint32
	peerConnectionFlag *uint32
	localSDFlag        *uint32
	dataChannelFlag    *uint32
	candidateFlag      *uint32
}

// Zone describes one hosted zone. It is also the JSON decoding target of
// fetchZones; the runtime fields (DataChannels, DataChannelsFlag,
// ZoneRequestScheduler) are filled in by NewZone.
type Zone struct {
	ID                   string
	Name                 string
	ImageURL             string
	Owner                string
	CreationDate         string
	AuthorizedMembers    []string
	DataChannels         map[string]*DataChannel
	DataChannelsFlag     *uint32
	ZoneRequestScheduler *ZoneRequestScheduler
	Initialized          bool
}

// NewZone builds a Zone and the handlers (chat, users, audio, video, file)
// that serve it, all sharing one data-channel map and its spin flag.
//
// It returns a nil zone and the first handler construction error, if any.
// A goroutine is started that logs everything received on the scheduler's
// error channel until that channel is closed.
func NewZone(hostId string, zoneId string, zoneName string, imageUrl string, owner string, creationDate string, initialized bool, authorizedMembers []string) (zone *Zone, err error) {
	dataChannels := make(map[string]*DataChannel)
	dataChannelFlag := uint32(0)

	zoneChatHandler, err := NewZoneChatsHandler(zoneId, owner, authorizedMembers, dataChannels, &dataChannelFlag)
	if err != nil {
		return nil, err
	}
	zoneUsersHandler, err := NewZoneUsersHandler(zoneId, owner, authorizedMembers, dataChannels, &dataChannelFlag)
	if err != nil {
		return nil, err
	}
	zoneAudioChannelsHandler, err := NewZoneAudioChannelsHandler(hostId, zoneId, owner, authorizedMembers, dataChannels, &dataChannelFlag)
	if err != nil {
		return nil, err
	}
	zoneVideoChannelsHandler, err := NewZoneVideoChannelsHandler(hostId, zoneId, owner, authorizedMembers, dataChannels, &dataChannelFlag)
	if err != nil {
		return nil, err
	}
	zoneFileHandler, err := NewZoneFileHandler(hostId, zoneId, owner, authorizedMembers, dataChannels, &dataChannelFlag)
	if err != nil {
		return nil, err
	}

	zoneScheduler, schedulerErrs := NewZoneRequestScheduler(authorizedMembers, zoneUsersHandler, zoneChatHandler, zoneAudioChannelsHandler, zoneVideoChannelsHandler, zoneFileHandler)
	go func() {
		for schedErr := range schedulerErrs {
			logger.Println("from scheduler :", schedErr)
		}
	}()

	// NOTE(review): AuthorizedMembers is handed to the handlers but is not
	// stored on the returned Zone — confirm this is intentional.
	return &Zone{
		ID:                   zoneId,
		Name:                 zoneName,
		ImageURL:             imageUrl,
		Owner:                owner,
		CreationDate:         creationDate,
		Initialized:          initialized,
		ZoneRequestScheduler: zoneScheduler,
		DataChannels:         dataChannels,
		DataChannelsFlag:     &dataChannelFlag,
	}, nil
}

// NewZoneManager fetches the zones hosted by node id (authenticating with
// token), builds a runtime Zone for each of them and returns a manager with
// empty connection state.
//
// The stream field is not set here; presumably it is assigned elsewhere
// before any signaling method is called — confirm against the caller.
func NewZoneManager(id string, token string) (zoneManager *ZoneManager, err error) {
	// zoneManager is still nil at this point; this is safe only because
	// fetchZones never touches its receiver.
	zones, err := zoneManager.fetchZones(id, token)
	if err != nil {
		return nil, err
	}

	zoneMap := make(map[string]*Zone)
	for _, fetched := range zones {
		z, zoneErr := NewZone(id, fetched.ID, fetched.Name, fetched.ImageURL, fetched.Owner, fetched.CreationDate, true, fetched.AuthorizedMembers)
		if zoneErr != nil {
			return nil, zoneErr
		}
		zoneMap[fetched.ID] = z
	}
	logger.Println(zoneMap)

	zoneFlag := uint32(0)
	peerConnectionFlag := uint32(0)
	localSDFlag := uint32(0)
	dataChannelFlag := uint32(0)
	candidateFlag := uint32(0)

	return &ZoneManager{
		ID:                 id,
		Zones:              zoneMap,
		LocalSD:            make(map[string]*webrtc.SessionDescription),
		RTCPeerConnections: make(map[string]*RTCPeerConnection),
		DataChannels:       make(map[string]*DataChannel),
		PendingCandidates:  make(map[string][]*webrtc.ICECandidate),
		zoneFlag:           &zoneFlag,
		peerConnectionFlag: &peerConnectionFlag,
		localSDFlag:        &localSDFlag,
		dataChannelFlag:    &dataChannelFlag,
		candidateFlag:      &candidateFlag,
	}, nil
}

// atomicallyExecute runs job while holding the spin flag pointed to by flag
// and returns job's error. The flag is released even if job panics.
//
// The flag is not reentrant: calling atomicallyExecute with the same flag
// from inside job never returns.
func atomicallyExecute(flag *uint32, job func() (err error)) (err error) {
	for !atomic.CompareAndSwapUint32(flag, 0, 1) {
		// Yield instead of burning a core while another goroutine holds the flag.
		runtime.Gosched()
	}
	defer atomic.StoreUint32(flag, 0)
	return job()
}

// DeleteZone is not implemented yet; it currently does nothing.
func (zm *ZoneManager) DeleteZone(zoneId string) {}

// fetchZones asks the HTTP endpoint for the zones hosted by nodeId and
// decodes the JSON response into a slice of Zone.
//
// It must not dereference zm: NewZoneManager calls it on a nil receiver.
func (zm *ZoneManager) fetchZones(nodeId string, token string) (zones []*Zone, err error) {
	body, err := json.Marshal(map[string]interface{}{
		"type":  LIST_ZONES_BY_HOST,
		"token": token,
		"from":  nodeId,
		"payload": map[string]string{
			"host":      nodeId,
			"lastIndex": "0",
		},
	})
	if err != nil {
		return nil, err
	}

	res, err := http.Post("https://app.zippytal.com/req", "application/json", bytes.NewBuffer(body))
	if err != nil {
		logger.Println("error come from there inn zone manager")
		return nil, err
	}
	// The body must be closed or the underlying connection is leaked.
	defer res.Body.Close()

	bs, err := io.ReadAll(res.Body)
	if err != nil {
		return nil, err
	}
	if err = json.Unmarshal(bs, &zones); err != nil {
		return nil, err
	}
	return zones, nil
}

// trackPeerConnection registers pc under id in RTCPeerConnections, wrapped
// with the given initial makingOffer state.
func (zm *ZoneManager) trackPeerConnection(id string, pc *webrtc.PeerConnection, makingOffer bool) {
	// The job cannot fail, so the returned error is always nil.
	_ = atomicallyExecute(zm.peerConnectionFlag, func() error {
		zm.RTCPeerConnections[id] = &RTCPeerConnection{
			PeerConnection:  pc,
			makingOffer:     makingOffer,
			makingOfferLock: &sync.Mutex{},
			negotiate:       zm.negotiate,
		}
		return nil
	})
}

// candidateRequest builds the signaling request that forwards one local ICE
// candidate to the peer identified by to.
func (zm *ZoneManager) candidateRequest(to string, candidate *webrtc.ICECandidate) *Request {
	candidateInit := candidate.ToJSON()
	return &Request{
		Type:  string(ZONE_WEBRTC_CANDIDATE),
		From:  zm.ID,
		Token: "none",
		Payload: map[string]string{
			"from":          zm.ID,
			"to":            to,
			"candidate":     candidateInit.Candidate,
			"sdpMid":        *candidateInit.SDPMid,
			"sdpMlineIndex": strconv.Itoa(int(*candidateInit.SDPMLineIndex)),
		},
	}
}

// CreateOffer creates a peer connection towards target, sets a fresh offer
// as its local description, registers the connection and sends the offer
// over the signaling stream.
func (zm *ZoneManager) CreateOffer(ctx context.Context, target string, from string, zoneId string, cb OnICECandidateFunc) (err error) {
	peerConnection, err := zm.createPeerConnection(target, from, zoneId, webrtc.SDPTypeOffer, cb)
	if err != nil {
		return err
	}
	logger.Println("connection created")

	rawOffer, err := peerConnection.CreateOffer(nil)
	if err != nil {
		return err
	}
	if err = peerConnection.SetLocalDescription(rawOffer); err != nil {
		return err
	}

	logger.Println("adding for target", target)
	zm.trackPeerConnection(target, peerConnection, true)

	return zm.stream.Send(&Request{
		Type:  string(ZONE_OFFER),
		From:  zm.ID,
		Token: "none",
		Payload: map[string]string{
			"to":   target,
			"from": zm.ID,
			"sdp":  rawOffer.SDP,
		},
	})
}

// HandleOffer answers a remote offer described by req. The work runs in a
// goroutine; HandleOffer returns its result, or ctx.Err() if ctx is done
// first (the goroutine then finishes on its own in the background).
func (zm *ZoneManager) HandleOffer(ctx context.Context, req map[string]string, cb OnICECandidateFunc) (err error) {
	// Buffered so the worker can always deliver its result and exit, even
	// when nobody is listening any more because ctx was cancelled.
	result := make(chan error, 1)
	go func() {
		result <- zm.answerOffer(req, cb)
	}()

	select {
	case err = <-result:
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
}

// answerOffer does the actual work of HandleOffer: it creates and registers
// a peer connection for the offering peer, applies the remote offer, stores
// the generated answer in LocalSD and sends it over the signaling stream.
// The answer is applied as local description later, in HandleCounterOffer.
func (zm *ZoneManager) answerOffer(req map[string]string, cb OnICECandidateFunc) error {
	if _, ok := zm.Zones[req["zoneId"]]; !ok {
		return fmt.Errorf("no corresponding zone")
	}
	logger.Println("handling zone offer")

	peerConnection, err := zm.createPeerConnection(req[FROM], req[TO], req["zoneId"], webrtc.SDPTypeAnswer, cb)
	if err != nil {
		return err
	}
	logger.Println("peer connection created")
	zm.trackPeerConnection(req[FROM], peerConnection, false)

	offer := webrtc.SessionDescription{
		Type: webrtc.SDPTypeOffer,
		SDP:  req[SDP],
	}
	if err = peerConnection.SetRemoteDescription(offer); err != nil {
		return err
	}
	rawAnswer, err := peerConnection.CreateAnswer(nil)
	if err != nil {
		return err
	}

	// The job cannot fail, so the returned error is always nil.
	_ = atomicallyExecute(zm.localSDFlag, func() error {
		zm.LocalSD[req[FROM]] = &rawAnswer
		return nil
	})

	return zm.stream.Send(&Request{
		Type:  string(ZONE_ANSWER),
		From:  zm.ID,
		Token: "none",
		Payload: map[string]string{
			"to":   req[FROM],
			"from": zm.ID,
			"sdp":  rawAnswer.SDP,
		},
	})
}

// HandleAnswer applies a remote answer to the peer connection registered for
// req[FROM], sends a counter offer notification, flushes the ICE candidates
// gathered so far for that peer and drops its stored local description.
//
// A panic raised while handling the answer is recovered and reported as an
// error.
func (zm *ZoneManager) HandleAnswer(ctx context.Context, req map[string]string) (err error) {
	defer func() {
		if r := recover(); r != nil {
			logger.Printf("recover from panic in handle answer : %v\n", r)
			err = fmt.Errorf("panic in handle answer: %v", r)
		}
	}()

	from := req[FROM]

	err = atomicallyExecute(zm.peerConnectionFlag, func() error {
		pc, ok := zm.RTCPeerConnections[from]
		if !ok {
			return fmt.Errorf("no corresponding peer connection for id : %s", from)
		}
		logger.Println("---------------------")
		logger.Println(req[SDP])
		logger.Println("---------------------")
		setErr := pc.SetRemoteDescription(webrtc.SessionDescription{
			Type: webrtc.SDPTypeAnswer,
			SDP:  req[SDP],
		})
		if setErr != nil {
			logger.Println("error occured while setting remote description in handle answer")
		}
		return setErr
	})
	if err != nil {
		return err
	}

	err = zm.stream.Send(&Request{
		Type:  string(ZONE_COUNTER_OFFER),
		From:  zm.ID,
		Token: "none",
		Payload: map[string]string{
			"from": zm.ID,
			"to":   from,
		},
	})
	if err != nil {
		return err
	}

	// Best effort: a candidate that cannot be sent is logged and skipped,
	// and the pending list is cleared regardless.
	_ = atomicallyExecute(zm.candidateFlag, func() error {
		for _, candidate := range zm.PendingCandidates[from] {
			logger.Println("sending candidate from answer to", from)
			if sendErr := zm.stream.Send(zm.candidateRequest(from, candidate)); sendErr != nil {
				logger.Println(sendErr)
			}
		}
		delete(zm.PendingCandidates, from)
		return nil
	})

	_ = atomicallyExecute(zm.localSDFlag, func() error {
		delete(zm.LocalSD, from)
		return nil
	})
	return nil
}

// HandleCounterOffer applies the answer stored by HandleOffer as the local
// description of the peer connection registered for req[FROM], then flushes
// the pending ICE candidates for that peer and drops the stored answer.
//
// It returns an error when no peer connection or no stored answer exists
// for that peer, or when applying the local description fails.
func (zm *ZoneManager) HandleCounterOffer(ctx context.Context, req map[string]string) (err error) {
	from := req[FROM]

	logger.Println("handling counter offer 1")
	err = atomicallyExecute(zm.peerConnectionFlag, func() error {
		logger.Println("start job")
		connection, ok := zm.RTCPeerConnections[from]
		if !ok {
			logger.Println("error here")
			return fmt.Errorf("no field corresponding peer connection for id %s", from)
		}
		logger.Println("handling counter offer")
		return atomicallyExecute(zm.localSDFlag, func() error {
			// Guard the lookup: dereferencing a missing entry would panic.
			localSD, ok := zm.LocalSD[from]
			if !ok || localSD == nil {
				return fmt.Errorf("no pending local description for id %s", from)
			}
			if setErr := connection.SetLocalDescription(*localSD); setErr != nil {
				logger.Println(setErr)
				return setErr
			}
			return nil
		})
	})
	if err != nil {
		return err
	}

	logger.Println("handling counter offer 2")
	// Flushing stops at the first send failure and keeps the pending list
	// in that case; the failure does not fail the counter offer itself.
	flushErr := atomicallyExecute(zm.candidateFlag, func() error {
		for _, candidate := range zm.PendingCandidates[from] {
			logger.Println("sending candidate to", from)
			if sendErr := zm.stream.Send(zm.candidateRequest(from, candidate)); sendErr != nil {
				return sendErr
			}
		}
		delete(zm.PendingCandidates, from)
		return nil
	})
	if flushErr != nil {
		logger.Println(flushErr)
	}

	_ = atomicallyExecute(zm.localSDFlag, func() error {
		delete(zm.LocalSD, from)
		return nil
	})
	return nil
}

// newZoneDataChannel wraps dc in the project's DataChannel type with a fresh
// counter and a fresh threshold channel.
func newZoneDataChannel(dc *webrtc.DataChannel) *DataChannel {
	counter := int32(0)
	return &DataChannel{
		DataChannel:                       dc,
		bufferedAmountLowThresholdReached: make(<-chan struct{}),
		l:                                 &counter,
	}
}

// bindZoneChannel wires the open/close/message callbacks of dc so that,
// once open, the channel is registered in the zone, the zone scheduler is
// fed with the incoming requests, and a "user_zone_init" message is sent to
// the remote peer.
func (zm *ZoneManager) bindZoneChannel(dc *webrtc.DataChannel, target string, zoneId string) {
	reqChan := make(chan *ZoneRequest)

	dc.OnOpen(func() {
		logger.Println(zoneId)
		zone, ok := zm.Zones[zoneId]
		if !ok {
			return
		}
		logger.Println("this zone exist")
		// The job cannot fail, so the returned error is always nil.
		_ = atomicallyExecute(zone.DataChannelsFlag, func() error {
			zone.DataChannels[target] = newZoneDataChannel(dc)
			return nil
		})

		done, errCh := zone.ZoneRequestScheduler.Schedule(reqChan)

		bs, jsonErr := json.Marshal(&ZoneResponse{
			Type:    "user_zone_init",
			From:    zoneId,
			To:      target,
			Payload: map[string]interface{}{},
		})
		if jsonErr != nil {
			logger.Println("error in open channel", jsonErr)
			return
		}
		if sendErr := dc.SendText(string(bs)); sendErr != nil {
			logger.Println("error in open channel send", sendErr)
			return
		}

		// Drain scheduler errors until the scheduler reports completion.
		for {
			select {
			case <-done:
				return
			case _, open := <-errCh:
				if !open {
					// A closed channel is always ready; stop selecting on
					// it so this loop does not spin while waiting for done.
					errCh = nil
				}
			}
		}
	})

	// NOTE(review): a message delivered after OnClose has run would send on
	// a closed channel — confirm the library never interleaves the two.
	dc.OnClose(func() {
		close(reqChan)
	})

	dc.OnMessage(func(msg webrtc.DataChannelMessage) {
		var req ZoneRequest
		if err := json.Unmarshal(msg.Data, &req); err != nil {
			logger.Println(err)
			return
		}
		logger.Println("incoming request", req)
		reqChan <- &req
	})
}

// createPeerConnection builds a peer connection towards target for zoneId
// and installs all of its callbacks.
//
// When peerType is webrtc.SDPTypeAnswer the local side creates the "data"
// channel itself; otherwise it waits for the remote side to open one. Local
// ICE candidates are queued in PendingCandidates until a remote description
// is set, and passed to cb afterwards.
//
// A panic raised during setup is recovered and reported as an error.
func (zm *ZoneManager) createPeerConnection(target string, from string, zoneId string, peerType webrtc.SDPType, cb OnICECandidateFunc) (peerConnection *webrtc.PeerConnection, err error) {
	defer func() {
		if r := recover(); r != nil {
			logger.Printf("recover from panic : %v\n", r)
			peerConnection, err = nil, fmt.Errorf("panic while creating peer connection: %v", r)
		}
	}()

	config := webrtc.Configuration{
		ICEServers: []webrtc.ICEServer{
			{
				URLs: []string{"stun:stun.l.google.com:19302", "stun:stunserver.org:3478"},
			},
		},
		SDPSemantics: webrtc.SDPSemanticsUnifiedPlanWithFallback,
	}
	// Callbacks capture this local, never the named results, so nothing
	// written after the function has returned can race with the caller.
	conn, newErr := webrtc.NewPeerConnection(config)
	if newErr != nil {
		return nil, newErr
	}
	logger.Println("---------------------------------------------------")

	if peerType == webrtc.SDPTypeAnswer {
		maxRetransmits := uint16(100)
		channel, chErr := conn.CreateDataChannel("data", &webrtc.DataChannelInit{
			MaxRetransmits: &maxRetransmits,
		})
		if chErr != nil {
			// Do not leak the half-built connection.
			if closeErr := conn.Close(); closeErr != nil {
				logger.Println(closeErr)
			}
			return nil, chErr
		}
		zm.bindZoneChannel(channel, target, zoneId)
		logger.Println("new channel for target : ", target)
		logger.Println(target)
		_ = atomicallyExecute(zm.dataChannelFlag, func() error {
			zm.DataChannels[target] = newZoneDataChannel(channel)
			return nil
		})
	} else {
		conn.OnDataChannel(func(dc *webrtc.DataChannel) {
			_ = atomicallyExecute(zm.dataChannelFlag, func() error {
				l := int32(0)
				zm.DataChannels[target] = &DataChannel{
					DataChannel: dc,
					l:           &l,
				}
				return nil
			})
			zm.bindZoneChannel(dc, target, zoneId)
		})
	}

	conn.OnConnectionStateChange(func(pcs webrtc.PeerConnectionState) {
		switch pcs {
		case webrtc.PeerConnectionStateClosed, webrtc.PeerConnectionStateDisconnected, webrtc.PeerConnectionStateFailed:
			logger.Println(pcs)
			if leaveErr := zm.HandleLeavingMember(target, zoneId); leaveErr != nil {
				logger.Println(leaveErr)
			}
		}
	})

	conn.OnNegotiationNeeded(func() {
		logger.Println("------------------- negotiation is needed --------------------")
		tracked, ok := zm.RTCPeerConnections[target]
		if !ok {
			return
		}
		// NOTE(review): this compares a signaling state with an ICE
		// transport state constant, which looks unintended. Kept as is;
		// confirm the intended condition (connection state connected, or
		// signaling state stable?) before changing it.
		if tracked.SignalingState() != webrtc.ICETransportStateConnected {
			return
		}
		localSd, offerErr := tracked.CreateOffer(nil)
		if offerErr != nil {
			logger.Println(offerErr)
			return
		}
		if setErr := tracked.SetLocalDescription(localSd); setErr != nil {
			logger.Println(setErr)
			return
		}
		sendErr := zm.stream.Send(&Request{
			Type:  string(ZONE_WEBRTC_RENNEGOTIATION_OFFER),
			From:  zm.ID,
			Token: "",
			Payload: map[string]string{
				"to":  target,
				"sdp": localSd.SDP,
			},
		})
		if sendErr != nil {
			logger.Println(sendErr)
		}
	})

	conn.OnICEConnectionStateChange(func(is webrtc.ICEConnectionState) {
		logger.Printf("ICE connection state has changed %s\n", is.String())
		if is == webrtc.ICEConnectionStateDisconnected || is == webrtc.ICEConnectionStateFailed {
			logger.Println(is)
		}
	})

	conn.OnICECandidate(func(i *webrtc.ICECandidate) {
		if i == nil {
			return
		}
		_ = atomicallyExecute(zm.candidateFlag, func() error {
			if conn.RemoteDescription() == nil {
				// The peer cannot use candidates yet; keep them for later.
				logger.Println("generated candidate appended to list : ", i)
				zm.PendingCandidates[target] = append(zm.PendingCandidates[target], i)
				return nil
			}
			logger.Println("generated candidate : ", i)
			if iceCandidateErr := cb(target, i); iceCandidateErr != nil {
				logger.Println(iceCandidateErr)
			}
			return nil
		})
	})

	return conn, nil
}

// HandleRennegotiationOffer applies a renegotiation offer received from the
// peer identified by from and replies with an answer over the signaling
// stream. The offer is rejected while the local side is itself marked as
// making an offer.
func (zm *ZoneManager) HandleRennegotiationOffer(from string, sdp string) (err error) {
	return atomicallyExecute(zm.peerConnectionFlag, func() error {
		pc, ok := zm.RTCPeerConnections[from]
		if !ok {
			return fmt.Errorf("no corresponding peer connection for id %s", from)
		}

		pc.makingOfferLock.Lock()
		makingOffer := pc.makingOffer
		pc.makingOfferLock.Unlock()
		if makingOffer {
			return fmt.Errorf("already making an offer or state is stable")
		}

		if setErr := pc.SetRemoteDescription(webrtc.SessionDescription{SDP: sdp, Type: webrtc.SDPTypeOffer}); setErr != nil {
			return setErr
		}
		localSd, answerErr := pc.CreateAnswer(nil)
		if answerErr != nil {
			return answerErr
		}
		if setErr := pc.SetLocalDescription(localSd); setErr != nil {
			return setErr
		}

		sendErr := zm.stream.Send(&Request{
			Type:  string(ZONE_WEBRTC_RENNEGOTIATION_ANSWER),
			From:  zm.ID,
			Token: "",
			Payload: map[string]string{
				"to":  from,
				"sdp": localSd.SDP,
			},
		})
		if sendErr != nil {
			logger.Println(sendErr)
		}
		return sendErr
	})
}

// HandleRennegotiationAnswer applies a renegotiation answer received from
// the peer identified by from. It returns an error when no peer connection
// is registered for that peer or when the description is rejected.
func (zm *ZoneManager) HandleRennegotiationAnswer(from string, sdp string) (err error) {
	return atomicallyExecute(zm.peerConnectionFlag, func() error {
		pc, ok := zm.RTCPeerConnections[from]
		if !ok {
			return fmt.Errorf("no corresponding peer connection for id %s", from)
		}
		return pc.SetRemoteDescription(webrtc.SessionDescription{SDP: sdp, Type: webrtc.SDPTypeAnswer})
	})
}

// AddCandidate adds a remote ICE candidate to the peer connection registered
// for from. A nil candidate or an unknown peer is silently ignored; an error
// reported by the peer connection is returned.
func (zm *ZoneManager) AddCandidate(candidate *webrtc.ICECandidateInit, from string) (err error) {
	return atomicallyExecute(zm.candidateFlag, func() error {
		if candidate == nil {
			return nil
		}
		connection, ok := zm.RTCPeerConnections[from]
		if !ok {
			return nil
		}
		return connection.AddICECandidate(*candidate)
	})
}

// HandleLeavingMember tears down everything held for member id in zone
// zoneId: it publishes a LEAVE_ZONE request to every zone handler, closes
// and forgets the member's zone data channel, then closes and forgets the
// member's peer connection.
//
// It fails early when no peer connection is registered for id or when the
// zone is unknown.
func (zm *ZoneManager) HandleLeavingMember(id string, zoneId string) (err error) {
	err = atomicallyExecute(zm.peerConnectionFlag, func() error {
		if _, ok := zm.RTCPeerConnections[id]; !ok {
			return fmt.Errorf("no correponding peerconnection for id %s", id)
		}
		return nil
	})
	if err != nil {
		return err
	}

	err = atomicallyExecute(zm.zoneFlag, func() error {
		zone, ok := zm.Zones[zoneId]
		if !ok {
			return fmt.Errorf("no corresponding zone for zoneId %s", zoneId)
		}
		for _, publisher := range zone.ZoneRequestScheduler.handlersPublishers {
			publisher <- &ZoneRequest{
				ReqType: LEAVE_ZONE,
				From:    "node",
				Payload: map[string]interface{}{
					"userId": id,
				},
			}
		}
		return atomicallyExecute(zone.DataChannelsFlag, func() error {
			// The entry is removed even when closing the channel fails.
			defer delete(zone.DataChannels, id)
			dataChannel, ok := zone.DataChannels[id]
			if !ok {
				return nil
			}
			return dataChannel.DataChannel.Close()
		})
	})
	if err != nil {
		return err
	}

	return atomicallyExecute(zm.peerConnectionFlag, func() error {
		pc, ok := zm.RTCPeerConnections[id]
		if !ok {
			return nil
		}
		// The entry is removed even when closing the connection fails.
		defer delete(zm.RTCPeerConnections, id)
		return pc.Close()
	})
}

// negotiate marks the peer connection registered for target as making an
// offer and clears the mark again before returning.
//
// NOTE(review): no offer is created in between, so the mark is never
// observable for long — confirm whether the negotiation itself is missing.
func (zm *ZoneManager) negotiate(target string, zoneId string) {
	_ = atomicallyExecute(zm.peerConnectionFlag, func() error {
		pc, ok := zm.RTCPeerConnections[target]
		if !ok {
			return nil
		}
		pc.makingOfferLock.Lock()
		pc.makingOffer = true
		pc.makingOfferLock.Unlock()

		defer func() {
			pc.makingOfferLock.Lock()
			pc.makingOffer = false
			pc.makingOfferLock.Unlock()
		}()
		return nil
	})
}