package installers import ( "encoding/json" "errors" "fmt" "net" "strconv" "strings" "time" "github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/goman" "github.com/TeaOSLab/EdgeAPI/internal/utils" "github.com/TeaOSLab/EdgeAPI/internal/utils/numberutils" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/sslconfigs" "github.com/iwind/TeaGo/logs" "github.com/iwind/TeaGo/maps" ) var sharedHTTPDNSNodeQueue = NewHTTPDNSNodeQueue() type HTTPDNSNodeQueue struct{} func NewHTTPDNSNodeQueue() *HTTPDNSNodeQueue { return &HTTPDNSNodeQueue{} } func SharedHTTPDNSNodeQueue() *HTTPDNSNodeQueue { return sharedHTTPDNSNodeQueue } // InstallNodeProcess 鍦ㄧ嚎瀹夎 HTTPDNS 鑺傜偣娴佺▼鎺у埗 func (q *HTTPDNSNodeQueue) InstallNodeProcess(nodeId int64, isUpgrading bool) error { installStatus := models.NewNodeInstallStatus() installStatus.IsRunning = true installStatus.IsFinished = false installStatus.IsOk = false installStatus.Error = "" installStatus.ErrorCode = "" installStatus.UpdatedAt = time.Now().Unix() err := models.SharedHTTPDNSNodeDAO.UpdateNodeInstallStatus(nil, nodeId, installStatus) if err != nil { return err } ticker := utils.NewTicker(3 * time.Second) goman.New(func() { for ticker.Wait() { installStatus.UpdatedAt = time.Now().Unix() updateErr := models.SharedHTTPDNSNodeDAO.UpdateNodeInstallStatus(nil, nodeId, installStatus) if updateErr != nil { logs.Println("[HTTPDNS_INSTALL]" + updateErr.Error()) } } }) defer ticker.Stop() err = q.InstallNode(nodeId, installStatus, isUpgrading) installStatus.IsRunning = false installStatus.IsFinished = true if err != nil { installStatus.IsOk = false installStatus.Error = err.Error() } else { installStatus.IsOk = true } installStatus.UpdatedAt = time.Now().Unix() updateErr := models.SharedHTTPDNSNodeDAO.UpdateNodeInstallStatus(nil, nodeId, installStatus) if updateErr != nil { return updateErr } if installStatus.IsOk { return models.SharedHTTPDNSNodeDAO.UpdateNodeIsInstalled(nil, nodeId, true) } return nil } // InstallNode 鍦ㄧ嚎瀹夎 HTTPDNS 鑺傜偣 func (q *HTTPDNSNodeQueue) InstallNode(nodeId int64, installStatus *models.NodeInstallStatus, isUpgrading bool) error { node, err := models.SharedHTTPDNSNodeDAO.FindEnabledNode(nil, nodeId) if err != nil { return err } if node == nil { return errors.New("can not find node, ID '" + numberutils.FormatInt64(nodeId) + "'") } cluster, err := models.SharedHTTPDNSClusterDAO.FindEnabledCluster(nil, int64(node.ClusterId)) if err != nil { return err } if cluster == nil { return errors.New("can not find cluster") } sshHost, sshPort, grantId, err := q.parseSSHInfo(node) if err != nil { installStatus.ErrorCode = "EMPTY_SSH" return err } grant, err := models.SharedNodeGrantDAO.FindEnabledNodeGrant(nil, grantId) if err != nil { return err } if grant == nil { installStatus.ErrorCode = "EMPTY_GRANT" return errors.New("can not find user grant with id '" + numberutils.FormatInt64(grantId) + "'") } apiNodes, err := models.SharedAPINodeDAO.FindAllEnabledAndOnAPINodes(nil) if err != nil { return err } if len(apiNodes) == 0 { return errors.New("no available api nodes") } apiEndpoints := make([]string, 0, 8) for _, apiNode := range apiNodes { addrConfigs, decodeErr := apiNode.DecodeAccessAddrs() if decodeErr != nil { return fmt.Errorf("decode api node access addresses failed: %w", decodeErr) } for _, addrConfig := range addrConfigs { apiEndpoints = append(apiEndpoints, addrConfig.FullAddresses()...) } } if len(apiEndpoints) == 0 { return errors.New("no available api endpoints") } tlsCertData, tlsKeyData, err := q.resolveClusterTLSCertPair(cluster) if err != nil { installStatus.ErrorCode = "EMPTY_TLS_CERT" return err } httpdnsListenAddr, err := q.resolveClusterTLSListenAddr(cluster) if err != nil { installStatus.ErrorCode = "INVALID_TLS_LISTEN" return err } params := &NodeParams{ Endpoints: apiEndpoints, NodeId: node.UniqueId, Secret: node.Secret, TLSCertData: tlsCertData, TLSKeyData: tlsKeyData, HTTPDNSListenAddr: httpdnsListenAddr, IsUpgrading: isUpgrading, } installer := &HTTPDNSNodeInstaller{} err = installer.Login(&Credentials{ Host: sshHost, Port: sshPort, Username: grant.Username, Password: grant.Password, PrivateKey: grant.PrivateKey, Passphrase: grant.Passphrase, Method: grant.Method, Sudo: grant.Su == 1, }) if err != nil { installStatus.ErrorCode = "SSH_LOGIN_FAILED" return err } defer func() { _ = installer.Close() }() installDir := node.InstallDir if len(installDir) == 0 { if cluster != nil && len(cluster.InstallDir) > 0 { installDir = cluster.InstallDir } if len(installDir) == 0 { installDir = installer.client.UserHome() + "/edge-httpdns" } } return installer.Install(installDir, params, installStatus) } func (q *HTTPDNSNodeQueue) resolveClusterTLSCertPair(cluster *models.HTTPDNSCluster) ([]byte, []byte, error) { if cluster == nil { return nil, nil, errors.New("cluster not found") } if len(cluster.TLSPolicy) == 0 { return nil, nil, errors.New("cluster tls policy is empty") } tlsConfig := map[string]json.RawMessage{} if err := json.Unmarshal(cluster.TLSPolicy, &tlsConfig); err != nil { return nil, nil, fmt.Errorf("decode cluster tls policy failed: %w", err) } sslPolicyData := tlsConfig["sslPolicy"] if len(sslPolicyData) == 0 { // Compatible with old data where TLSPolicy stores SSLPolicy directly. sslPolicyData = json.RawMessage(cluster.TLSPolicy) } sslPolicy := &sslconfigs.SSLPolicy{} if err := json.Unmarshal(sslPolicyData, sslPolicy); err != nil { return nil, nil, fmt.Errorf("decode ssl policy failed: %w", err) } for _, cert := range sslPolicy.Certs { if cert == nil { continue } if len(cert.CertData) > 0 && len(cert.KeyData) > 0 { return cert.CertData, cert.KeyData, nil } } for _, certRef := range sslPolicy.CertRefs { if certRef == nil || certRef.CertId <= 0 { continue } certConfig, err := models.SharedSSLCertDAO.ComposeCertConfig(nil, certRef.CertId, false, nil, nil) if err != nil { return nil, nil, fmt.Errorf("load ssl cert %d failed: %w", certRef.CertId, err) } if certConfig == nil { continue } if len(certConfig.CertData) > 0 && len(certConfig.KeyData) > 0 { return certConfig.CertData, certConfig.KeyData, nil } } if sslPolicy.Id > 0 { policyConfig, err := models.SharedSSLPolicyDAO.ComposePolicyConfig(nil, sslPolicy.Id, false, nil, nil) if err != nil { return nil, nil, fmt.Errorf("load ssl policy %d failed: %w", sslPolicy.Id, err) } if policyConfig != nil { for _, cert := range policyConfig.Certs { if cert == nil { continue } if len(cert.CertData) > 0 && len(cert.KeyData) > 0 { return cert.CertData, cert.KeyData, nil } } } } return nil, nil, errors.New("cluster tls certificate is not configured") } func (q *HTTPDNSNodeQueue) resolveClusterTLSListenAddr(cluster *models.HTTPDNSCluster) (string, error) { const defaultListenAddr = ":443" if cluster == nil || len(cluster.TLSPolicy) == 0 { return defaultListenAddr, nil } tlsConfig, err := serverconfigs.NewTLSProtocolConfigFromJSON(cluster.TLSPolicy) if err != nil { return "", fmt.Errorf("decode cluster tls listen failed: %w", err) } for _, listen := range tlsConfig.Listen { if listen == nil { continue } if err := listen.Init(); err != nil { return "", fmt.Errorf("invalid cluster tls listen address '%s': %w", listen.PortRange, err) } if listen.MinPort <= 0 { continue } host := strings.TrimSpace(listen.Host) return net.JoinHostPort(host, strconv.Itoa(listen.MinPort)), nil } return defaultListenAddr, nil } func (q *HTTPDNSNodeQueue) parseSSHInfo(node *models.HTTPDNSNode) (string, int, int64, error) { if node == nil { return "", 0, 0, errors.New("node should not be nil") } if len(node.InstallStatus) == 0 { return "", 0, 0, errors.New("ssh config should not be empty") } statusMap := maps.Map{} err := json.Unmarshal(node.InstallStatus, &statusMap) if err != nil { return "", 0, 0, errors.New("invalid install status data") } sshMap := statusMap.GetMap("ssh") if sshMap == nil { return "", 0, 0, errors.New("ssh config should not be empty") } host := sshMap.GetString("host") port := sshMap.GetInt("port") grantId := sshMap.GetInt64("grantId") if len(host) == 0 { return "", 0, 0, errors.New("ssh host should not be empty") } if port <= 0 { port = 22 } if grantId <= 0 { return "", 0, 0, errors.New("grant id should not be empty") } return host, port, grantId, nil }