package installers

import (
	"crypto/sha256"
	"encoding/json"
	"errors"
	"fmt"
	"net/url"
	"os"
	slashpath "path"
	"path/filepath"
	"sort"
	"strconv"
	"strings"
	"time"

	teaconst "github.com/TeaOSLab/EdgeAPI/internal/const"
	"github.com/TeaOSLab/EdgeAPI/internal/db/models"
	"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
	"github.com/TeaOSLab/EdgeCommon/pkg/systemconfigs"
	"github.com/iwind/TeaGo/Tea"
)

// Remote paths, service names and role identifiers used when installing and
// managing Fluent Bit on a node.
const (
	fluentBitConfigDir          = "/etc/fluent-bit"
	fluentBitStorageDir         = "/var/lib/fluent-bit/storage"
	fluentBitMainConfigFile     = "/etc/fluent-bit/fluent-bit.conf"
	fluentBitParsersFile        = "/etc/fluent-bit/parsers.conf"
	fluentBitManagedMetaFile    = "/etc/fluent-bit/.edge-managed.json"
	fluentBitManagedEnvFile     = "/etc/fluent-bit/.edge-managed.env"
	fluentBitDropInDir          = "/etc/systemd/system/fluent-bit.service.d"
	fluentBitDropInFile         = "/etc/systemd/system/fluent-bit.service.d/edge-managed.conf"
	fluentBitServiceName        = "fluent-bit"
	fluentBitDefaultBinPath     = "/opt/fluent-bit/bin/fluent-bit"
	fluentBitLocalPackagesRoot  = "packages"
	fluentBitHTTPPathPattern    = "/var/log/edge/edge-node/*.log"
	fluentBitDNSPathPattern     = "/var/log/edge/edge-dns/*.log"
	fluentBitHTTPDNSPathPattern = "/var/log/edge/edge-httpdns/*.log"
	fluentBitManagedMarker      = "managed-by-edgeapi"
	fluentBitRoleNode           = "node"
	fluentBitRoleDNS            = "dns"
	fluentBitRoleHTTPDNS        = "httpdns"

	// Default ClickHouse HTTP interface ports, used when no port is configured.
	fluentBitClickHouseHTTPPort  = 8123
	fluentBitClickHouseHTTPSPort = 8443
)

// errFluentBitLocalPackageNotFound is returned when the offline package for
// the remote platform is missing from the local deploy directory.
var errFluentBitLocalPackageNotFound = errors.New("fluent-bit local package not found")

// fluentBitPackageFileMapping maps "<distro>-<arch>" platform keys to the
// file name of the bundled offline package.
var fluentBitPackageFileMapping = map[string]string{
	"ubuntu22.04-amd64": "fluent-bit_4.2.2_amd64.deb",
	"ubuntu22.04-arm64": "fluent-bit_4.2.2_arm64.deb",
	"amzn2023-amd64":    "fluent-bit-4.2.2-1.x86_64.rpm",
	"amzn2023-arm64":    "fluent-bit-4.2.2-1.aarch64.rpm",
}

// fluentBitManagedMeta is the JSON document stored in
// fluentBitManagedMetaFile describing the last configuration that was applied.
type fluentBitManagedMeta struct {
	Roles         []string `json:"roles"`
	Hash          string   `json:"hash"`
	UpdatedAt     int64    `json:"updatedAt"`
	SourceVersion string   `json:"sourceVersion"`
}

// fluentBitDesiredConfig holds the inputs from which the managed Fluent Bit
// configuration is rendered.
type fluentBitDesiredConfig struct {
	Roles              []string
	ClickHouse         *systemconfigs.ClickHouseSetting
	HTTPPathPattern    string
	DNSPathPattern     string
	HTTPDNSPathPattern string
}

// SetupFluentBit installs Fluent Bit and manages its configuration
// (offline package + platform-rendered config).
//
// It is a no-op on non-Linux hosts. The given role is merged with any roles
// already recorded in the remote metadata file, the configuration is
// re-rendered, and the service is (re)started when required.
func (this *BaseInstaller) SetupFluentBit(role nodeconfigs.NodeRole) error {
	if this.client == nil {
		return errors.New("ssh client is nil")
	}

	uname := this.uname()
	if !strings.Contains(uname, "Linux") {
		return nil
	}

	// Scratch directory on the remote host. Creation and cleanup are
	// best-effort: a failure here surfaces later when a file is written into it.
	tempDir := strings.TrimRight(this.client.UserHome(), "/") + "/.edge-fluent-bit"
	_, _, _ = this.client.Exec("mkdir -p " + shQuote(tempDir))
	defer func() {
		_, _, _ = this.client.Exec("rm -rf " + shQuote(tempDir))
	}()

	if err := this.ensureFluentBitInstalled(tempDir); err != nil {
		return err
	}

	_, stderr, err := this.client.Exec("mkdir -p " + shQuote(fluentBitConfigDir) + " " + shQuote(fluentBitStorageDir))
	if err != nil {
		return fmt.Errorf("prepare fluent-bit directories failed: %w, stderr: %s", err, stderr)
	}

	parserContent, err := this.readLocalParsersContent()
	if err != nil {
		return err
	}

	existingMeta, err := this.readManagedMeta()
	if err != nil {
		return err
	}

	mergedRoles, err := mergeManagedRoles(existingMeta, role)
	if err != nil {
		return err
	}

	desiredConfig, err := this.buildDesiredFluentBitConfig(mergedRoles)
	if err != nil {
		return err
	}

	configChanged, err := this.applyManagedConfig(tempDir, desiredConfig, parserContent, existingMeta)
	if err != nil {
		return err
	}

	binPath, err := this.lookupFluentBitBinPath()
	if err != nil {
		return err
	}
	if binPath == "" {
		// Without a binary path the generated unit files would contain a
		// broken ExecStart line.
		return errors.New("fluent-bit binary not found")
	}

	return this.ensureFluentBitService(tempDir, binPath, configChanged)
}

// ensureFluentBitInstalled installs Fluent Bit from the bundled offline
// package when no binary can be found on the remote host, then verifies the
// installed binary by running it with --version.
func (this *BaseInstaller) ensureFluentBitInstalled(tempDir string) error {
	// A lookup error is ignored here: it is treated like "not installed" and
	// the installation path below reports the underlying problem.
	binPath, _ := this.lookupFluentBitBinPath()
	if binPath != "" {
		return nil
	}

	platformKey, packageName, arch, err := this.detectRemotePlatformAndPackage()
	if err != nil {
		return fmt.Errorf("detect fluent-bit platform failed: %w", err)
	}

	if err := this.installFluentBitFromLocalPackage(tempDir, arch, packageName); err != nil {
		if errors.Is(err, errFluentBitLocalPackageNotFound) {
			expectedPath := filepath.Join("deploy", "fluent-bit", fluentBitLocalPackagesRoot, "linux-"+arch, packageName)
			return fmt.Errorf("install fluent-bit failed: local package missing for platform '%s', expected '%s'", platformKey, expectedPath)
		}
		return fmt.Errorf("install fluent-bit from local package failed: %w", err)
	}

	binPath, err = this.lookupFluentBitBinPath()
	if err != nil {
		return err
	}
	if binPath == "" {
		return errors.New("fluent-bit binary not found after local package install")
	}

	// Quote the path like every other path passed to the remote shell.
	_, stderr, err := this.client.Exec(shQuote(binPath) + " --version")
	if err != nil {
		return fmt.Errorf("verify fluent-bit version failed: %w, stderr: %s", err, stderr)
	}

	return nil
}

// installFluentBitFromLocalPackage uploads the offline package found under
// <Tea.Root>/deploy/fluent-bit/packages/linux-<arch>/ and installs it with the
// tool matching its extension (.deb, .rpm, .tar.gz/.tgz).
//
// It returns errFluentBitLocalPackageNotFound when the local file is missing.
func (this *BaseInstaller) installFluentBitFromLocalPackage(tempDir string, arch string, packageName string) error {
	packageDir := filepath.Join(Tea.Root, "deploy", "fluent-bit", fluentBitLocalPackagesRoot, "linux-"+arch)
	localPackagePath := filepath.Join(packageDir, packageName)
	if _, err := os.Stat(localPackagePath); err != nil {
		if os.IsNotExist(err) {
			return errFluentBitLocalPackageNotFound
		}
		return fmt.Errorf("check local package failed: %w", err)
	}

	remotePackagePath := tempDir + "/" + filepath.Base(localPackagePath)
	if err := this.client.Copy(localPackagePath, remotePackagePath, 0644); err != nil {
		return fmt.Errorf("upload local package failed: %w", err)
	}

	var installCmd string
	lowerName := strings.ToLower(localPackagePath)
	switch {
	case strings.HasSuffix(lowerName, ".deb"):
		installCmd = "dpkg -i " + shQuote(remotePackagePath)
	case strings.HasSuffix(lowerName, ".rpm"):
		installCmd = "rpm -Uvh --force " + shQuote(remotePackagePath) + " || rpm -ivh --force " + shQuote(remotePackagePath)
	case strings.HasSuffix(lowerName, ".tar.gz") || strings.HasSuffix(lowerName, ".tgz"):
		// Extract the archive, locate the fluent-bit binary inside it and
		// install it under /opt/fluent-bit/bin with a symlink in /usr/local/bin.
		extractDir := tempDir + "/extract"
		installCmd = "rm -rf " + shQuote(extractDir) + "; mkdir -p " + shQuote(extractDir) + "; tar -xzf " + shQuote(remotePackagePath) + " -C " + shQuote(extractDir) + "; " +
			"bin=$(find " + shQuote(extractDir) + " -type f -name fluent-bit | head -n 1); " +
			"if [ -z \"$bin\" ]; then exit 3; fi; " +
			"mkdir -p /opt/fluent-bit/bin /usr/local/bin; " +
			"install -m 0755 \"$bin\" /opt/fluent-bit/bin/fluent-bit; " +
			"ln -sf /opt/fluent-bit/bin/fluent-bit /usr/local/bin/fluent-bit"
	default:
		return fmt.Errorf("unsupported local package format: %s", packageName)
	}

	_, stderr, err := this.client.Exec(installCmd)
	if err != nil {
		return fmt.Errorf("install fluent-bit local package '%s' failed: %w, stderr: %s", filepath.Base(localPackagePath), err, stderr)
	}

	return nil
}

// detectRemotePlatformAndPackage reads the remote architecture and
// /etc/os-release and resolves them to a platform key ("<distro>-<arch>") and
// the matching entry of fluentBitPackageFileMapping.
//
// Only Ubuntu 22.04 and Amazon Linux 2023 are recognised; anything else is an
// error.
func (this *BaseInstaller) detectRemotePlatformAndPackage() (platformKey string, packageName string, arch string, err error) {
	arch, err = this.detectRemoteLinuxArch()
	if err != nil {
		return "", "", "", err
	}

	releaseData, stderr, err := this.client.Exec("cat /etc/os-release")
	if err != nil {
		return "", "", "", fmt.Errorf("read /etc/os-release failed: %w, stderr: %s", err, stderr)
	}
	if strings.TrimSpace(releaseData) == "" {
		return "", "", "", errors.New("/etc/os-release is empty")
	}

	releaseMap := parseOSRelease(releaseData)
	id := strings.ToLower(strings.TrimSpace(releaseMap["ID"]))
	versionID := strings.TrimSpace(releaseMap["VERSION_ID"])

	var distro string
	switch {
	case id == "ubuntu" && strings.HasPrefix(versionID, "22.04"):
		distro = "ubuntu22.04"
	case id == "amzn" && strings.HasPrefix(versionID, "2023"):
		distro = "amzn2023"
	default:
		return "", "", "", fmt.Errorf("unsupported linux platform id='%s' version='%s'", id, versionID)
	}

	platformKey = distro + "-" + arch
	mappedName, ok := fluentBitPackageFileMapping[platformKey]
	if !ok {
		return "", "", "", fmt.Errorf("no local package mapping for platform '%s'", platformKey)
	}

	return platformKey, mappedName, arch, nil
}

// parseOSRelease parses os-release style KEY=VALUE content into a map.
//
// Blank lines, comment lines (starting with '#') and lines without '=' are
// skipped. Surrounding double or single quotes are stripped from values.
func parseOSRelease(content string) map[string]string {
	result := map[string]string{}
	for _, line := range strings.Split(content, "\n") {
		line = strings.TrimSpace(line)
		if line == "" || strings.HasPrefix(line, "#") || !strings.Contains(line, "=") {
			continue
		}
		parts := strings.SplitN(line, "=", 2)
		key := strings.TrimSpace(parts[0])
		value := strings.TrimSpace(parts[1])
		// os-release values may be enclosed in either kind of quote.
		value = strings.Trim(value, "\"'")
		result[key] = value
	}
	return result
}

// detectRemoteLinuxArch runs `uname -m` remotely and normalises the result to
// "amd64" or "arm64". Any other value is returned lower-cased and unchanged.
func (this *BaseInstaller) detectRemoteLinuxArch() (string, error) {
	stdout, stderr, err := this.client.Exec("uname -m")
	if err != nil {
		return "", fmt.Errorf("detect remote arch failed: %w, stderr: %s", err, stderr)
	}

	arch := strings.ToLower(strings.TrimSpace(stdout))
	switch arch {
	case "x86_64", "amd64":
		return "amd64", nil
	case "aarch64", "arm64":
		return "arm64", nil
	default:
		return arch, nil
	}
}

// lookupFluentBitBinPath returns the path of the remote fluent-bit binary:
// the one found via `command -v`, else fluentBitDefaultBinPath when it is
// executable, else an empty string.
func (this *BaseInstaller) lookupFluentBitBinPath() (string, error) {
	stdout, stderr, err := this.client.Exec("if command -v fluent-bit >/dev/null 2>&1; then command -v fluent-bit; elif [ -x " + fluentBitDefaultBinPath + " ]; then echo " + fluentBitDefaultBinPath + "; fi")
	if err != nil {
		return "", fmt.Errorf("lookup fluent-bit binary failed: %w, stderr: %s", err, stderr)
	}
	return strings.TrimSpace(stdout), nil
}

// readLocalParsersContent reads <Tea.Root>/deploy/fluent-bit/parsers.conf from
// the local filesystem.
func (this *BaseInstaller) readLocalParsersContent() (string, error) {
	parsersPath := filepath.Join(Tea.Root, "deploy", "fluent-bit", "parsers.conf")
	data, err := os.ReadFile(parsersPath)
	if err != nil {
		return "", fmt.Errorf("read local parsers config failed: %w", err)
	}
	return string(data), nil
}

// readManagedMeta loads and decodes the remote metadata file.
//
// It returns (nil, nil) when the file does not exist or is blank. Roles in the
// returned metadata are normalised.
func (this *BaseInstaller) readManagedMeta() (*fluentBitManagedMeta, error) {
	exists, err := this.remoteFileExists(fluentBitManagedMetaFile)
	if err != nil {
		return nil, err
	}
	if !exists {
		return nil, nil
	}

	content, stderr, err := this.client.Exec("cat " + shQuote(fluentBitManagedMetaFile))
	if err != nil {
		return nil, fmt.Errorf("read fluent-bit managed metadata failed: %w, stderr: %s", err, stderr)
	}
	if strings.TrimSpace(content) == "" {
		return nil, nil
	}

	meta := &fluentBitManagedMeta{}
	if err := json.Unmarshal([]byte(content), meta); err != nil {
		return nil, fmt.Errorf("decode fluent-bit managed metadata failed: %w", err)
	}
	meta.Roles = normalizeRoles(meta.Roles)
	return meta, nil
}

// mergeManagedRoles returns the sorted union of the roles recorded in meta
// (which may be nil) and the role being installed.
func mergeManagedRoles(meta *fluentBitManagedMeta, role nodeconfigs.NodeRole) ([]string, error) {
	roleName, err := mapNodeRole(role)
	if err != nil {
		return nil, err
	}

	roleSet := map[string]struct{}{}
	if meta != nil {
		for _, r := range normalizeRoles(meta.Roles) {
			roleSet[r] = struct{}{}
		}
	}
	roleSet[roleName] = struct{}{}

	roles := make([]string, 0, len(roleSet))
	for r := range roleSet {
		roles = append(roles, r)
	}
	sort.Strings(roles)
	return roles, nil
}

// mapNodeRole converts a node role into the role name used in the Fluent Bit
// metadata, or returns an error for roles that have no log pipeline.
func mapNodeRole(role nodeconfigs.NodeRole) (string, error) {
	switch role {
	case nodeconfigs.NodeRoleNode:
		return fluentBitRoleNode, nil
	case nodeconfigs.NodeRoleDNS:
		return fluentBitRoleDNS, nil
	case nodeconfigs.NodeRoleHTTPDNS:
		return fluentBitRoleHTTPDNS, nil
	default:
		return "", fmt.Errorf("unsupported fluent-bit role '%s'", role)
	}
}

// normalizeRoles lower-cases and trims the given role names, drops unknown
// ones and duplicates, and returns the remainder sorted.
func normalizeRoles(rawRoles []string) []string {
	roleSet := map[string]struct{}{}
	for _, role := range rawRoles {
		role = strings.ToLower(strings.TrimSpace(role))
		if role != fluentBitRoleNode && role != fluentBitRoleDNS && role != fluentBitRoleHTTPDNS {
			continue
		}
		roleSet[role] = struct{}{}
	}

	roles := make([]string, 0, len(roleSet))
	for role := range roleSet {
		roles = append(roles, role)
	}
	sort.Strings(roles)
	return roles
}

// hasRole reports whether role is present in roles.
func hasRole(roles []string, role string) bool {
	for _, one := range roles {
		if one == role {
			return true
		}
	}
	return false
}

// buildDesiredFluentBitConfig assembles the desired configuration for the
// given roles from the stored ClickHouse setting and the current public
// access-log policy, filling in defaults for unset ClickHouse fields.
func (this *BaseInstaller) buildDesiredFluentBitConfig(roles []string) (*fluentBitDesiredConfig, error) {
	if len(roles) == 0 {
		return nil, errors.New("fluent-bit roles should not be empty")
	}

	ch, err := models.SharedSysSettingDAO.ReadClickHouseConfig(nil)
	if err != nil {
		return nil, fmt.Errorf("read clickhouse setting failed: %w", err)
	}
	if ch == nil {
		ch = &systemconfigs.ClickHouseSetting{}
	}
	if strings.TrimSpace(ch.Host) == "" {
		ch.Host = "127.0.0.1"
	}

	ch.Scheme = strings.ToLower(strings.TrimSpace(ch.Scheme))
	if ch.Scheme == "" {
		ch.Scheme = "https"
	}
	if ch.Scheme != "http" && ch.Scheme != "https" {
		return nil, fmt.Errorf("unsupported clickhouse scheme '%s'", ch.Scheme)
	}

	if ch.Port <= 0 {
		// Pick the default port of the ClickHouse HTTP interface that matches
		// the scheme; plain HTTP must not be pointed at the TLS port.
		if ch.Scheme == "https" {
			ch.Port = fluentBitClickHouseHTTPSPort
		} else {
			ch.Port = fluentBitClickHouseHTTPPort
		}
	}
	if strings.TrimSpace(ch.Database) == "" {
		ch.Database = "default"
	}
	if strings.TrimSpace(ch.User) == "" {
		ch.User = "default"
	}

	// Current platform policy: always skip ClickHouse TLS certificate
	// verification and do not expose a ServerName setting.
	ch.TLSSkipVerify = true
	ch.TLSServerName = ""

	httpPathPattern := fluentBitHTTPPathPattern
	dnsPathPattern := fluentBitDNSPathPattern
	httpdnsPathPattern := fluentBitHTTPDNSPathPattern

	publicPolicyPath, err := this.readPublicAccessLogPolicyPath()
	if err != nil {
		return nil, err
	}
	// When the public policy writes to a file, all roles tail "*.log" in that
	// file's directory instead of the built-in defaults.
	policyDir := dirFromPolicyPath(publicPolicyPath)
	if policyDir != "" {
		pattern := strings.TrimRight(policyDir, "/") + "/*.log"
		httpPathPattern = pattern
		dnsPathPattern = pattern
		httpdnsPathPattern = pattern
	}

	return &fluentBitDesiredConfig{
		Roles:              normalizeRoles(roles),
		ClickHouse:         ch,
		HTTPPathPattern:    httpPathPattern,
		DNSPathPattern:     dnsPathPattern,
		HTTPDNSPathPattern: httpdnsPathPattern,
	}, nil
}

// readPublicAccessLogPolicyPath returns the trimmed file path of the current
// enabled public HTTP access-log policy, or "" when there is none.
func (this *BaseInstaller) readPublicAccessLogPolicyPath() (string, error) {
	policyId, err := models.SharedHTTPAccessLogPolicyDAO.FindCurrentPublicPolicyId(nil)
	if err != nil {
		return "", fmt.Errorf("find current public access log policy failed: %w", err)
	}
	if policyId <= 0 {
		return "", nil
	}

	policy, err := models.SharedHTTPAccessLogPolicyDAO.FindEnabledHTTPAccessLogPolicy(nil, policyId)
	if err != nil {
		return "", fmt.Errorf("read public access log policy failed: %w", err)
	}
	if policy == nil {
		return "", nil
	}

	return strings.TrimSpace(models.ParseHTTPAccessLogPolicyFilePath(policy)), nil
}

// dirFromPolicyPath returns the directory part of policyPath with backslashes
// converted to slashes and trailing slashes removed. It returns "" for an
// empty path or a path without a directory component.
func dirFromPolicyPath(policyPath string) string {
	pathValue := strings.TrimSpace(policyPath)
	if pathValue == "" {
		return ""
	}
	pathValue = strings.ReplaceAll(pathValue, "\\", "/")
	dir := slashpath.Dir(pathValue)
	if dir == "." {
		return ""
	}
	return strings.TrimRight(dir, "/")
}

// applyManagedConfig renders the managed files and writes them to the remote
// host when they differ from what the metadata says is installed.
//
// It returns true when files were written. Nothing is written when the stored
// hash matches the newly rendered one and all managed files still exist. An
// existing main config is copied to a timestamped ".bak.<unix>" file before it
// is overwritten.
func (this *BaseInstaller) applyManagedConfig(tempDir string, desired *fluentBitDesiredConfig, parserContent string, existingMeta *fluentBitManagedMeta) (bool, error) {
	mainExists, err := this.remoteFileExists(fluentBitMainConfigFile)
	if err != nil {
		return false, err
	}

	if mainExists && existingMeta == nil {
		// An unmanaged config is adopted by backing it up and replacing it
		// below, exactly like a managed one, so the marker probe's result is
		// not needed; only a failure to run the probe is reported.
		if _, err := this.remoteFileContains(fluentBitMainConfigFile, fluentBitManagedMarker); err != nil {
			return false, err
		}
	}

	configContent, err := renderManagedConfig(desired)
	if err != nil {
		return false, err
	}
	envContent := renderManagedEnv(desired.ClickHouse)
	metaContent, newMeta, err := renderManagedMeta(desired, configContent, parserContent, envContent)
	if err != nil {
		return false, err
	}

	requiredFiles := []string{fluentBitMainConfigFile, fluentBitParsersFile, fluentBitManagedEnvFile, fluentBitManagedMetaFile}
	if existingMeta != nil && existingMeta.Hash == newMeta.Hash {
		allExists := true
		for _, file := range requiredFiles {
			exists, err := this.remoteFileExists(file)
			if err != nil {
				return false, err
			}
			if !exists {
				allExists = false
				break
			}
		}
		if allExists {
			return false, nil
		}
	}

	if mainExists {
		backup := fluentBitMainConfigFile + ".bak." + strconv.FormatInt(time.Now().Unix(), 10)
		_, stderr, err := this.client.Exec("cp -f " + shQuote(fluentBitMainConfigFile) + " " + shQuote(backup))
		if err != nil {
			return false, fmt.Errorf("backup existing fluent-bit config failed: %w, stderr: %s", err, stderr)
		}
	}

	// The metadata file is written last so that its hash is only recorded
	// after the files it describes are in place.
	managedFiles := []struct {
		path    string
		content string
		mode    os.FileMode
	}{
		{fluentBitMainConfigFile, configContent, 0644},
		{fluentBitParsersFile, parserContent, 0644},
		{fluentBitManagedEnvFile, envContent, 0600},
		{fluentBitManagedMetaFile, metaContent, 0644},
	}
	for _, file := range managedFiles {
		if err := this.writeRemoteFileByTemp(tempDir, file.path, file.content, file.mode); err != nil {
			return false, err
		}
	}

	return true, nil
}

// renderManagedConfig renders the main Fluent Bit configuration: a [SERVICE]
// section followed by one tail [INPUT] per enabled role and then one http
// [OUTPUT] per enabled role, each inserting into that role's ClickHouse
// ingest table.
func renderManagedConfig(desired *fluentBitDesiredConfig) (string, error) {
	if desired == nil || desired.ClickHouse == nil {
		return "", errors.New("invalid fluent-bit desired config")
	}

	scheme := strings.ToLower(strings.TrimSpace(desired.ClickHouse.Scheme))
	if scheme == "" {
		scheme = "http"
	}
	if scheme != "http" && scheme != "https" {
		return "", fmt.Errorf("unsupported clickhouse scheme '%s'", desired.ClickHouse.Scheme)
	}
	useTLS := scheme == "https"

	// One entry per role, in the order the sections are emitted.
	streams := []struct {
		role        string
		tag         string
		pathPattern string
		dbPath      string
		table       string
	}{
		{fluentBitRoleNode, "app.http.logs", desired.HTTPPathPattern, "/var/lib/fluent-bit/http-logs.db", "logs_ingest"},
		{fluentBitRoleDNS, "app.dns.logs", desired.DNSPathPattern, "/var/lib/fluent-bit/dns-logs.db", "dns_logs_ingest"},
		{fluentBitRoleHTTPDNS, "app.httpdns.logs", desired.HTTPDNSPathPattern, "/var/lib/fluent-bit/httpdns-logs.db", "httpdns_access_logs_ingest"},
	}

	lines := []string{
		"# " + fluentBitManagedMarker,
		"[SERVICE]",
		" Flush 1",
		" Log_Level info",
		" Parsers_File " + fluentBitParsersFile,
		" storage.path " + fluentBitStorageDir,
		" storage.sync normal",
		" storage.checksum off",
		" storage.backlog.mem_limit 512MB",
		"",
	}

	for _, stream := range streams {
		if hasRole(desired.Roles, stream.role) {
			lines = append(lines, renderFluentBitTailInput(stream.pathPattern, stream.tag, stream.dbPath)...)
		}
	}
	for _, stream := range streams {
		if hasRole(desired.Roles, stream.role) {
			lines = append(lines, renderFluentBitHTTPOutput(desired.ClickHouse, stream.tag, stream.table, useTLS)...)
		}
	}

	return strings.Join(lines, "\n"), nil
}

// renderFluentBitTailInput returns the lines of one tail [INPUT] section,
// including the trailing blank separator line.
func renderFluentBitTailInput(pathPattern string, tag string, dbPath string) []string {
	return []string{
		"[INPUT]",
		" Name tail",
		" Path " + pathPattern,
		" Tag " + tag,
		" Parser json",
		" Refresh_Interval 2",
		" Read_from_Head false",
		" DB " + dbPath,
		" storage.type filesystem",
		" Mem_Buf_Limit 256MB",
		" Skip_Long_Lines On",
		"",
	}
}

// renderFluentBitHTTPOutput returns the lines of one http [OUTPUT] section
// that inserts records tagged tag into <database>.<table>, including the TLS
// options when useTLS is set and the trailing blank separator line.
func renderFluentBitHTTPOutput(ch *systemconfigs.ClickHouseSetting, tag string, table string, useTLS bool) []string {
	insertQuery := url.QueryEscape(fmt.Sprintf("INSERT INTO %s.%s FORMAT JSONEachRow", ch.Database, table))

	lines := []string{
		"[OUTPUT]",
		" Name http",
		" Match " + tag,
		" Host " + ch.Host,
		" Port " + strconv.Itoa(ch.Port),
		" URI /?query=" + insertQuery,
		" Format json_lines",
		" http_user ${CH_USER}",
		" http_passwd ${CH_PASSWORD}",
		" json_date_key timestamp",
		" json_date_format epoch",
		" workers 2",
		" net.keepalive On",
		" Retry_Limit False",
	}

	if useTLS {
		lines = append(lines, " tls On")
		if ch.TLSSkipVerify {
			lines = append(lines, " tls.verify Off")
		} else {
			lines = append(lines, " tls.verify On")
		}
		if serverName := strings.TrimSpace(ch.TLSServerName); serverName != "" {
			lines = append(lines, " tls.vhost "+serverName)
		}
	}

	return append(lines, "")
}

// renderManagedEnv renders the environment file that supplies CH_USER and
// CH_PASSWORD to the service. The user defaults to "default" and the password
// to an empty string; both values are written Go-quoted.
func renderManagedEnv(ch *systemconfigs.ClickHouseSetting) string {
	user := "default"
	password := ""
	if ch != nil {
		if strings.TrimSpace(ch.User) != "" {
			user = strings.TrimSpace(ch.User)
		}
		password = ch.Password
	}
	return "CH_USER=" + strconv.Quote(user) + "\n" +
		"CH_PASSWORD=" + strconv.Quote(password) + "\n"
}

// renderManagedMeta builds the metadata for the rendered files and returns it
// both as indented JSON (with a trailing newline) and as a struct.
//
// The hash is the SHA-256 of the config, parsers and env contents plus the
// role list, so it changes whenever any managed input changes.
func renderManagedMeta(desired *fluentBitDesiredConfig, configContent string, parserContent string, envContent string) (string, *fluentBitManagedMeta, error) {
	hashInput := configContent + "\n---\n" + parserContent + "\n---\n" + envContent + "\n---\n" + strings.Join(desired.Roles, ",")
	hashBytes := sha256.Sum256([]byte(hashInput))
	hash := fmt.Sprintf("%x", hashBytes[:])

	meta := &fluentBitManagedMeta{
		Roles:         desired.Roles,
		Hash:          hash,
		UpdatedAt:     time.Now().Unix(),
		SourceVersion: teaconst.Version,
	}
	data, err := json.MarshalIndent(meta, "", " ")
	if err != nil {
		return "", nil, fmt.Errorf("encode fluent-bit managed metadata failed: %w", err)
	}
	return string(data) + "\n", meta, nil
}

// copyLocalFileToRemote uploads localPath into tempDir and then copies it to
// remotePath, applying mode to the final file.
func (this *BaseInstaller) copyLocalFileToRemote(tempDir string, localPath string, remotePath string, mode os.FileMode) error {
	tempFile := tempDir + "/" + filepath.Base(remotePath)
	if err := this.client.Copy(localPath, tempFile, mode); err != nil {
		return fmt.Errorf("upload fluent-bit file '%s' failed: %w", localPath, err)
	}

	_, stderr, err := this.client.Exec("cp -f " + shQuote(tempFile) + " " + shQuote(remotePath) + " && chmod " + fmt.Sprintf("%04o", mode) + " " + shQuote(remotePath))
	if err != nil {
		return fmt.Errorf("install remote fluent-bit file '%s' failed: %w, stderr: %s", remotePath, err, stderr)
	}
	return nil
}

// writeRemoteFileByTemp writes content to "<tempDir>/<base>.tmp" on the remote
// host and then copies it to remotePath, applying mode to the final file.
func (this *BaseInstaller) writeRemoteFileByTemp(tempDir string, remotePath string, content string, mode os.FileMode) error {
	tempFile := tempDir + "/" + filepath.Base(remotePath) + ".tmp"
	if _, err := this.client.WriteFile(tempFile, []byte(content)); err != nil {
		return fmt.Errorf("write temp fluent-bit file '%s' failed: %w", tempFile, err)
	}

	_, stderr, err := this.client.Exec("cp -f " + shQuote(tempFile) + " " + shQuote(remotePath) + " && chmod " + fmt.Sprintf("%04o", mode) + " " + shQuote(remotePath))
	if err != nil {
		return fmt.Errorf("write managed fluent-bit file '%s' failed: %w, stderr: %s", remotePath, err, stderr)
	}
	return nil
}

// ensureFluentBitService makes sure Fluent Bit is running with the managed
// configuration.
//
// With systemd it creates a unit file if none exists, installs the managed
// drop-in, enables the service and starts it, restarting an already active
// service only when the config or drop-in changed. Without systemd it kills
// the running process when the config changed and starts the binary in the
// background if no matching process is found.
func (this *BaseInstaller) ensureFluentBitService(tempDir string, binPath string, configChanged bool) error {
	// Best-effort: create a minimal unit only when neither a local nor a
	// packaged unit file exists. Failures surface when the service is started.
	_, _, _ = this.client.Exec("if command -v systemctl >/dev/null 2>&1 && [ ! -f /etc/systemd/system/" + fluentBitServiceName + ".service ] && [ ! -f /lib/systemd/system/" + fluentBitServiceName + ".service ]; then " +
		"cat > /etc/systemd/system/" + fluentBitServiceName + ".service <<'EOF'\n" +
		"[Unit]\n" +
		"Description=Fluent Bit\n" +
		"After=network.target\n" +
		"\n" +
		"[Service]\n" +
		"ExecStart=" + binPath + " -c " + fluentBitMainConfigFile + "\n" +
		"Restart=always\n" +
		"RestartSec=5\n" +
		"\n" +
		"[Install]\n" +
		"WantedBy=multi-user.target\n" +
		"EOF\n" +
		"fi")

	stdout, _, err := this.client.Exec("if command -v systemctl >/dev/null 2>&1; then echo 1; else echo 0; fi")
	if err != nil {
		return fmt.Errorf("check systemctl failed: %w", err)
	}

	if strings.TrimSpace(stdout) == "1" {
		dropInChanged, err := this.ensureServiceDropIn(tempDir, binPath)
		if err != nil {
			return err
		}

		restartRequired := configChanged || dropInChanged
		_, stderr, err := this.client.Exec("systemctl daemon-reload; systemctl enable " + fluentBitServiceName + " >/dev/null 2>&1 || true; " +
			"if systemctl is-active " + fluentBitServiceName + " >/dev/null 2>&1; then " +
			"if [ \"" + boolToString(restartRequired) + "\" = \"1\" ]; then systemctl restart " + fluentBitServiceName + "; fi; " +
			"else systemctl start " + fluentBitServiceName + "; fi")
		if err != nil {
			return fmt.Errorf("ensure fluent-bit service failed: %w, stderr: %s", err, stderr)
		}
		return nil
	}

	// No systemd: manage the process directly.
	if configChanged {
		_, _, _ = this.client.Exec("pkill -f \"fluent-bit.*fluent-bit.conf\" >/dev/null 2>&1 || true")
	}

	_, _, runningErr := this.client.Exec("pgrep -f \"fluent-bit.*fluent-bit.conf\" >/dev/null 2>&1")
	if runningErr != nil {
		// Export the managed env file (if present) before launching so that
		// the CH_* variables referenced by the config are available.
		startCmd := "set -a; [ -f " + shQuote(fluentBitManagedEnvFile) + " ] && . " + shQuote(fluentBitManagedEnvFile) + "; set +a; " +
			shQuote(binPath) + " -c " + shQuote(fluentBitMainConfigFile) + " >/dev/null 2>&1 &"
		_, stderr, err := this.client.Exec(startCmd)
		if err != nil {
			return fmt.Errorf("start fluent-bit without systemd failed: %w, stderr: %s", err, stderr)
		}
	}

	return nil
}

// ensureServiceDropIn installs the systemd drop-in that loads the managed env
// file and overrides ExecStart to use the managed config.
//
// It returns true when the drop-in file was (re)written.
func (this *BaseInstaller) ensureServiceDropIn(tempDir string, binPath string) (bool, error) {
	_, stderr, err := this.client.Exec("mkdir -p " + shQuote(fluentBitDropInDir))
	if err != nil {
		return false, fmt.Errorf("prepare fluent-bit drop-in dir failed: %w, stderr: %s", err, stderr)
	}

	// The empty "ExecStart=" clears the unit's own ExecStart before the
	// managed one is set.
	content := "[Service]\n" +
		"EnvironmentFile=-" + fluentBitManagedEnvFile + "\n" +
		"ExecStart=\n" +
		"ExecStart=" + binPath + " -c " + fluentBitMainConfigFile + "\n"

	// Best-effort read: if it fails the content will not match and the
	// drop-in is simply rewritten.
	existing, _, _ := this.client.Exec("if [ -f " + shQuote(fluentBitDropInFile) + " ]; then cat " + shQuote(fluentBitDropInFile) + "; fi")
	if existing == content {
		return false, nil
	}

	if err := this.writeRemoteFileByTemp(tempDir, fluentBitDropInFile, content, 0644); err != nil {
		return false, err
	}
	return true, nil
}

// remoteFileExists reports whether path is a regular file on the remote host.
func (this *BaseInstaller) remoteFileExists(path string) (bool, error) {
	stdout, stderr, err := this.client.Exec("if [ -f " + shQuote(path) + " ]; then echo 1; else echo 0; fi")
	if err != nil {
		return false, fmt.Errorf("check remote file '%s' failed: %w, stderr: %s", path, err, stderr)
	}
	return strings.TrimSpace(stdout) == "1", nil
}

// remoteFileContains reports whether the remote file at path contains pattern
// as a fixed string (grep -F).
func (this *BaseInstaller) remoteFileContains(path string, pattern string) (bool, error) {
	stdout, stderr, err := this.client.Exec("if grep -F " + shQuote(pattern) + " " + shQuote(path) + " >/dev/null 2>&1; then echo 1; else echo 0; fi")
	if err != nil {
		return false, fmt.Errorf("check remote file content '%s' failed: %w, stderr: %s", path, err, stderr)
	}
	return strings.TrimSpace(stdout) == "1", nil
}

// shQuote wraps value in single quotes for use as one POSIX shell word,
// escaping embedded single quotes. An empty value becomes ''.
func shQuote(value string) string {
	if value == "" {
		return "''"
	}
	return "'" + strings.ReplaceAll(value, "'", "'\"'\"'") + "'"
}

// boolToString returns "1" for true and "0" for false.
func boolToString(v bool) string {
	if v {
		return "1"
	}
	return "0"
}