package localserver import ( "bufio" "context" "encoding/json" "fmt" "io" "os" "path/filepath" "strconv" sync "sync" "time" "github.com/pion/webrtc/v3" ) const ( ZONE_FS_WEBRTC_OFFER ReqType = "zone_fs_offer" ZONE_FS_WEBRTC_ANSWER ReqType = "zone_fs_answer" ZONE_FS_WEBRTC_RENNEGOTIATION_OFFER ReqType = "zone_fs_rennegotiation_offer" ZONE_FS_WEBRTC_RENNEGOTIATION_ANSWER ReqType = "zone_fs_rennegotiation_answer" ZONE_FS_WEBRTC_COUNTER_OFFER ReqType = "zone_fs_webrtc_counter_offer" ZONE_FS_WEBRTC_CANDIDATE ReqType = "zone_fs_webrtc_candidate" ) type FSInstance struct { ZoneID string `json:"id"` Owner string `json:"owner"` Members []string `json:"members"` OpenFiles map[string]*os.File `json:"-"` OpenFilesForUser map[string][]string `json:"-"` localSD map[string]*webrtc.SessionDescription `json:"-"` rtcPeerConnections map[string]*ZoneRTCPeerConnection `json:"-"` zoneFSDataChannels map[string]map[string]*DataChannel `json:"-"` pendingCandidates map[string][]*webrtc.ICECandidate `json:"-"` middlewares []interface{} `json:"-"` candidateFlag *uint32 `json:"-"` filesFlag *uint32 `json:"-"` rtcPeerConnectionMapFlag *uint32 `json:"-"` dataChannelMapFlag *uint32 `json:"-"` localSDMapFlag *uint32 `json:"-"` } type FSInstanceOnICECandidateFunc = func(string, string, *webrtc.ICECandidate) error func NewFSInstance(id, owner string, members []string) (audioChannel *FSInstance) { candidateFlag := uint32(0) rtcPeerConnectionMapFlag := uint32(0) dataChannelMapFlag := uint32(0) filesFlag := uint32(0) localSDMapFlag := uint32(0) audioChannel = &FSInstance{ ZoneID: id, Owner: owner, Members: members, OpenFiles: make(map[string]*os.File), OpenFilesForUser: make(map[string][]string), localSD: make(map[string]*webrtc.SessionDescription), rtcPeerConnections: make(map[string]*ZoneRTCPeerConnection), zoneFSDataChannels: make(map[string]map[string]*DataChannel), pendingCandidates: make(map[string][]*webrtc.ICECandidate), middlewares: make([]interface{}, 0), candidateFlag: &candidateFlag, filesFlag: &filesFlag, rtcPeerConnectionMapFlag: &rtcPeerConnectionMapFlag, dataChannelMapFlag: &dataChannelMapFlag, localSDMapFlag: &localSDMapFlag, } return } func (fs *FSInstance) SetupFileUpload(path, filename, userId string) (err error) { concretePath := filepath.Join("data", "zones", fs.ZoneID, "fs", path, "__files__", filename) if _, rErr := os.ReadDir(filepath.Join("data", "zones", fs.ZoneID, "fs", path, "__files__")); os.IsNotExist(rErr) { if err = os.MkdirAll(filepath.Join("data", "zones", fs.ZoneID, "fs", path, "__files__"), 0700); err != nil { return } } else if rErr != nil { return rErr } if err = os.Remove(concretePath); err != nil { logger.Println(err) } file, err := os.OpenFile(concretePath, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0755) if err != nil { return } _ = atomicallyExecute(fs.filesFlag, func() (err error) { fs.OpenFiles[filename] = file return }) setupDataChan := func(id, fileName string) (err error) { if pc, ok := fs.rtcPeerConnections[id]; ok { maxRetransmits := uint16(100) var dc *webrtc.DataChannel dc, err = pc.CreateDataChannel(fileName, &webrtc.DataChannelInit{ MaxRetransmits: &maxRetransmits, }) if err != nil { return } dc.OnOpen(func() { logger.Println("!-----------------------------!") logger.Printf("datachannel with id %s is now open\n", dc.Label()) logger.Println("updated") logger.Println("!-----------------------------!") }) dc.OnMessage(func(msg webrtc.DataChannelMessage) { logger.Println(msg) _, _ = file.Write(msg.Data) }) dc.OnClose(func() { //fmt.Println("closing datachannel with id", dc.Label()) _ = atomicallyExecute(fs.filesFlag, func() (err error) { if f, ok := fs.OpenFiles[fileName]; ok { err = f.Close() } delete(fs.OpenFiles, fileName) return }) _ = atomicallyExecute(fs.dataChannelMapFlag, func() (err error) { delete(fs.zoneFSDataChannels[userId], fileName) return }) dc.Close() }) err = atomicallyExecute(fs.dataChannelMapFlag, func() (err error) { if _, ok := fs.zoneFSDataChannels[id]; !ok { fs.zoneFSDataChannels[id] = make(map[string]*DataChannel) } l := int32(0) fs.zoneFSDataChannels[id][dc.Label()] = &DataChannel{ DataChannel: dc, l: &l, } return }) } else { err = fmt.Errorf("no peerconnection for id %s", id) } return } err = atomicallyExecute(fs.rtcPeerConnectionMapFlag, func() (err error) { err = setupDataChan(userId, filename) return }) return } func (fs *FSInstance) FileUploadDone(path, filename, userId string) (err error) { err = atomicallyExecute(fs.dataChannelMapFlag, func() (err error) { if _, ok := fs.zoneFSDataChannels[userId]; ok { if dc, ok := fs.zoneFSDataChannels[userId][filename]; ok { err = dc.DataChannel.Close() return } } return }) return } func (fs *FSInstance) FileUploadFailed(path, filename, userId string) (err error) { concretePath := filepath.Join("data", "zones", fs.ZoneID, "fs", path, "__files__", filename) if err = os.Remove(concretePath); err != nil { return } err = atomicallyExecute(fs.dataChannelMapFlag, func() (err error) { if _, ok := fs.zoneFSDataChannels[userId]; ok { if dc, ok := fs.zoneFSDataChannels[userId][filename]; ok { err = dc.DataChannel.Close() return } } return }) return } func (fs *FSInstance) SetupFileDownload(path, filename, userId string) (err error) { concretePath := filepath.Join("data", "zones", fs.ZoneID, "fs", path, "__files__", filename) file, err := os.OpenFile(concretePath, os.O_RDONLY, 0755) if err != nil { return } _ = atomicallyExecute(fs.filesFlag, func() (err error) { fs.OpenFiles[filename] = file if _, ok := fs.OpenFilesForUser[userId]; !ok { fs.OpenFilesForUser[userId] = []string{} } fs.OpenFilesForUser[userId] = append(fs.OpenFilesForUser[userId], filename) return }) var dc *webrtc.DataChannel err = atomicallyExecute(fs.rtcPeerConnectionMapFlag, func() (err error) { if pc, ok := fs.rtcPeerConnections[userId]; ok { maxRetransmits := uint16(100) dc, err = pc.CreateDataChannel(filename, &webrtc.DataChannelInit{ MaxRetransmits: &maxRetransmits, }) if err != nil { return } } return }) if dc != nil { dc.SetBufferedAmountLowThreshold(15000000) bufferedAmountLock := make(chan struct{}) done := make(chan struct{}) dc.OnOpen(func() { go func() { defer func() { close(bufferedAmountLock) bufferedAmountLock = nil _ = atomicallyExecute(fs.filesFlag, func() (err error) { if f, ok := fs.OpenFiles[filename]; ok { err = f.Close() } delete(fs.OpenFiles, filename) if _, ok := fs.OpenFilesForUser[userId]; ok { var index int for i, v := range fs.OpenFilesForUser[userId] { if v == filename { index = i break } } if len(fs.OpenFilesForUser[userId]) > 1 { fs.OpenFilesForUser[userId] = append(fs.OpenFilesForUser[userId][:index], fs.OpenFilesForUser[userId][:index+1]...) } else { delete(fs.OpenFilesForUser, userId) } } return }) }() r := bufio.NewReader(file) buf := make([]byte, 0, 15000) sendingLoop: for { n, readErr := r.Read(buf[:cap(buf)]) buf = buf[:n] if n == 0 { if err == nil { logger.Println("n is 0 weird") break } if err == io.EOF { break } logger.Println(readErr) return } if err = dc.Send(buf); err != nil { dc.Close() logger.Println(err) break sendingLoop } if dc.BufferedAmount() > dc. BufferedAmountLowThreshold() { <-bufferedAmountLock } } logger.Println("done") <-time.After(4*time.Second) _ = dc.SendText("done") <-time.After(time.Second) _ = dc.Close() _ = atomicallyExecute(fs.dataChannelMapFlag, func() (err error) { if _, ok := fs.zoneFSDataChannels[userId]; ok { delete(fs.zoneFSDataChannels[userId], dc.Label()) } return }) }() }) dc.OnBufferedAmountLow(func() { if bufferedAmountLock != nil { bufferedAmountLock <- struct{}{} } }) dc.OnClose(func() { done <- struct{}{} defer close(done) _ = atomicallyExecute(fs.filesFlag, func() (err error) { if f, ok := fs.OpenFiles[filename]; ok { err = f.Close() } delete(fs.OpenFiles, filename) return }) }) err = atomicallyExecute(fs.dataChannelMapFlag, func() (err error) { if _, ok := fs.zoneFSDataChannels[userId]; !ok { fs.zoneFSDataChannels[userId] = make(map[string]*DataChannel) } l := int32(0) fs.zoneFSDataChannels[userId][dc.Label()] = &DataChannel{ DataChannel: dc, l: &l, } return }) } else { err = fmt.Errorf("datachannel not created") _ = atomicallyExecute(fs.filesFlag, func() (err error) { if file, ok := fs.OpenFiles[filename]; ok { file.Close() } delete(fs.OpenFiles, filename) return }) } return } func (fs *FSInstance) HandleOffer(ctx context.Context, channelId, userId, sdp, hostId string, sendDCMessage SendDCMessageFunc, cb FSInstanceOnICECandidateFunc) (done chan struct{}, errCh chan error) { done, errCh = make(chan struct{}), make(chan error) go func() { peerConnection, err := fs.createPeerConnection(userId, fs.ZoneID, webrtc.SDPTypeAnswer, cb, sendDCMessage) if err != nil { errCh <- err return } _ = atomicallyExecute(fs.rtcPeerConnectionMapFlag, func() (err error) { fs.rtcPeerConnections[userId] = &ZoneRTCPeerConnection{ PeerConnection: peerConnection, makingOffer: false, makingOfferLock: &sync.Mutex{}, } return }) offer := webrtc.SessionDescription{ Type: webrtc.SDPTypeOffer, SDP: sdp, } if err = peerConnection.SetRemoteDescription(offer); err != nil { errCh <- err return } rawAnswer, err := peerConnection.CreateAnswer(nil) if err != nil { errCh <- err return } if err = peerConnection.SetLocalDescription(rawAnswer); err != nil { errCh <- err return } _, _ = sendDCMessage(string(ZONE_FS_WEBRTC_ANSWER), hostId, userId, map[string]interface{}{ "to": userId, "from": fs.ZoneID, "channelId": channelId, "sdp": rawAnswer.SDP, }) done <- struct{}{} logger.Println("handle offer done") }() return } func (fs *FSInstance) HandleCounterOffer(ctx context.Context, userId string, sendDCMessage SendDCMessageFunc) (err error) { // if err = atomicallyExecute(fs.rtcPeerConnectionMapFlag, func() (err error) { // if _, ok := fs.rtcPeerConnections[userId]; !ok { // err = fmt.Errorf("no field corresponding peer connection for id %s", userId) // return // } // logger.Println("handling counter offer") // connection := fs.rtcPeerConnections[userId] // err = atomicallyExecute(fs.localSDMapFlag, func() (err error) { // err = connection.SetLocalDescription(*fs.localSD[userId]) // return // }) // return // }); err != nil { // return // } // _ = atomicallyExecute(fs.localSDMapFlag, func() (err error) { // delete(fs.localSD, userId) // return // }) if err = atomicallyExecute(fs.candidateFlag, func() (err error) { for _, candidate := range fs.pendingCandidates[userId] { logger.Println("sending candidate to", userId, candidate) d, e := sendDCMessage(string(ZONE_FS_WEBRTC_CANDIDATE), "", userId, map[string]interface{}{ "from": fs.ZoneID, "to": userId, "candidate": candidate.ToJSON().Candidate, "sdpMid": *candidate.ToJSON().SDPMid, "sdpMLineIndex": strconv.Itoa(int(*candidate.ToJSON().SDPMLineIndex)), }) select { case <-d: case err = <-e: return } } delete(fs.pendingCandidates, userId) return }); err != nil { return } return } func (fs *FSInstance) HandleRennegotiationOffer(from, sdp string, sendDCMessage SendDCMessageFunc) (err error) { err = atomicallyExecute(fs.rtcPeerConnectionMapFlag, func() (err error) { if _, ok := fs.rtcPeerConnections[from]; !ok { err = fmt.Errorf("no corresponding peer connection for id %s", from) return } // fs.rtcPeerConnections[from].makingOfferLock.Lock() // if fs.rtcPeerConnections[from].makingOffer { // fs.rtcPeerConnections[from].makingOfferLock.Unlock() // return fmt.Errorf("already making an offer or state is stable") // } // fs.rtcPeerConnections[from].makingOfferLock.Unlock() if err = fs.rtcPeerConnections[from].SetRemoteDescription(webrtc.SessionDescription{SDP: sdp, Type: webrtc.SDPTypeOffer}); err != nil { return } localSd, err := fs.rtcPeerConnections[from].CreateAnswer(nil) if err != nil { return } if err = fs.rtcPeerConnections[from].SetLocalDescription(localSd); err != nil { return } d, e := sendDCMessage(string(ZONE_FS_WEBRTC_RENNEGOTIATION_ANSWER), fs.ZoneID, from, map[string]interface{}{ "from": fs.ZoneID, "to": from, "sdp": localSd.SDP, }) select { case <-d: case err = <-e: } return }) return } func (fs *FSInstance) HandleRennegotiationAnswer(from, sdp string) (err error) { logger.Println("---------------------handling rennego answer") err = atomicallyExecute(fs.rtcPeerConnectionMapFlag, func() (err error) { err = fs.rtcPeerConnections[from].SetRemoteDescription(webrtc.SessionDescription{SDP: sdp, Type: webrtc.SDPTypeAnswer}) return }) return } func (fs *FSInstance) AddCandidate(candidate *webrtc.ICECandidateInit, from string) (err error) { logger.Println("adding ice candidate", candidate) err = atomicallyExecute(fs.rtcPeerConnectionMapFlag, func() (err error) { if _, ok := fs.rtcPeerConnections[from]; ok && candidate != nil { err = fs.rtcPeerConnections[from].AddICECandidate(*candidate) } return }) return } func (fs *FSInstance) createPeerConnection(target, from string, peerType webrtc.SDPType, cb FSInstanceOnICECandidateFunc, sendDCMessage SendDCMessageFunc) (peerConnection *webrtc.PeerConnection, err error) { defer func() { if r := recover(); err != nil { logger.Printf("recover from panic : %v\n", r) } }() config := webrtc.Configuration{ ICEServers: []webrtc.ICEServer{ { URLs: []string{"stun:stun.l.google.com:19302", "stun:stunserver.org:3478"}, }, }, SDPSemantics: webrtc.SDPSemanticsUnifiedPlanWithFallback, } peerConnection, err = webrtc.NewPeerConnection(config) if err != nil { return } logger.Println("---------------------------------------------------") if peerType == webrtc.SDPTypeAnswer { maxRetransmits := uint16(100) channel, err := peerConnection.CreateDataChannel("data", &webrtc.DataChannelInit{ MaxRetransmits: &maxRetransmits, }) if err != nil { return nil, err } channel.OnOpen(func() { logger.Println("channel opened") if chanErr := channel.SendText("yooo man this is open"); chanErr != nil { logger.Println(chanErr) } }) channel.OnMessage(func(msg webrtc.DataChannelMessage) { var event CallEvent if err := json.Unmarshal(msg.Data, &event); err != nil { logger.Println(err) return } if e := fs.HandleDataChannelEvents(event.From, event.EventId, event.Payload); e != nil { logger.Println("*-------------- datachannel error: ", e) } }) logger.Println("new channel for target : ", target) channel.SetBufferedAmountLowThreshold(bufferedAmountLowThreshold) channel.OnBufferedAmountLow(func() { }) _ = atomicallyExecute(fs.dataChannelMapFlag, func() (err error) { logger.Println(target) l := int32(0) if _, ok := fs.zoneFSDataChannels[target]; !ok { fs.zoneFSDataChannels[target] = make(map[string]*DataChannel) } fs.zoneFSDataChannels[target][channel.Label()] = &DataChannel{ DataChannel: channel, l: &l, } return }) } else { peerConnection.OnDataChannel(func(dc *webrtc.DataChannel) { _ = atomicallyExecute(fs.dataChannelMapFlag, func() (err error) { l := int32(0) if _, ok := fs.zoneFSDataChannels[target]; !ok { fs.zoneFSDataChannels[target] = make(map[string]*DataChannel) } fs.zoneFSDataChannels[target][dc.Label()] = &DataChannel{ DataChannel: dc, l: &l, } return }) dc.OnOpen(func() { logger.Printf("got a new open datachannel %s\n", dc.Label()) }) dc.OnMessage(func(msg webrtc.DataChannelMessage) { var event CallEvent if err := json.Unmarshal(msg.Data, &event); err != nil { logger.Println(err) return } if e := fs.HandleDataChannelEvents(event.From, event.EventId, event.Payload); e != nil { logger.Println("*-------------- datachannel error: ", e) } }) }) } peerConnection.OnConnectionStateChange(func(pcs webrtc.PeerConnectionState) { if pcs == webrtc.PeerConnectionStateClosed || pcs == webrtc.PeerConnectionStateDisconnected || pcs == webrtc.PeerConnectionStateFailed { logger.Println(pcs) //fs.HandleLeavingMember(target) } }) peerConnection.OnICEConnectionStateChange(func(is webrtc.ICEConnectionState) { logger.Printf("ICE connection state has changed %s\n", is.String()) if is == webrtc.ICEConnectionStateDisconnected || is == webrtc.ICEConnectionStateFailed { logger.Println(is) } }) peerConnection.OnICECandidate(func(i *webrtc.ICECandidate) { if i == nil { return } _ = atomicallyExecute(fs.candidateFlag, func() (err error) { desc := peerConnection.RemoteDescription() if desc == nil { logger.Println("generated candidate appended to list : ", i) fs.pendingCandidates[target] = append(fs.pendingCandidates[target], i) } else { logger.Println("generated candidate : ", i) if iceCandidateErr := cb(from, target, i); iceCandidateErr != nil { logger.Println(iceCandidateErr) } } return }) }) // peerConnection.OnNegotiationNeeded(func() { // logger.Println("---------------- rennego is needed -----------") // _ = atomicallyExecute(fs.rtcPeerConnectionMapFlag, func() (err error) { // logger.Println("----------------- sending renego to peer with id", target) // if peerConnection.SignalingState() == webrtc.SignalingStateStable { // localSd, localSdErr := peerConnection.CreateOffer(nil) // if localSdErr != nil { // logger.Println(localSdErr) // return localSdErr // } // if err = peerConnection.SetLocalDescription(localSd); err != nil { // logger.Println(err) // return // } // d, e := sendDCMessage(string(ZONE_FS_WEBRTC_RENNEGOTIATION_OFFER), fs.ZoneID, target, map[string]interface{}{ // "from": fs.ZoneID, // "to": target, // "sdp": localSd.SDP, // }) // select { // case <-d: // case err = <-e: // logger.Println(err) // } // } // return // }) // }) return } func (fs *FSInstance) HandleLeavingMember(id string) { _ = atomicallyExecute(fs.dataChannelMapFlag, func() (err error) { if _, ok := fs.zoneFSDataChannels[id]; ok { for _, dc := range fs.zoneFSDataChannels[id] { dc.DataChannel.Close() } } delete(fs.zoneFSDataChannels, id) return }) logger.Println("chatfs datachannels cleaned") _ = atomicallyExecute(fs.rtcPeerConnectionMapFlag, func() (err error) { if pc, ok := fs.rtcPeerConnections[id]; ok { if closeErr := pc.Close(); closeErr != nil { err = closeErr logger.Println("peer connection close error", closeErr) } } delete(fs.rtcPeerConnections, id) return }) logger.Println("chats perrconnections cleaned") _ = atomicallyExecute(fs.candidateFlag, func() (err error) { delete(fs.pendingCandidates, id) return }) _ = atomicallyExecute(fs.filesFlag, func() (err error) { for _, openFile := range fs.OpenFilesForUser[id] { if f, ok := fs.OpenFiles[openFile]; ok { _ = f.Close() } delete(fs.OpenFiles, openFile) } delete(fs.OpenFilesForUser, id) return }) } func (fs *FSInstance) HandleDataChannelEvents(from, eventId string, payload map[string]interface{}) (err error) { switch eventId { } return }