Skip to content

Commit 2406ecd

Browse files
authored
Merge pull request fatedier#1780 from fatedier/dev
bump version
2 parents 8668fef + 7266154 commit 2406ecd

File tree

28 files changed

+1161
-98
lines changed

28 files changed

+1161
-98
lines changed

client/admin_api.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ type StatusResp struct {
8888
Https []ProxyStatusResp `json:"https"`
8989
Stcp []ProxyStatusResp `json:"stcp"`
9090
Xtcp []ProxyStatusResp `json:"xtcp"`
91+
Sudp []ProxyStatusResp `json:"sudp"`
9192
}
9293

9394
type ProxyStatusResp struct {
@@ -155,6 +156,11 @@ func NewProxyStatusResp(status *proxy.ProxyStatus, serverAddr string) ProxyStatu
155156
psr.LocalAddr = fmt.Sprintf("%s:%d", cfg.LocalIp, cfg.LocalPort)
156157
}
157158
psr.Plugin = cfg.Plugin
159+
case *config.SudpProxyConf:
160+
if cfg.LocalPort != 0 {
161+
psr.LocalAddr = fmt.Sprintf("%s:%d", cfg.LocalIp, cfg.LocalPort)
162+
}
163+
psr.Plugin = cfg.Plugin
158164
}
159165
return psr
160166
}
@@ -171,6 +177,7 @@ func (svr *Service) apiStatus(w http.ResponseWriter, r *http.Request) {
171177
res.Https = make([]ProxyStatusResp, 0)
172178
res.Stcp = make([]ProxyStatusResp, 0)
173179
res.Xtcp = make([]ProxyStatusResp, 0)
180+
res.Sudp = make([]ProxyStatusResp, 0)
174181

175182
log.Info("Http request [/api/status]")
176183
defer func() {
@@ -194,6 +201,8 @@ func (svr *Service) apiStatus(w http.ResponseWriter, r *http.Request) {
194201
res.Stcp = append(res.Stcp, NewProxyStatusResp(status, svr.cfg.ServerAddr))
195202
case "xtcp":
196203
res.Xtcp = append(res.Xtcp, NewProxyStatusResp(status, svr.cfg.ServerAddr))
204+
case "sudp":
205+
res.Sudp = append(res.Sudp, NewProxyStatusResp(status, svr.cfg.ServerAddr))
197206
}
198207
}
199208
sort.Sort(ByProxyStatusResp(res.Tcp))
@@ -202,6 +211,7 @@ func (svr *Service) apiStatus(w http.ResponseWriter, r *http.Request) {
202211
sort.Sort(ByProxyStatusResp(res.Https))
203212
sort.Sort(ByProxyStatusResp(res.Stcp))
204213
sort.Sort(ByProxyStatusResp(res.Xtcp))
214+
sort.Sort(ByProxyStatusResp(res.Sudp))
205215
return
206216
}
207217

client/proxy/proxy.go

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,12 @@ func NewProxy(ctx context.Context, pxyConf config.ProxyConf, clientCfg config.Cl
102102
BaseProxy: &baseProxy,
103103
cfg: cfg,
104104
}
105+
case *config.SudpProxyConf:
106+
pxy = &SudpProxy{
107+
BaseProxy: &baseProxy,
108+
cfg: cfg,
109+
closeCh: make(chan struct{}),
110+
}
105111
}
106112
return
107113
}
@@ -540,6 +546,151 @@ func (pxy *UdpProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
540546
udp.Forwarder(pxy.localAddr, pxy.readCh, pxy.sendCh)
541547
}
542548

549+
type SudpProxy struct {
550+
*BaseProxy
551+
552+
cfg *config.SudpProxyConf
553+
554+
localAddr *net.UDPAddr
555+
556+
closeCh chan struct{}
557+
}
558+
559+
func (pxy *SudpProxy) Run() (err error) {
560+
pxy.localAddr, err = net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", pxy.cfg.LocalIp, pxy.cfg.LocalPort))
561+
if err != nil {
562+
return
563+
}
564+
return
565+
}
566+
567+
func (pxy *SudpProxy) Close() {
568+
pxy.mu.Lock()
569+
defer pxy.mu.Unlock()
570+
select {
571+
case <-pxy.closeCh:
572+
return
573+
default:
574+
close(pxy.closeCh)
575+
}
576+
}
577+
578+
func (pxy *SudpProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
579+
xl := pxy.xl
580+
xl.Info("incoming a new work connection for sudp proxy, %s", conn.RemoteAddr().String())
581+
582+
if pxy.limiter != nil {
583+
rwc := frpIo.WrapReadWriteCloser(limit.NewReader(conn, pxy.limiter), limit.NewWriter(conn, pxy.limiter), func() error {
584+
return conn.Close()
585+
})
586+
conn = frpNet.WrapReadWriteCloserToConn(rwc, conn)
587+
}
588+
589+
workConn := conn
590+
readCh := make(chan *msg.UdpPacket, 1024)
591+
sendCh := make(chan msg.Message, 1024)
592+
isClose := false
593+
594+
mu := &sync.Mutex{}
595+
596+
closeFn := func() {
597+
mu.Lock()
598+
defer mu.Unlock()
599+
if isClose {
600+
return
601+
}
602+
603+
isClose = true
604+
if workConn != nil {
605+
workConn.Close()
606+
}
607+
close(readCh)
608+
close(sendCh)
609+
}
610+
611+
// udp service <- frpc <- frps <- frpc visitor <- user
612+
workConnReaderFn := func(conn net.Conn, readCh chan *msg.UdpPacket) {
613+
defer closeFn()
614+
615+
for {
616+
// first to check sudp proxy is closed or not
617+
select {
618+
case <-pxy.closeCh:
619+
xl.Trace("frpc sudp proxy is closed")
620+
return
621+
default:
622+
}
623+
624+
var udpMsg msg.UdpPacket
625+
if errRet := msg.ReadMsgInto(conn, &udpMsg); errRet != nil {
626+
xl.Warn("read from workConn for sudp error: %v", errRet)
627+
return
628+
}
629+
630+
if errRet := errors.PanicToError(func() {
631+
readCh <- &udpMsg
632+
}); errRet != nil {
633+
xl.Warn("reader goroutine for sudp work connection closed: %v", errRet)
634+
return
635+
}
636+
}
637+
}
638+
639+
// udp service -> frpc -> frps -> frpc visitor -> user
640+
workConnSenderFn := func(conn net.Conn, sendCh chan msg.Message) {
641+
defer func() {
642+
closeFn()
643+
xl.Info("writer goroutine for sudp work connection closed")
644+
}()
645+
646+
var errRet error
647+
for rawMsg := range sendCh {
648+
switch m := rawMsg.(type) {
649+
case *msg.UdpPacket:
650+
xl.Trace("frpc send udp package to frpc visitor, [udp local: %v, remote: %v], [tcp work conn local: %v, remote: %v]",
651+
m.LocalAddr.String(), m.RemoteAddr.String(), conn.LocalAddr().String(), conn.RemoteAddr().String())
652+
case *msg.Ping:
653+
xl.Trace("frpc send ping message to frpc visitor")
654+
}
655+
656+
if errRet = msg.WriteMsg(conn, rawMsg); errRet != nil {
657+
xl.Error("sudp work write error: %v", errRet)
658+
return
659+
}
660+
}
661+
}
662+
663+
heartbeatFn := func(conn net.Conn, sendCh chan msg.Message) {
664+
ticker := time.NewTicker(30 * time.Second)
665+
defer func() {
666+
ticker.Stop()
667+
closeFn()
668+
}()
669+
670+
var errRet error
671+
for {
672+
select {
673+
case <-ticker.C:
674+
if errRet = errors.PanicToError(func() {
675+
sendCh <- &msg.Ping{}
676+
}); errRet != nil {
677+
xl.Warn("heartbeat goroutine for sudp work connection closed")
678+
return
679+
}
680+
case <-pxy.closeCh:
681+
xl.Trace("frpc sudp proxy is closed")
682+
return
683+
}
684+
}
685+
}
686+
687+
go workConnSenderFn(workConn, sendCh)
688+
go workConnReaderFn(workConn, readCh)
689+
go heartbeatFn(workConn, sendCh)
690+
691+
udp.Forwarder(pxy.localAddr, readCh, sendCh)
692+
}
693+
543694
// Common handler for tcp work connections.
544695
func HandleTcpWorkConnection(ctx context.Context, localInfo *config.LocalSvrConf, proxyPlugin plugin.Plugin,
545696
baseInfo *config.BaseProxyConf, limiter *rate.Limiter, workConn net.Conn, encKey []byte, m *msg.StartWorkConn) {

0 commit comments

Comments
 (0)