package installers import ( "encoding/json" "errors" "fmt" "net" "strconv" "strings" "time" "github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/goman" "github.com/TeaOSLab/EdgeAPI/internal/utils" "github.com/TeaOSLab/EdgeAPI/internal/utils/numberutils" "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/sslconfigs" "github.com/iwind/TeaGo/logs" ) var sharedHTTPDNSNodeQueue = NewHTTPDNSNodeQueue() type HTTPDNSNodeQueue struct{} func NewHTTPDNSNodeQueue() *HTTPDNSNodeQueue { return &HTTPDNSNodeQueue{} } func SharedHTTPDNSNodeQueue() *HTTPDNSNodeQueue { return sharedHTTPDNSNodeQueue } // InstallNodeProcess 鍦ㄧ嚎瀹夎 HTTPDNS 鑺傜偣娴佺▼鎺у埗 func (q *HTTPDNSNodeQueue) InstallNodeProcess(nodeId int64, isUpgrading bool) error { installStatus := models.NewNodeInstallStatus() installStatus.IsRunning = true installStatus.IsFinished = false installStatus.IsOk = false installStatus.Error = "" installStatus.ErrorCode = "" installStatus.UpdatedAt = time.Now().Unix() err := models.SharedHTTPDNSNodeDAO.UpdateNodeInstallStatus(nil, nodeId, installStatus) if err != nil { return err } ticker := utils.NewTicker(3 * time.Second) goman.New(func() { for ticker.Wait() { installStatus.UpdatedAt = time.Now().Unix() updateErr := models.SharedHTTPDNSNodeDAO.UpdateNodeInstallStatus(nil, nodeId, installStatus) if updateErr != nil { logs.Println("[HTTPDNS_INSTALL]" + updateErr.Error()) } } }) defer ticker.Stop() err = q.InstallNode(nodeId, installStatus, isUpgrading) installStatus.IsRunning = false installStatus.IsFinished = true if err != nil { installStatus.IsOk = false installStatus.Error = err.Error() } else { installStatus.IsOk = true } installStatus.UpdatedAt = time.Now().Unix() updateErr := models.SharedHTTPDNSNodeDAO.UpdateNodeInstallStatus(nil, nodeId, installStatus) if updateErr != nil { return updateErr } if installStatus.IsOk { return models.SharedHTTPDNSNodeDAO.UpdateNodeIsInstalled(nil, nodeId, true) } return nil } // InstallNode 鍦ㄧ嚎瀹夎 HTTPDNS 鑺傜偣 func (q *HTTPDNSNodeQueue) InstallNode(nodeId int64, installStatus *models.NodeInstallStatus, isUpgrading bool) error { node, err := models.SharedHTTPDNSNodeDAO.FindEnabledNode(nil, nodeId) if err != nil { return err } if node == nil { return errors.New("can not find node, ID '" + numberutils.FormatInt64(nodeId) + "'") } cluster, err := models.SharedHTTPDNSClusterDAO.FindEnabledCluster(nil, int64(node.ClusterId)) if err != nil { return err } if cluster == nil { return errors.New("can not find cluster") } sshHost, sshPort, grantId, err := q.parseSSHInfo(node, installStatus) if err != nil { return err } grant, err := models.SharedNodeGrantDAO.FindEnabledNodeGrant(nil, grantId) if err != nil { return err } if grant == nil { installStatus.ErrorCode = "EMPTY_GRANT" return errors.New("can not find user grant with id '" + numberutils.FormatInt64(grantId) + "'") } apiNodes, err := models.SharedAPINodeDAO.FindAllEnabledAndOnAPINodes(nil) if err != nil { return err } if len(apiNodes) == 0 { return errors.New("no available api nodes") } apiEndpoints := make([]string, 0, 8) for _, apiNode := range apiNodes { addrConfigs, decodeErr := apiNode.DecodeAccessAddrs() if decodeErr != nil { return fmt.Errorf("decode api node access addresses failed: %w", decodeErr) } for _, addrConfig := range addrConfigs { apiEndpoints = append(apiEndpoints, addrConfig.FullAddresses()...) } } if len(apiEndpoints) == 0 { return errors.New("no available api endpoints") } tlsCertData, tlsKeyData, err := q.resolveClusterTLSCertPair(cluster) if err != nil { installStatus.ErrorCode = "EMPTY_TLS_CERT" return err } httpdnsListenAddr, err := q.resolveClusterTLSListenAddr(cluster) if err != nil { installStatus.ErrorCode = "INVALID_TLS_LISTEN" return err } params := &NodeParams{ Endpoints: apiEndpoints, NodeId: node.UniqueId, Secret: node.Secret, TLSCertData: tlsCertData, TLSKeyData: tlsKeyData, HTTPDNSListenAddr: httpdnsListenAddr, IsUpgrading: isUpgrading, } installer := &HTTPDNSNodeInstaller{} err = installer.Login(&Credentials{ Host: sshHost, Port: sshPort, Username: grant.Username, Password: grant.Password, PrivateKey: grant.PrivateKey, Passphrase: grant.Passphrase, Method: grant.Method, Sudo: grant.Su == 1, }) if err != nil { installStatus.ErrorCode = "SSH_LOGIN_FAILED" return err } defer func() { _ = installer.Close() }() installDir := node.InstallDir if len(installDir) == 0 { if cluster != nil && len(cluster.InstallDir) > 0 { installDir = cluster.InstallDir } if len(installDir) == 0 { installDir = installer.client.UserHome() + "/edge-httpdns" } } return installer.Install(installDir, params, installStatus) } // StartNode 启动HTTPDNS节点 func (q *HTTPDNSNodeQueue) StartNode(nodeId int64) error { node, err := models.SharedHTTPDNSNodeDAO.FindEnabledNode(nil, nodeId) if err != nil { return err } if node == nil { return errors.New("can not find node, ID '" + numberutils.FormatInt64(nodeId) + "'") } // 登录信息 login, err := models.SharedNodeLoginDAO.FindEnabledNodeLoginWithNodeId(nil, nodeconfigs.NodeRoleHTTPDNS, nodeId) if err != nil { return err } if login == nil { return newGrantError("can not find node login information") } loginParams, err := login.DecodeSSHParams() if err != nil { return newGrantError(err.Error()) } if len(strings.TrimSpace(loginParams.Host)) == 0 { return newGrantError("ssh host should not be empty") } if loginParams.Port <= 0 { loginParams.Port = 22 } if loginParams.GrantId <= 0 { return newGrantError("can not find node grant") } grant, err := models.SharedNodeGrantDAO.FindEnabledNodeGrant(nil, loginParams.GrantId) if err != nil { return err } if grant == nil { return newGrantError("can not find user grant with id '" + numberutils.FormatInt64(loginParams.GrantId) + "'") } installer := &HTTPDNSNodeInstaller{} err = installer.Login(&Credentials{ Host: strings.TrimSpace(loginParams.Host), Port: loginParams.Port, Username: grant.Username, Password: grant.Password, PrivateKey: grant.PrivateKey, Passphrase: grant.Passphrase, Method: grant.Method, Sudo: grant.Su == 1, }) if err != nil { return err } defer func() { _ = installer.Close() }() installDir := strings.TrimSpace(node.InstallDir) if len(installDir) == 0 { cluster, err := models.SharedHTTPDNSClusterDAO.FindEnabledCluster(nil, int64(node.ClusterId)) if err != nil { return err } if cluster == nil { return errors.New("can not find cluster, ID '" + numberutils.FormatInt64(int64(node.ClusterId)) + "'") } installDir = strings.TrimSpace(cluster.InstallDir) if len(installDir) == 0 { installDir = installer.client.UserHome() + "/edge-httpdns" } } _, appDir := resolveHTTPDNSInstallPaths(installDir) exeFile := appDir + "/bin/edge-httpdns" _, err = installer.client.Stat(exeFile) if err != nil { return errors.New("httpdns node is not installed correctly, can not find executable file: " + exeFile) } // 先尝试 systemd 拉起 _, _, _ = installer.client.Exec("/usr/bin/systemctl start edge-httpdns") _, stderr, err := installer.client.Exec(exeFile + " start") if err != nil { return fmt.Errorf("start failed: %w", err) } if len(strings.TrimSpace(stderr)) > 0 { return errors.New("start failed: " + strings.TrimSpace(stderr)) } return nil } func (q *HTTPDNSNodeQueue) resolveClusterTLSCertPair(cluster *models.HTTPDNSCluster) ([]byte, []byte, error) { if cluster == nil { return nil, nil, errors.New("cluster not found") } if len(cluster.TLSPolicy) == 0 { return nil, nil, errors.New("cluster tls policy is empty") } tlsConfig := map[string]json.RawMessage{} if err := json.Unmarshal(cluster.TLSPolicy, &tlsConfig); err != nil { return nil, nil, fmt.Errorf("decode cluster tls policy failed: %w", err) } sslPolicyData := tlsConfig["sslPolicy"] if len(sslPolicyData) == 0 { // Compatible with old data where TLSPolicy stores SSLPolicy directly. sslPolicyData = json.RawMessage(cluster.TLSPolicy) } sslPolicy := &sslconfigs.SSLPolicy{} if err := json.Unmarshal(sslPolicyData, sslPolicy); err != nil { return nil, nil, fmt.Errorf("decode ssl policy failed: %w", err) } for _, cert := range sslPolicy.Certs { if cert == nil { continue } if len(cert.CertData) > 0 && len(cert.KeyData) > 0 { return cert.CertData, cert.KeyData, nil } } for _, certRef := range sslPolicy.CertRefs { if certRef == nil || certRef.CertId <= 0 { continue } certConfig, err := models.SharedSSLCertDAO.ComposeCertConfig(nil, certRef.CertId, false, nil, nil) if err != nil { return nil, nil, fmt.Errorf("load ssl cert %d failed: %w", certRef.CertId, err) } if certConfig == nil { continue } if len(certConfig.CertData) > 0 && len(certConfig.KeyData) > 0 { return certConfig.CertData, certConfig.KeyData, nil } } if sslPolicy.Id > 0 { policyConfig, err := models.SharedSSLPolicyDAO.ComposePolicyConfig(nil, sslPolicy.Id, false, nil, nil) if err != nil { return nil, nil, fmt.Errorf("load ssl policy %d failed: %w", sslPolicy.Id, err) } if policyConfig != nil { for _, cert := range policyConfig.Certs { if cert == nil { continue } if len(cert.CertData) > 0 && len(cert.KeyData) > 0 { return cert.CertData, cert.KeyData, nil } } } } return nil, nil, errors.New("cluster tls certificate is not configured") } func (q *HTTPDNSNodeQueue) resolveClusterTLSListenAddr(cluster *models.HTTPDNSCluster) (string, error) { const defaultListenAddr = ":443" if cluster == nil || len(cluster.TLSPolicy) == 0 { return defaultListenAddr, nil } tlsConfig, err := serverconfigs.NewTLSProtocolConfigFromJSON(cluster.TLSPolicy) if err != nil { return "", fmt.Errorf("decode cluster tls listen failed: %w", err) } for _, listen := range tlsConfig.Listen { if listen == nil { continue } if err := listen.Init(); err != nil { return "", fmt.Errorf("invalid cluster tls listen address '%s': %w", listen.PortRange, err) } if listen.MinPort <= 0 { continue } host := strings.TrimSpace(listen.Host) return net.JoinHostPort(host, strconv.Itoa(listen.MinPort)), nil } return defaultListenAddr, nil } func (q *HTTPDNSNodeQueue) parseSSHInfo(node *models.HTTPDNSNode, installStatus *models.NodeInstallStatus) (string, int, int64, error) { if node == nil { return "", 0, 0, errors.New("node should not be nil") } login, err := models.SharedNodeLoginDAO.FindEnabledNodeLoginWithNodeId(nil, nodeconfigs.NodeRoleHTTPDNS, int64(node.Id)) if err != nil { return "", 0, 0, err } if login == nil { installStatus.ErrorCode = "EMPTY_SSH" return "", 0, 0, errors.New("ssh login not found for node '" + numberutils.FormatInt64(int64(node.Id)) + "'") } sshParams, err := login.DecodeSSHParams() if err != nil { installStatus.ErrorCode = "EMPTY_SSH" return "", 0, 0, err } if len(sshParams.Host) == 0 { installStatus.ErrorCode = "EMPTY_SSH_HOST" return "", 0, 0, errors.New("ssh host should not be empty") } if sshParams.Port <= 0 { sshParams.Port = 22 } if sshParams.GrantId <= 0 { installStatus.ErrorCode = "EMPTY_GRANT" return "", 0, 0, errors.New("grant id should not be empty") } return sshParams.Host, sshParams.Port, sshParams.GrantId, nil }