@@ -102,6 +102,12 @@ func NewProxy(ctx context.Context, pxyConf config.ProxyConf, clientCfg config.Cl
102102 BaseProxy : & baseProxy ,
103103 cfg : cfg ,
104104 }
105+ case * config.SudpProxyConf :
106+ pxy = & SudpProxy {
107+ BaseProxy : & baseProxy ,
108+ cfg : cfg ,
109+ closeCh : make (chan struct {}),
110+ }
105111 }
106112 return
107113}
@@ -540,6 +546,151 @@ func (pxy *UdpProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
540546 udp .Forwarder (pxy .localAddr , pxy .readCh , pxy .sendCh )
541547}
542548
549+ type SudpProxy struct {
550+ * BaseProxy
551+
552+ cfg * config.SudpProxyConf
553+
554+ localAddr * net.UDPAddr
555+
556+ closeCh chan struct {}
557+ }
558+
559+ func (pxy * SudpProxy ) Run () (err error ) {
560+ pxy .localAddr , err = net .ResolveUDPAddr ("udp" , fmt .Sprintf ("%s:%d" , pxy .cfg .LocalIp , pxy .cfg .LocalPort ))
561+ if err != nil {
562+ return
563+ }
564+ return
565+ }
566+
567+ func (pxy * SudpProxy ) Close () {
568+ pxy .mu .Lock ()
569+ defer pxy .mu .Unlock ()
570+ select {
571+ case <- pxy .closeCh :
572+ return
573+ default :
574+ close (pxy .closeCh )
575+ }
576+ }
577+
578+ func (pxy * SudpProxy ) InWorkConn (conn net.Conn , m * msg.StartWorkConn ) {
579+ xl := pxy .xl
580+ xl .Info ("incoming a new work connection for sudp proxy, %s" , conn .RemoteAddr ().String ())
581+
582+ if pxy .limiter != nil {
583+ rwc := frpIo .WrapReadWriteCloser (limit .NewReader (conn , pxy .limiter ), limit .NewWriter (conn , pxy .limiter ), func () error {
584+ return conn .Close ()
585+ })
586+ conn = frpNet .WrapReadWriteCloserToConn (rwc , conn )
587+ }
588+
589+ workConn := conn
590+ readCh := make (chan * msg.UdpPacket , 1024 )
591+ sendCh := make (chan msg.Message , 1024 )
592+ isClose := false
593+
594+ mu := & sync.Mutex {}
595+
596+ closeFn := func () {
597+ mu .Lock ()
598+ defer mu .Unlock ()
599+ if isClose {
600+ return
601+ }
602+
603+ isClose = true
604+ if workConn != nil {
605+ workConn .Close ()
606+ }
607+ close (readCh )
608+ close (sendCh )
609+ }
610+
611+ // udp service <- frpc <- frps <- frpc visitor <- user
612+ workConnReaderFn := func (conn net.Conn , readCh chan * msg.UdpPacket ) {
613+ defer closeFn ()
614+
615+ for {
616+ // first to check sudp proxy is closed or not
617+ select {
618+ case <- pxy .closeCh :
619+ xl .Trace ("frpc sudp proxy is closed" )
620+ return
621+ default :
622+ }
623+
624+ var udpMsg msg.UdpPacket
625+ if errRet := msg .ReadMsgInto (conn , & udpMsg ); errRet != nil {
626+ xl .Warn ("read from workConn for sudp error: %v" , errRet )
627+ return
628+ }
629+
630+ if errRet := errors .PanicToError (func () {
631+ readCh <- & udpMsg
632+ }); errRet != nil {
633+ xl .Warn ("reader goroutine for sudp work connection closed: %v" , errRet )
634+ return
635+ }
636+ }
637+ }
638+
639+ // udp service -> frpc -> frps -> frpc visitor -> user
640+ workConnSenderFn := func (conn net.Conn , sendCh chan msg.Message ) {
641+ defer func () {
642+ closeFn ()
643+ xl .Info ("writer goroutine for sudp work connection closed" )
644+ }()
645+
646+ var errRet error
647+ for rawMsg := range sendCh {
648+ switch m := rawMsg .(type ) {
649+ case * msg.UdpPacket :
650+ xl .Trace ("frpc send udp package to frpc visitor, [udp local: %v, remote: %v], [tcp work conn local: %v, remote: %v]" ,
651+ m .LocalAddr .String (), m .RemoteAddr .String (), conn .LocalAddr ().String (), conn .RemoteAddr ().String ())
652+ case * msg.Ping :
653+ xl .Trace ("frpc send ping message to frpc visitor" )
654+ }
655+
656+ if errRet = msg .WriteMsg (conn , rawMsg ); errRet != nil {
657+ xl .Error ("sudp work write error: %v" , errRet )
658+ return
659+ }
660+ }
661+ }
662+
663+ heartbeatFn := func (conn net.Conn , sendCh chan msg.Message ) {
664+ ticker := time .NewTicker (30 * time .Second )
665+ defer func () {
666+ ticker .Stop ()
667+ closeFn ()
668+ }()
669+
670+ var errRet error
671+ for {
672+ select {
673+ case <- ticker .C :
674+ if errRet = errors .PanicToError (func () {
675+ sendCh <- & msg.Ping {}
676+ }); errRet != nil {
677+ xl .Warn ("heartbeat goroutine for sudp work connection closed" )
678+ return
679+ }
680+ case <- pxy .closeCh :
681+ xl .Trace ("frpc sudp proxy is closed" )
682+ return
683+ }
684+ }
685+ }
686+
687+ go workConnSenderFn (workConn , sendCh )
688+ go workConnReaderFn (workConn , readCh )
689+ go heartbeatFn (workConn , sendCh )
690+
691+ udp .Forwarder (pxy .localAddr , readCh , sendCh )
692+ }
693+
543694// Common handler for tcp work connections.
544695func HandleTcpWorkConnection (ctx context.Context , localInfo * config.LocalSvrConf , proxyPlugin plugin.Plugin ,
545696 baseInfo * config.BaseProxyConf , limiter * rate.Limiter , workConn net.Conn , encKey []byte , m * msg.StartWorkConn ) {
0 commit comments