package rpc

import (
	"context"
	"crypto/tls"
	"encoding/base64"
	"errors"
	"fmt"
	"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
	"github.com/TeaOSLab/EdgeDNS/internal/configs"
	teaconst "github.com/TeaOSLab/EdgeDNS/internal/const"
	"github.com/TeaOSLab/EdgeDNS/internal/encrypt"
	"github.com/TeaOSLab/EdgeDNS/internal/utils"
	"github.com/iwind/TeaGo/maps"
	"github.com/iwind/TeaGo/rands"
	"google.golang.org/grpc"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/encoding/gzip"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/metadata"
	"net/url"
	"sync"
	"time"
)

// RPCClient is the RPC client used to talk to the API nodes.
//
// It keeps one gRPC connection per configured endpoint and implements
// Invoke and NewStream itself, so the generated service clients below are
// constructed with the RPCClient as their connection and every call goes
// through pickConn.
type RPCClient struct {
	// apiConfig holds the endpoints and the node credentials. Replaced by UpdateConfig.
	apiConfig *configs.APIConfig
	// conns is the current set of connections, one per endpoint.
	conns []*grpc.ClientConn
	// locker guards apiConfig and conns.
	locker sync.RWMutex

	// Service clients, all created in NewRPCClient.
	APINodeRPC            pb.APINodeServiceClient
	NodeLogRPC            pb.NodeLogServiceClient
	NSNodeRPC             pb.NSNodeServiceClient
	NSRecordRPC           pb.NSRecordServiceClient
	NSDomainRPC           pb.NSDomainServiceClient
	NSRouteRPC            pb.NSRouteServiceClient
	NSKeyRPC              pb.NSKeyServiceClient
	NSAccessLogRPC        pb.NSAccessLogServiceClient
	NSRecordHourlyStatRPC pb.NSRecordHourlyStatServiceClient
	NSQuestionOptionRPC   pb.NSQuestionOptionServiceClient
	NodeValueRPC          pb.NodeValueServiceClient
	NodeTaskRPC           pb.NodeTaskServiceClient
	RegionCountryRPC      pb.RegionCountryServiceClient
	RegionProvinceRPC     pb.RegionProvinceServiceClient
	RegionCityRPC         pb.RegionCityServiceClient
	RegionProviderRPC     pb.RegionProviderServiceClient
	IPLibraryArtifactRPC  pb.IPLibraryArtifactServiceClient
	FileChunkRPC          pb.FileChunkServiceClient
	ClientAgentIPRPC      pb.ClientAgentIPServiceClient
}

// NewRPCClient creates a new RPC client for the given API configuration.
//
// It returns an error when apiConfig is nil or when the connections cannot
// be set up (see init).
func NewRPCClient(apiConfig *configs.APIConfig) (*RPCClient, error) {
	if apiConfig == nil {
		return nil, errors.New("api config should not be nil")
	}

	var client = &RPCClient{
		apiConfig: apiConfig,
	}

	// Initialize the service clients; each one uses the RPCClient itself as
	// its connection.
	client.APINodeRPC = pb.NewAPINodeServiceClient(client)
	client.NodeLogRPC = pb.NewNodeLogServiceClient(client)
	client.NSNodeRPC = pb.NewNSNodeServiceClient(client)
	client.NSRecordRPC = pb.NewNSRecordServiceClient(client)
	client.NSDomainRPC = pb.NewNSDomainServiceClient(client)
	client.NSRouteRPC = pb.NewNSRouteServiceClient(client)
	client.NSKeyRPC = pb.NewNSKeyServiceClient(client)
	client.NSAccessLogRPC = pb.NewNSAccessLogServiceClient(client)
	client.NSRecordHourlyStatRPC = pb.NewNSRecordHourlyStatServiceClient(client)
	client.NSQuestionOptionRPC = pb.NewNSQuestionOptionServiceClient(client)
	client.NodeValueRPC = pb.NewNodeValueServiceClient(client)
	client.NodeTaskRPC = pb.NewNodeTaskServiceClient(client)
	client.RegionCountryRPC = pb.NewRegionCountryServiceClient(client)
	client.RegionProvinceRPC = pb.NewRegionProvinceServiceClient(client)
	client.RegionCityRPC = pb.NewRegionCityServiceClient(client)
	client.RegionProviderRPC = pb.NewRegionProviderServiceClient(client)
	client.IPLibraryArtifactRPC = pb.NewIPLibraryArtifactServiceClient(client)
	client.FileChunkRPC = pb.NewFileChunkServiceClient(client)
	client.ClientAgentIPRPC = pb.NewClientAgentIPServiceClient(client)

	// The client is not shared with any other goroutine yet, so no locking is needed.
	err := client.init()
	if err != nil {
		return nil, err
	}

	return client, nil
}

// Context builds the outgoing context for a call.
//
// A JSON payload (current timestamp, type "dns", userId 0) is encrypted with
// the configured secret and node id, base64-encoded and attached as the
// "token" metadata entry next to "nodeId". When the token cannot be built the
// error is printed and a plain background context without metadata is
// returned.
func (this *RPCClient) Context() context.Context {
	// Snapshot the config pointer under the read lock: UpdateConfig may
	// replace it concurrently.
	this.locker.RLock()
	var apiConfig = this.apiConfig
	this.locker.RUnlock()

	var payload = maps.Map{
		"timestamp": time.Now().Unix(),
		"type":      "dns",
		"userId":    0,
	}

	method, err := encrypt.NewMethodInstance(teaconst.EncryptMethod, apiConfig.Secret, apiConfig.NodeId)
	if err != nil {
		utils.PrintError(err)
		return context.Background()
	}

	data, err := method.Encrypt(payload.AsJSON())
	if err != nil {
		utils.PrintError(err)
		return context.Background()
	}

	var token = base64.StdEncoding.EncodeToString(data)
	return metadata.AppendToOutgoingContext(context.Background(), "nodeId", apiConfig.NodeId, "token", token)
}

// UpdateConfig replaces the API configuration and reconnects to the endpoints
// it lists.
//
// A nil config is rejected with an error. When reconnecting fails, the error
// is returned, the previous connections stay in use, and the new config
// remains stored (as before this change).
func (this *RPCClient) UpdateConfig(config *configs.APIConfig) error {
	if config == nil {
		return errors.New("api config should not be nil")
	}

	this.locker.Lock()
	defer this.locker.Unlock()

	// Assigned under the lock so that readers holding the read lock never
	// observe a half-updated client.
	this.apiConfig = config
	return this.init()
}

// TestEndpoints reports whether at least one of the given endpoints is
// reachable and answers a Ping. All endpoints are probed concurrently; an
// empty list yields false.
func (this *RPCClient) TestEndpoints(endpoints []string) bool {
	if len(endpoints) == 0 {
		return false
	}

	var (
		wg       sync.WaitGroup
		okLocker sync.Mutex // guards ok, which several goroutines may set
		ok       bool
	)

	wg.Add(len(endpoints))
	for _, endpoint := range endpoints {
		go func(endpoint string) {
			defer wg.Done()

			if !this.testEndpoint(endpoint) {
				return
			}

			okLocker.Lock()
			ok = true
			okLocker.Unlock()
		}(endpoint)
	}
	wg.Wait()

	return ok
}

// testEndpoint dials a single endpoint (blocking, at most 5 seconds) and
// sends one Ping (at most another 5 seconds). It returns true only when both
// steps succeed.
func (this *RPCClient) testEndpoint(endpoint string) bool {
	const timeout = 5 * time.Second

	u, err := url.Parse(endpoint)
	if err != nil {
		return false
	}

	creds, err := endpointCredentials(u.Scheme)
	if err != nil {
		return false
	}

	dialCtx, cancelDial := context.WithTimeout(context.Background(), timeout)
	defer cancelDial()

	conn, err := grpc.DialContext(dialCtx, u.Host, grpc.WithTransportCredentials(creds), grpc.WithBlock())
	if err != nil {
		return false
	}
	defer func() {
		_ = conn.Close()
	}()

	// Bound the ping as well; without a deadline a stalled server would
	// block TestEndpoints forever.
	pingCtx, cancelPing := context.WithTimeout(this.Context(), timeout)
	defer cancelPing()

	_, err = pb.NewPingServiceClient(conn).Ping(pingCtx, &pb.PingRequest{})
	return err == nil
}

// endpointCredentials maps an endpoint URL scheme to transport credentials:
// "http" is plaintext, "https" is TLS. Any other scheme is an error.
func endpointCredentials(scheme string) (credentials.TransportCredentials, error) {
	switch scheme {
	case "http":
		return insecure.NewCredentials(), nil
	case "https":
		// NOTE(review): certificate verification is disabled here, so the
		// API node's identity is not authenticated by TLS. Kept for
		// compatibility; confirm this is intended.
		return credentials.NewTLS(&tls.Config{
			InsecureSkipVerify: true,
		}), nil
	default:
		return nil, errors.New("parse endpoint failed: invalid scheme '" + scheme + "'")
	}
}

// closeConns closes every connection in the list, ignoring close errors.
func closeConns(conns []*grpc.ClientConn) {
	for _, conn := range conns {
		_ = conn.Close()
	}
}

// init (re)creates one connection per configured endpoint and installs them
// as the active connection set.
//
// It does not lock by itself, because it would conflict with pickConn; the
// caller is responsible for holding the write lock when the client is already
// shared (see UpdateConfig).
//
// On any failure the connections dialed so far are closed and the active set
// is left untouched. On success the previously active connections are closed,
// which cancels calls still running on them.
func (this *RPCClient) init() error {
	var endpoints = this.apiConfig.RPCEndpoints

	// The options are identical for every endpoint, so build them once.
	var callOptions = grpc.WithDefaultCallOptions(
		grpc.MaxCallRecvMsgSize(128<<20),
		grpc.MaxCallSendMsgSize(128<<20),
		grpc.UseCompressor(gzip.Name))
	var keepaliveParams = grpc.WithKeepaliveParams(keepalive.ClientParameters{
		Time: 30 * time.Second,
	})

	var conns = make([]*grpc.ClientConn, 0, len(endpoints))
	for _, endpoint := range endpoints {
		u, err := url.Parse(endpoint)
		if err != nil {
			closeConns(conns)
			return fmt.Errorf("parse endpoint failed: %w", err)
		}

		creds, err := endpointCredentials(u.Scheme)
		if err != nil {
			closeConns(conns)
			return err
		}

		conn, err := grpc.Dial(u.Host, grpc.WithTransportCredentials(creds), callOptions, keepaliveParams)
		if err != nil {
			closeConns(conns)
			return err
		}
		conns = append(conns, conn)
	}

	if len(conns) == 0 {
		return errors.New("[RPC]no available endpoints")
	}

	// Swap in the new set first, then release the old one so its sockets do
	// not leak.
	var oldConns = this.conns
	this.conns = conns
	closeConns(oldConns)

	return nil
}

// pickConn selects a connection for the next call.
//
// With a single connection that one is returned. Otherwise connections are
// grouped by state in order of preference (Ready, Idle, Connecting,
// TransientFailure) and a random member of the first non-empty group is
// chosen; if no connection is in any of these states a random one is
// returned. The result is nil when there are no connections at all.
func (this *RPCClient) pickConn() *grpc.ClientConn {
	// Only reads shared state, so a read lock is enough and concurrent calls
	// do not serialize on each other.
	this.locker.RLock()
	defer this.locker.RUnlock()

	switch len(this.conns) {
	case 0:
		return nil
	case 1:
		return this.conns[0]
	}

	// Check connection states, best first.
	for _, state := range []connectivity.State{
		connectivity.Ready,
		connectivity.Idle,
		connectivity.Connecting,
		connectivity.TransientFailure,
	} {
		var availableConns []*grpc.ClientConn
		for _, conn := range this.conns {
			if conn.GetState() == state {
				availableConns = append(availableConns, conn)
			}
		}
		if len(availableConns) > 0 {
			return this.randConn(availableConns)
		}
	}

	return this.randConn(this.conns)
}

// Invoke performs a unary call on a connection chosen by pickConn. It returns
// an error when no connection is available.
func (this *RPCClient) Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...grpc.CallOption) error {
	var conn = this.pickConn()
	if conn == nil {
		return errors.New("can not get available grpc connection")
	}
	return conn.Invoke(ctx, method, args, reply, opts...)
}

// NewStream opens a stream on a connection chosen by pickConn. It returns an
// error when no connection is available.
func (this *RPCClient) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) {
	var conn = this.pickConn()
	if conn == nil {
		return nil, errors.New("can not get available grpc connection")
	}
	return conn.NewStream(ctx, desc, method, opts...)
}

// randConn returns a randomly chosen connection from conns, or nil when the
// list is empty.
func (this *RPCClient) randConn(conns []*grpc.ClientConn) *grpc.ClientConn {
	var l = len(conns)
	if l == 0 {
		return nil
	}
	if l == 1 {
		return conns[0]
	}
	return conns[rands.Int(0, l-1)]
}