package localserver

import (
	"bufio"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/pion/webrtc/v3"
)

// Signalling message types exchanged while negotiating the zone chat
// file-system WebRTC connection.
const (
	ZONE_CHAT_WEBRTC_OFFER                 ReqType = "zone_chat_offer"
	ZONE_CHAT_WEBRTC_ANSWER                ReqType = "zone_chat_answer"
	ZONE_CHAT_WEBRTC_RENNEGOTIATION_OFFER  ReqType = "zone_chat_rennegotiation_offer"
	ZONE_CHAT_WEBRTC_RENNEGOTIATION_ANSWER ReqType = "zone_chat_rennegotiation_answer"
	ZONE_CHAT_WEBRTC_COUNTER_OFFER         ReqType = "zone_chat_webrtc_counter_offer"
	ZONE_CHAT_WEBRTC_CANDIDATE             ReqType = "zone_chat_webrtc_candidate"
)

// fileChannelBufferedAmountLowThreshold is the buffered-amount threshold (in
// bytes) set on every file transfer data channel.
const fileChannelBufferedAmountLowThreshold = 15000000

// ChatFSInstance holds the per-zone state used to transfer chat files over
// WebRTC data channels: the peer connections, their data channels, the ICE
// candidates waiting to be signalled and the files currently open for a
// transfer.
//
// Each map is guarded by the matching *Flag field, which is always accessed
// through atomicallyExecute.
type ChatFSInstance struct {
	ZoneID  string   `json:"id"`
	Owner   string   `json:"owner"`
	Members []string `json:"members"`
	// OpenFiles maps "<filename>/<userId>" to the file opened for that transfer.
	OpenFiles map[string]*os.File `json:"-"`
	// OpenFilesForUser lists, per user id, the OpenFiles keys owned by that user.
	OpenFilesForUser map[string][]string `json:"-"`
	localSD          map[string]*webrtc.SessionDescription `json:"-"`
	// rtcPeerConnections maps a user id to its peer connection.
	rtcPeerConnections map[string]*ZoneRTCPeerConnection `json:"-"`
	// zoneFSDataChannels maps a user id to its data channels, keyed by label.
	zoneFSDataChannels map[string]map[string]*DataChannel `json:"-"`
	// pendingCandidates holds local ICE candidates gathered before the remote
	// description was set, keyed by user id.
	pendingCandidates        map[string][]*webrtc.ICECandidate `json:"-"`
	middlewares              []interface{}                     `json:"-"`
	candidateFlag            *uint32                           `json:"-"`
	filesFlag                *uint32                           `json:"-"`
	rtcPeerConnectionMapFlag *uint32                           `json:"-"`
	dataChannelMapFlag       *uint32                           `json:"-"`
	localSDMapFlag           *uint32                           `json:"-"`
}

// ChatFSInstanceOnICECandidateFunc is called with (from, target, candidate)
// for every local ICE candidate that can be signalled immediately.
type ChatFSInstanceOnICECandidateFunc = func(string, string, *webrtc.ICECandidate) error

// NewChatFSInstance returns a ChatFSInstance for the zone id with every map
// allocated and every lock flag initialised to zero.
func NewChatFSInstance(id, owner string, members []string) (instance *ChatFSInstance) {
	instance = &ChatFSInstance{
		ZoneID:                   id,
		Owner:                    owner,
		Members:                  members,
		OpenFiles:                make(map[string]*os.File),
		OpenFilesForUser:         make(map[string][]string),
		localSD:                  make(map[string]*webrtc.SessionDescription),
		rtcPeerConnections:       make(map[string]*ZoneRTCPeerConnection),
		zoneFSDataChannels:       make(map[string]map[string]*DataChannel),
		pendingCandidates:        make(map[string][]*webrtc.ICECandidate),
		middlewares:              make([]interface{}, 0),
		candidateFlag:            new(uint32),
		filesFlag:                new(uint32),
		rtcPeerConnectionMapFlag: new(uint32),
		dataChannelMapFlag:       new(uint32),
		localSDMapFlag:           new(uint32),
	}
	return
}

// chatFSOpenFileKey builds the key under which a transfer's file handle is
// stored in ChatFSInstance.OpenFiles.
func chatFSOpenFileKey(filename, userId string) string {
	return fmt.Sprintf("%s/%s", filename, userId)
}

// chatFSPathWithin reports whether child is located strictly below parent.
func chatFSPathWithin(parent, child string) bool {
	rel, err := filepath.Rel(parent, child)
	if err != nil || rel == "." {
		return false
	}
	return rel != ".." && !strings.HasPrefix(rel, ".."+string(filepath.Separator))
}

// chatFilePaths returns the "__files__" directory of the chat and the path of
// filename inside it. chatId and filename are supplied by peers, so any
// combination that would resolve outside the zone's chats directory (for
// example through "..") is rejected with an error.
func (fs *ChatFSInstance) chatFilePaths(chatId, filename string) (dir, path string, err error) {
	root := filepath.Join("data", "zones", fs.ZoneID, "chats")
	dir = filepath.Join(root, chatId, "__files__")
	path = filepath.Join(dir, filename)
	if !chatFSPathWithin(root, dir) || !chatFSPathWithin(dir, path) {
		return "", "", fmt.Errorf("invalid file path for chat %q and file %q", chatId, filename)
	}
	return
}

// trackOpenFile records file as the open handle of the (filename, userId)
// transfer.
func (fs *ChatFSInstance) trackOpenFile(filename, userId string, file *os.File) {
	key := chatFSOpenFileKey(filename, userId)
	// The callback never fails, so there is no error to report.
	_ = atomicallyExecute(fs.filesFlag, func() (err error) {
		fs.OpenFiles[key] = file
		fs.OpenFilesForUser[userId] = append(fs.OpenFilesForUser[userId], key)
		return
	})
}

// releaseOpenFile closes and forgets the file of the (filename, userId)
// transfer. It is a no-op when the transfer is not tracked anymore, so it is
// safe to call from several cleanup paths.
func (fs *ChatFSInstance) releaseOpenFile(filename, userId string) {
	key := chatFSOpenFileKey(filename, userId)
	closeErr := atomicallyExecute(fs.filesFlag, func() (err error) {
		if f, ok := fs.OpenFiles[key]; ok {
			err = f.Close()
			delete(fs.OpenFiles, key)
		}
		keys := fs.OpenFilesForUser[userId]
		for i, k := range keys {
			if k == key {
				keys = append(keys[:i], keys[i+1:]...)
				break
			}
		}
		if len(keys) == 0 {
			delete(fs.OpenFilesForUser, userId)
		} else {
			fs.OpenFilesForUser[userId] = keys
		}
		return
	})
	if closeErr != nil {
		logger.Println(closeErr)
	}
}

// registerDataChannel stores dc under its label in the data channel map of
// userId.
func (fs *ChatFSInstance) registerDataChannel(userId string, dc *webrtc.DataChannel) error {
	return atomicallyExecute(fs.dataChannelMapFlag, func() (err error) {
		if _, ok := fs.zoneFSDataChannels[userId]; !ok {
			fs.zoneFSDataChannels[userId] = make(map[string]*DataChannel)
		}
		l := int32(0)
		fs.zoneFSDataChannels[userId][dc.Label()] = &DataChannel{
			DataChannel: dc,
			l:           &l,
		}
		return
	})
}

// unregisterDataChannel removes the data channel label from the map of userId
// and reports whether userId still had a data channel map.
func (fs *ChatFSInstance) unregisterDataChannel(userId, label string) (known bool) {
	// The callback never fails, so there is no error to report.
	_ = atomicallyExecute(fs.dataChannelMapFlag, func() (err error) {
		if channels, ok := fs.zoneFSDataChannels[userId]; ok {
			known = true
			delete(channels, label)
		}
		return
	})
	return
}

// closeDataChannel closes the data channel registered for userId under label,
// if there is one.
func (fs *ChatFSInstance) closeDataChannel(userId, label string) error {
	return atomicallyExecute(fs.dataChannelMapFlag, func() (err error) {
		if dc, ok := fs.zoneFSDataChannels[userId][label]; ok {
			err = dc.DataChannel.Close()
		}
		return
	})
}

// createFileDataChannel opens a data channel labelled filename on the peer
// connection of userId. It fails when the user has no peer connection.
func (fs *ChatFSInstance) createFileDataChannel(filename, userId string) (dc *webrtc.DataChannel, err error) {
	err = atomicallyExecute(fs.rtcPeerConnectionMapFlag, func() (err error) {
		pc, ok := fs.rtcPeerConnections[userId]
		if !ok {
			return fmt.Errorf("no peerconnection for id %s", userId)
		}
		maxRetransmits := uint16(100)
		dc, err = pc.CreateDataChannel(filename, &webrtc.DataChannelInit{
			MaxRetransmits: &maxRetransmits,
		})
		return
	})
	if err == nil && dc == nil {
		err = fmt.Errorf("datachannel not created")
	}
	return
}

// SetupFileUpload prepares the reception of filename from userId: it creates
// the chat's "__files__" directory if needed, opens the destination file in
// append mode and opens a data channel labelled filename whose messages are
// written to that file. The file is closed when the data channel closes.
//
// It returns an error when the path is invalid, the file cannot be opened,
// the user has no peer connection or the data channel cannot be created; in
// the last two cases the file is closed again before returning.
func (fs *ChatFSInstance) SetupFileUpload(chatId, filename, userId string) (err error) {
	dir, concretePath, err := fs.chatFilePaths(chatId, filename)
	if err != nil {
		return
	}
	// MkdirAll succeeds when the directory already exists.
	if err = os.MkdirAll(dir, 0700); err != nil {
		return
	}
	// O_APPEND: an existing file with the same name is extended, not replaced.
	file, err := os.OpenFile(concretePath, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0644)
	if err != nil {
		return
	}
	fs.trackOpenFile(filename, userId, file)
	dc, err := fs.createFileDataChannel(filename, userId)
	if err != nil {
		fs.releaseOpenFile(filename, userId)
		return
	}
	dc.SetBufferedAmountLowThreshold(fileChannelBufferedAmountLowThreshold)
	dc.OnMessage(func(msg webrtc.DataChannelMessage) {
		if _, writeErr := file.Write(msg.Data); writeErr != nil {
			logger.Println(writeErr)
		}
	})
	dc.OnOpen(func() {
		logger.Println("updated")
		sendErr := dc.SendText("yooooo man this channel is open you should be able to send stuff")
		logger.Println("!-----------------------------!")
		logger.Printf("datachannel with id %s is now open\n", dc.Label())
		if id := dc.ID(); id != nil {
			logger.Println(*id)
		}
		logger.Println(sendErr)
		logger.Println("!-----------------------------!")
	})
	dc.OnClose(func() {
		logger.Println("channel closed")
		fs.releaseOpenFile(filename, userId)
		// The channel is registered under its label, so remove it by label.
		fs.unregisterDataChannel(userId, dc.Label())
	})
	return fs.registerDataChannel(userId, dc)
}

// FileUploadDone closes the data channel used by userId to upload filename.
// It does nothing when no such channel is registered.
func (fs *ChatFSInstance) FileUploadDone(chatId, filename, userId string) (err error) {
	return fs.closeDataChannel(userId, filename)
}

// FileUploadFailed closes the data channel used by userId to upload filename
// and deletes the partially written file. The channel is closed even when the
// file cannot be removed; the removal error takes precedence in the result.
func (fs *ChatFSInstance) FileUploadFailed(chatId, filename, userId string) (err error) {
	_, concretePath, err := fs.chatFilePaths(chatId, filename)
	if err != nil {
		return
	}
	closeErr := fs.closeDataChannel(userId, filename)
	if err = os.Remove(concretePath); err != nil {
		return
	}
	return closeErr
}

// SetupFileDownload opens filename for reading and opens a data channel
// labelled filename towards userId. Once the channel is open the file is
// streamed over it, followed by the text message "done", after which the
// channel is closed.
//
// It returns an error when the path is invalid, the file cannot be opened,
// the user has no peer connection or the data channel cannot be created; in
// the last two cases the file is closed again before returning.
func (fs *ChatFSInstance) SetupFileDownload(chatId, filename, userId string) (err error) {
	_, concretePath, err := fs.chatFilePaths(chatId, filename)
	if err != nil {
		return
	}
	file, err := os.OpenFile(concretePath, os.O_RDONLY, 0)
	if err != nil {
		return
	}
	fs.trackOpenFile(filename, userId, file)
	dc, err := fs.createFileDataChannel(filename, userId)
	if err != nil {
		fs.releaseOpenFile(filename, userId)
		return
	}
	dc.SetBufferedAmountLowThreshold(fileChannelBufferedAmountLowThreshold)
	// sendMore has room for one pending signal so that the callback below
	// never blocks, whether or not the sender is currently waiting.
	sendMore := make(chan struct{}, 1)
	closed := make(chan struct{})
	var closeOnce sync.Once
	dc.OnOpen(func() {
		go fs.streamFileToChannel(file, dc, filename, userId, sendMore, closed)
	})
	dc.OnBufferedAmountLow(func() {
		select {
		case sendMore <- struct{}{}:
		default:
		}
	})
	dc.OnClose(func() {
		closeOnce.Do(func() { close(closed) })
		// Covers the case where the channel closes before it ever opened.
		fs.releaseOpenFile(filename, userId)
		fs.unregisterDataChannel(userId, dc.Label())
	})
	return fs.registerDataChannel(userId, dc)
}

// streamFileToChannel sends the content of file over dc in chunks of at most
// 50000 bytes, pausing while the channel buffers more than its low threshold.
// sendMore is signalled when the buffered amount drops below the threshold
// and closed is closed when the channel closes. The file is released when the
// function returns.
func (fs *ChatFSInstance) streamFileToChannel(file *os.File, dc *webrtc.DataChannel, filename, userId string, sendMore, closed <-chan struct{}) {
	defer fs.releaseOpenFile(filename, userId)
	abort := func() {
		_ = dc.Close()
		fs.unregisterDataChannel(userId, dc.Label())
	}
	r := bufio.NewReader(file)
	buf := make([]byte, 50000)
sendingLoop:
	for {
		n, readErr := r.Read(buf)
		if n > 0 {
			if sendErr := dc.Send(buf[:n]); sendErr != nil {
				logger.Println(sendErr)
				abort()
				return
			}
			if dc.BufferedAmount() > dc.BufferedAmountLowThreshold() {
				select {
				case <-sendMore:
				case <-closed:
					// Nobody is left to drain the buffer.
					abort()
					return
				}
			}
		}
		switch {
		case readErr == io.EOF:
			break sendingLoop
		case readErr != nil:
			logger.Println(readErr)
			abort()
			return
		case n == 0:
			logger.Println("n is 0 weird")
			break sendingLoop
		}
	}
	logger.Println("done")
	// NOTE(review): presumably gives the peer time to receive the buffered
	// chunks before the "done" marker is sent — confirm.
	select {
	case <-time.After(time.Second * 4):
	case <-closed:
	}
	if sendErr := dc.SendText("done"); sendErr != nil {
		logger.Println(sendErr)
	}
	abort()
	logger.Println("done 2")
}

// HandleOffer answers the WebRTC offer sdp received from userId. The work is
// done in a goroutine: it creates and registers a peer connection for userId,
// applies the offer, and sends the answer through sendDCMessage.
//
// Exactly one value is delivered: on done when the answer was sent, on errCh
// otherwise. Both channels have a capacity of one so the goroutine finishes
// even when the caller stops receiving.
func (fs *ChatFSInstance) HandleOffer(ctx context.Context, channelId, userId, sdp, hostId string, sendDCMessage SendDCMessageFunc, cb ChatFSInstanceOnICECandidateFunc) (done chan struct{}, errCh chan error) {
	done, errCh = make(chan struct{}, 1), make(chan error, 1)
	go func() {
		peerConnection, err := fs.createPeerConnection(userId, fs.ZoneID, webrtc.SDPTypeAnswer, cb, sendDCMessage)
		if err != nil {
			errCh <- err
			return
		}
		// The callback never fails, so there is no error to report.
		_ = atomicallyExecute(fs.rtcPeerConnectionMapFlag, func() (err error) {
			fs.rtcPeerConnections[userId] = &ZoneRTCPeerConnection{
				PeerConnection:  peerConnection,
				makingOffer:     false,
				makingOfferLock: &sync.Mutex{},
			}
			return
		})
		offer := webrtc.SessionDescription{
			Type: webrtc.SDPTypeOffer,
			SDP:  sdp,
		}
		if err = peerConnection.SetRemoteDescription(offer); err != nil {
			errCh <- err
			return
		}
		rawAnswer, err := peerConnection.CreateAnswer(nil)
		if err != nil {
			errCh <- err
			return
		}
		if err = peerConnection.SetLocalDescription(rawAnswer); err != nil {
			errCh <- err
			return
		}
		// The delivery result of the answer is deliberately not awaited.
		_, _ = sendDCMessage(string(ZONE_CHAT_WEBRTC_ANSWER), hostId, userId, map[string]interface{}{
			"to":        userId,
			"from":      fs.ZoneID,
			"channelId": channelId,
			"sdp":       rawAnswer.SDP,
		})
		done <- struct{}{}
		logger.Println("handle offer done")
	}()
	return
}

// HandleCounterOffer sends every ICE candidate that was queued for userId
// through sendDCMessage, waiting for each delivery, and then clears the
// queue. It stops at the first delivery that reports on its error channel and
// returns what was received; the queue is left untouched in that case.
func (fs *ChatFSInstance) HandleCounterOffer(ctx context.Context, userId string, sendDCMessage SendDCMessageFunc) (err error) {
	// if err = atomicallyExecute(fs.rtcPeerConnectionMapFlag, func() (err error) {
	// 	if _, ok := fs.rtcPeerConnections[userId]; !ok {
	// 		err = fmt.Errorf("no field corresponding peer connection for id %s", userId)
	// 		return
	// 	}
	// 	logger.Println("handling counter offer")
	// 	connection := fs.rtcPeerConnections[userId]
	// 	err = atomicallyExecute(fs.localSDMapFlag, func() (err error) {
	// 		err = connection.SetLocalDescription(*fs.localSD[userId])
	// 		return
	// 	})
	// 	return
	// }); err != nil {
	// 	return
	// }
	// _ = atomicallyExecute(fs.localSDMapFlag, func() (err error) {
	// 	delete(fs.localSD, userId)
	// 	return
	// })
	return atomicallyExecute(fs.candidateFlag, func() (err error) {
		for _, candidate := range fs.pendingCandidates[userId] {
			logger.Println("sending candidate to", userId, candidate)
			ice := candidate.ToJSON()
			var sdpMid string
			if ice.SDPMid != nil {
				sdpMid = *ice.SDPMid
			}
			var sdpMLineIndex int
			if ice.SDPMLineIndex != nil {
				sdpMLineIndex = int(*ice.SDPMLineIndex)
			}
			d, e := sendDCMessage(string(ZONE_CHAT_WEBRTC_CANDIDATE), "", userId, map[string]interface{}{
				"from":          fs.ZoneID,
				"to":            userId,
				"candidate":     ice.Candidate,
				"sdpMid":        sdpMid,
				"sdpMLineIndex": strconv.Itoa(sdpMLineIndex),
			})
			select {
			case <-d:
			case err = <-e:
				return
			}
		}
		delete(fs.pendingCandidates, userId)
		return
	})
}

// HandleRennegotiationOffer applies the renegotiation offer sdp received from
// the peer from, creates the matching answer and sends it back through
// sendDCMessage, waiting for the delivery result.
//
// It fails when from has no peer connection or when that connection is
// currently making an offer itself.
func (fs *ChatFSInstance) HandleRennegotiationOffer(from, sdp string, sendDCMessage SendDCMessageFunc) (err error) {
	return atomicallyExecute(fs.rtcPeerConnectionMapFlag, func() (err error) {
		pc, ok := fs.rtcPeerConnections[from]
		if !ok {
			return fmt.Errorf("no corresponding peer connection for id %s", from)
		}
		//pc.makingOfferLock.Lock()
		if pc.makingOffer {
			//pc.makingOfferLock.Unlock()
			return fmt.Errorf("already making an offer or state is stable")
		}
		//pc.makingOfferLock.Unlock()
		if err = pc.SetRemoteDescription(webrtc.SessionDescription{SDP: sdp, Type: webrtc.SDPTypeOffer}); err != nil {
			return
		}
		localSd, err := pc.CreateAnswer(nil)
		if err != nil {
			return
		}
		if err = pc.SetLocalDescription(localSd); err != nil {
			return
		}
		d, e := sendDCMessage(string(ZONE_CHAT_WEBRTC_RENNEGOTIATION_ANSWER), fs.ZoneID, from, map[string]interface{}{
			"from": fs.ZoneID,
			"to":   from,
			"sdp":  localSd.SDP,
		})
		select {
		case <-d:
		case err = <-e:
		}
		return
	})
}

// HandleRennegotiationAnswer applies the renegotiation answer sdp received
// from the peer from. It fails when from has no peer connection.
func (fs *ChatFSInstance) HandleRennegotiationAnswer(from, sdp string) (err error) {
	logger.Println("---------------------handling rennego answer")
	return atomicallyExecute(fs.rtcPeerConnectionMapFlag, func() (err error) {
		pc, ok := fs.rtcPeerConnections[from]
		if !ok {
			return fmt.Errorf("no corresponding peer connection for id %s", from)
		}
		return pc.SetRemoteDescription(webrtc.SessionDescription{SDP: sdp, Type: webrtc.SDPTypeAnswer})
	})
}

// AddCandidate adds the remote ICE candidate to the peer connection of from.
// A nil candidate or an unknown peer is silently ignored.
func (fs *ChatFSInstance) AddCandidate(candidate *webrtc.ICECandidateInit, from string) (err error) {
	logger.Println("adding ice candidate", candidate)
	if candidate == nil {
		return
	}
	return atomicallyExecute(fs.rtcPeerConnectionMapFlag, func() (err error) {
		if pc, ok := fs.rtcPeerConnections[from]; ok {
			err = pc.AddICECandidate(*candidate)
		}
		return
	})
}

// dispatchDataChannelMessage decodes msg as a CallEvent and forwards it to
// HandleDataChannelEvents. Decoding and handling errors are only logged.
func (fs *ChatFSInstance) dispatchDataChannelMessage(msg webrtc.DataChannelMessage) {
	var event CallEvent
	if err := json.Unmarshal(msg.Data, &event); err != nil {
		logger.Println(err)
		return
	}
	if e := fs.HandleDataChannelEvents(event.From, event.EventId, event.Payload); e != nil {
		logger.Println("*-------------- datachannel error: ", e)
	}
}

// openControlChannel creates the "data" channel on pc, wires its handlers and
// registers it for target.
func (fs *ChatFSInstance) openControlChannel(pc *webrtc.PeerConnection, target string) error {
	maxRetransmits := uint16(1)
	channel, err := pc.CreateDataChannel("data", &webrtc.DataChannelInit{
		MaxRetransmits: &maxRetransmits,
	})
	if err != nil {
		return err
	}
	channel.OnOpen(func() {
		logger.Println("channel opened")
		if chanErr := channel.SendText("yooo man this is open"); chanErr != nil {
			logger.Println(chanErr)
		}
	})
	channel.OnMessage(fs.dispatchDataChannelMessage)
	logger.Println("new channel for target : ", target)
	channel.SetBufferedAmountLowThreshold(bufferedAmountLowThreshold)
	return fs.registerDataChannel(target, channel)
}

// adoptRemoteDataChannel registers a data channel opened by the peer target
// and wires its handlers. When the channel closes the whole member is
// evicted.
func (fs *ChatFSInstance) adoptRemoteDataChannel(target string, dc *webrtc.DataChannel) {
	if err := fs.registerDataChannel(target, dc); err != nil {
		logger.Println(err)
	}
	dc.OnOpen(func() {
		logger.Printf("got a new open datachannel %s\n", dc.Label())
	})
	dc.OnMessage(fs.dispatchDataChannelMessage)
	dc.OnClose(func() {
		// HandleLeavingMember takes the data channel lock itself, so it is
		// called only once unregisterDataChannel has released it.
		// NOTE(review): atomicallyExecute is not visible here; it is assumed
		// not to be reentrant — confirm.
		if fs.unregisterDataChannel(target, dc.Label()) {
			fs.HandleLeavingMember(target)
		}
	})
}

// createPeerConnection builds a peer connection towards target using public
// STUN servers and wires its callbacks.
//
// When peerType is webrtc.SDPTypeAnswer a "data" channel is created and
// registered for target; otherwise data channels opened by the peer are
// registered as they arrive. Local ICE candidates are queued in
// pendingCandidates until a remote description is set, after which they are
// passed to cb(from, target, candidate). A closed, disconnected or failed
// connection evicts target through HandleLeavingMember.
//
// A panic raised while setting the connection up is recovered, logged and
// reported as an error; the half-built connection is closed in that case.
func (fs *ChatFSInstance) createPeerConnection(target, from string, peerType webrtc.SDPType, cb ChatFSInstanceOnICECandidateFunc, sendDCMessage SendDCMessageFunc) (peerConnection *webrtc.PeerConnection, err error) {
	defer func() {
		if r := recover(); r != nil {
			logger.Printf("recover from panic : %v\n", r)
			if peerConnection != nil {
				_ = peerConnection.Close()
				peerConnection = nil
			}
			err = fmt.Errorf("creating peer connection for %s: recovered from panic: %v", target, r)
		}
	}()
	config := webrtc.Configuration{
		ICEServers: []webrtc.ICEServer{
			{
				URLs: []string{"stun:stun.l.google.com:19302", "stun:stunserver.org:3478"},
			},
		},
	}
	peerConnection, err = webrtc.NewPeerConnection(config)
	if err != nil {
		return
	}
	// The callbacks capture pc rather than the named result, which is reset
	// to nil on failure.
	pc := peerConnection
	logger.Println("---------------------------------------------------")
	if peerType == webrtc.SDPTypeAnswer {
		if err = fs.openControlChannel(pc, target); err != nil {
			_ = pc.Close()
			return nil, err
		}
	} else {
		pc.OnDataChannel(func(dc *webrtc.DataChannel) {
			fs.adoptRemoteDataChannel(target, dc)
		})
	}
	pc.OnConnectionStateChange(func(pcs webrtc.PeerConnectionState) {
		if pcs == webrtc.PeerConnectionStateClosed || pcs == webrtc.PeerConnectionStateDisconnected || pcs == webrtc.PeerConnectionStateFailed {
			logger.Println(pcs)
			fs.HandleLeavingMember(target)
		}
	})
	pc.OnICEConnectionStateChange(func(is webrtc.ICEConnectionState) {
		logger.Printf("ICE connection state has changed %s\n", is.String())
		if is == webrtc.ICEConnectionStateDisconnected || is == webrtc.ICEConnectionStateFailed {
			logger.Println(is)
			fs.HandleLeavingMember(target)
		}
	})
	pc.OnICECandidate(func(i *webrtc.ICECandidate) {
		if i == nil {
			return
		}
		// The callback never fails, so there is no error to report.
		_ = atomicallyExecute(fs.candidateFlag, func() (err error) {
			if pc.RemoteDescription() == nil {
				logger.Println("generated candidate appended to list : ", i)
				fs.pendingCandidates[target] = append(fs.pendingCandidates[target], i)
				return
			}
			logger.Println("generated candidate : ", i)
			if iceCandidateErr := cb(from, target, i); iceCandidateErr != nil {
				logger.Println(iceCandidateErr)
			}
			return
		})
	})
	// pc.OnNegotiationNeeded(func() {
	// 	logger.Println("---------------- rennego is needed -----------")
	// 	_ = atomicallyExecute(fs.rtcPeerConnectionMapFlag, func() (err error) {
	// 		logger.Println("----------------- sending renego to peer with id", target)
	// 		if pc.SignalingState() == webrtc.SignalingStateStable {
	// 			localSd, localSdErr := pc.CreateOffer(nil)
	// 			if localSdErr != nil {
	// 				logger.Println(localSdErr)
	// 				return localSdErr
	// 			}
	// 			if err = pc.SetLocalDescription(localSd); err != nil {
	// 				logger.Println(err)
	// 				return
	// 			}
	// 			d, e := sendDCMessage(string(ZONE_CHAT_WEBRTC_RENNEGOTIATION_OFFER), fs.ZoneID, target, map[string]interface{}{
	// 				"from": fs.ZoneID,
	// 				"to":   target,
	// 				"sdp":  localSd.SDP,
	// 			})
	// 			select {
	// 			case <-d:
	// 			case err = <-e:
	// 				logger.Println(err)
	// 			}
	// 		}
	// 		return
	// 	})
	// })
	return
}

// HandleLeavingMember releases everything held for the member id: its data
// channels and peer connection are closed and forgotten, its pending ICE
// candidates are dropped and its open files are closed.
func (fs *ChatFSInstance) HandleLeavingMember(id string) {
	// Close errors on data channels are ignored: the member is gone anyway.
	_ = atomicallyExecute(fs.dataChannelMapFlag, func() (err error) {
		for _, dc := range fs.zoneFSDataChannels[id] {
			_ = dc.DataChannel.Close()
		}
		delete(fs.zoneFSDataChannels, id)
		return
	})
	logger.Println("chatfs datachannels cleaned")
	// The close error is logged inside the callback.
	_ = atomicallyExecute(fs.rtcPeerConnectionMapFlag, func() (err error) {
		if pc, ok := fs.rtcPeerConnections[id]; ok {
			if closeErr := pc.Close(); closeErr != nil {
				err = closeErr
				logger.Println("peer connection close error", closeErr)
			}
		}
		delete(fs.rtcPeerConnections, id)
		return
	})
	logger.Println("chats perrconnections cleaned")
	_ = atomicallyExecute(fs.candidateFlag, func() (err error) {
		delete(fs.pendingCandidates, id)
		return
	})
	_ = atomicallyExecute(fs.filesFlag, func() (err error) {
		for _, openFile := range fs.OpenFilesForUser[id] {
			if f, ok := fs.OpenFiles[openFile]; ok {
				_ = f.Close()
			}
			delete(fs.OpenFiles, openFile)
		}
		delete(fs.OpenFilesForUser, id)
		return
	})
}

// HandleDataChannelEvents handles an event received on a data channel. No
// event is handled yet, so it always returns nil.
func (fs *ChatFSInstance) HandleDataChannelEvents(from, eventId string, payload map[string]interface{}) (err error) {
	switch eventId {
	}
	return
}