Files
waf-platform/EdgeAPI/internal/installers/fluent_bit.go
2026-02-12 21:37:55 +08:00

348 lines
11 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package installers
import (
"errors"
"fmt"
"os"
"path/filepath"
"sort"
"strings"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/iwind/TeaGo/Tea"
)
// Remote paths and names used while installing and configuring Fluent Bit.
const (
// Remote directory that holds the Fluent Bit configuration files.
fluentBitConfigDir = "/etc/fluent-bit"
// Remote directory created during setup next to the config directory.
// NOTE(review): presumably referenced as the storage path by the bundled fluent-bit.conf — confirm.
fluentBitStorageDir = "/var/lib/fluent-bit/storage"
// Main config file; its presence decides whether role validation runs.
fluentBitMainConfigFile = "/etc/fluent-bit/fluent-bit.conf"
// Destination of the bundled parsers.conf.
fluentBitParsersFile = "/etc/fluent-bit/parsers.conf"
// Destination of the bundled clickhouse-upstream.conf.
fluentBitUpstreamFile = "/etc/fluent-bit/clickhouse-upstream.conf"
// Destination of the bundled logrotate.conf.
fluentBitLogrotateFile = "/etc/logrotate.d/edge-goedge"
// systemd unit name used when creating, enabling and (re)starting the service.
fluentBitServiceName = "fluent-bit"
// Fallback binary location checked when fluent-bit is not found on PATH.
fluentBitDefaultBinPath = "/opt/fluent-bit/bin/fluent-bit"
// Sub-directory of deploy/fluent-bit that contains the per-arch offline packages.
fluentBitLocalPackagesRoot = "packages"
// Log path an existing config must contain for nodeconfigs.NodeRoleNode.
fluentBitHTTPPathPattern = "/var/log/edge/edge-node/*.log"
// Log path an existing config must contain for nodeconfigs.NodeRoleDNS.
fluentBitDNSPathPattern = "/var/log/edge/edge-dns/*.log"
)
// errFluentBitLocalPackageNotFound is the sentinel returned when no usable offline
// package (.deb, .rpm, .tar.gz, .tgz) exists for the remote architecture.
// Callers detect it with errors.Is.
var errFluentBitLocalPackageNotFound = errors.New("fluent-bit local package not found")
// SetupFluentBit installs Fluent Bit on the remote host (offline local packages
// only) and puts the bundled configuration files in place.
//
// Existing remote configuration is never overwritten. When the main config file
// already exists it must contain the log path required by role; otherwise an
// error is returned and the setup stops.
//
// It returns an error when the SSH client is nil, and returns nil without doing
// anything when the value reported by this.uname() does not contain "Linux".
func (this *BaseInstaller) SetupFluentBit(role nodeconfigs.NodeRole) error {
	if this.client == nil {
		return errors.New("ssh client is nil")
	}

	uname := this.uname()
	if !strings.Contains(uname, "Linux") {
		return nil
	}

	// Stage everything in a scratch directory under the remote user's home.
	tempDir := strings.TrimRight(this.client.UserHome(), "/") + "/.edge-fluent-bit"
	if _, stderr, err := this.client.Exec("mkdir -p " + tempDir); err != nil {
		// Report the failure here instead of letting it show up later as an upload error.
		return fmt.Errorf("prepare fluent-bit temp directory '%s' failed: %w, stderr: %s", tempDir, err, stderr)
	}
	defer func() {
		// Best-effort cleanup; a leftover scratch directory is harmless.
		_, _, _ = this.client.Exec("rm -rf " + tempDir)
	}()

	// A single fluent-bit.conf carries both the HTTP and DNS inputs, so Node and DNS
	// roles installed on the same machine do not conflict.
	files := []struct {
		Local  string
		Remote string
	}{
		{Local: "fluent-bit.conf", Remote: "fluent-bit.conf"},
		{Local: "parsers.conf", Remote: "parsers.conf"},
		{Local: "clickhouse-upstream.conf", Remote: "clickhouse-upstream.conf"},
		{Local: "logrotate.conf", Remote: "logrotate.conf"},
	}
	for _, file := range files {
		localPath := filepath.Join(Tea.Root, "deploy", "fluent-bit", file.Local)
		if _, err := os.Stat(localPath); err != nil {
			return fmt.Errorf("fluent-bit file '%s' not found: %w", localPath, err)
		}

		// The remote side is Linux, so the remote path is joined with "/" explicitly.
		remotePath := tempDir + "/" + file.Remote
		if err := this.client.Copy(localPath, remotePath, 0644); err != nil {
			return fmt.Errorf("upload fluent-bit file '%s' failed: %w", file.Local, err)
		}
	}

	if err := this.ensureFluentBitInstalled(tempDir); err != nil {
		return err
	}

	_, stderr, err := this.client.Exec("mkdir -p " + fluentBitConfigDir + " " + fluentBitStorageDir + " /etc/logrotate.d")
	if err != nil {
		return fmt.Errorf("prepare fluent-bit directories failed: %w, stderr: %s", err, stderr)
	}

	exists, err := this.remoteFileExists(fluentBitMainConfigFile)
	if err != nil {
		return err
	}

	// An existing config is kept as-is, so it must already be compatible with the role.
	if exists {
		if err := this.validateExistingConfigForRole(role); err != nil {
			return err
		}
	}

	configCopied, err := this.copyFluentBitConfigIfMissing(tempDir)
	if err != nil {
		return err
	}

	binPath, err := this.lookupFluentBitBinPath()
	if err != nil {
		return err
	}

	return this.ensureFluentBitService(binPath, configCopied)
}
// ensureFluentBitInstalled makes sure a fluent-bit binary can be found on the
// remote host, installing it from a local offline package when it is missing.
//
// tempDir is the remote scratch directory that packages are uploaded to.
// A failed lookup is returned as an error; it is not treated as "not installed",
// so a transient exec failure cannot trigger a package reinstall.
func (this *BaseInstaller) ensureFluentBitInstalled(tempDir string) error {
	binPath, err := this.lookupFluentBitBinPath()
	if err != nil {
		return err
	}
	if binPath != "" {
		// Already installed; nothing to do.
		return nil
	}

	if err := this.installFluentBitFromLocalPackage(tempDir); err != nil {
		if errors.Is(err, errFluentBitLocalPackageNotFound) {
			return fmt.Errorf("install fluent-bit failed: local package not found, expected in deploy/fluent-bit/%s/linux-<arch>", fluentBitLocalPackagesRoot)
		}
		return fmt.Errorf("install fluent-bit from local package failed: %w", err)
	}

	// Re-check after the install so the caller can rely on the binary being present.
	binPath, err = this.lookupFluentBitBinPath()
	if err != nil {
		return err
	}
	if binPath == "" {
		return errors.New("fluent-bit binary not found after local package install")
	}
	return nil
}
// installFluentBitFromLocalPackage uploads the offline packages found under
// deploy/fluent-bit/<packages root>/linux-<arch> to tempDir on the remote host and
// tries them one by one, in sorted path order, until a fluent-bit binary can be
// located.
//
// It returns errFluentBitLocalPackageNotFound when the package directory is missing
// or contains no .deb/.rpm/.tar.gz/.tgz file, and otherwise the error of the last
// failed attempt.
func (this *BaseInstaller) installFluentBitFromLocalPackage(tempDir string) error {
	arch, err := this.detectRemoteLinuxArch()
	if err != nil {
		return err
	}

	packageDir := filepath.Join(Tea.Root, "deploy", "fluent-bit", fluentBitLocalPackagesRoot, "linux-"+arch)
	entries, err := os.ReadDir(packageDir)
	if os.IsNotExist(err) {
		return errFluentBitLocalPackageNotFound
	}
	if err != nil {
		return fmt.Errorf("read fluent-bit local package dir failed: %w", err)
	}

	// Keep only regular entries whose (case-insensitive) name has a supported suffix.
	supportedSuffixes := []string{".deb", ".rpm", ".tar.gz", ".tgz"}
	candidates := make([]string, 0)
	for _, entry := range entries {
		if entry.IsDir() {
			continue
		}
		lowered := strings.ToLower(entry.Name())
		for _, suffix := range supportedSuffixes {
			if strings.HasSuffix(lowered, suffix) {
				candidates = append(candidates, filepath.Join(packageDir, entry.Name()))
				break
			}
		}
	}
	if len(candidates) == 0 {
		return errFluentBitLocalPackageNotFound
	}
	sort.Strings(candidates)

	var lastErr error
	for _, localPath := range candidates {
		fileName := filepath.Base(localPath)
		remotePath := tempDir + "/" + fileName
		if copyErr := this.client.Copy(localPath, remotePath, 0644); copyErr != nil {
			lastErr = fmt.Errorf("upload local package failed: %w", copyErr)
			continue
		}

		// Pick the install command from the package type.
		var installCmd string
		loweredPath := strings.ToLower(localPath)
		if strings.HasSuffix(loweredPath, ".deb") {
			installCmd = "dpkg -i " + remotePath
		} else if strings.HasSuffix(loweredPath, ".rpm") {
			// Try an upgrade first, then fall back to a plain install.
			installCmd = "rpm -Uvh --force " + remotePath + " || rpm -ivh --force " + remotePath
		} else if strings.HasSuffix(loweredPath, ".tar.gz") || strings.HasSuffix(loweredPath, ".tgz") {
			// Unpack the tarball, locate the fluent-bit binary inside it and install
			// it manually; exit code 3 means the archive contained no such binary.
			extractDir := tempDir + "/extract"
			installCmd = strings.Join([]string{
				"rm -rf " + extractDir,
				"mkdir -p " + extractDir,
				"tar -xzf " + remotePath + " -C " + extractDir,
				"bin=$(find " + extractDir + " -type f -name fluent-bit | head -n 1)",
				"if [ -z \"$bin\" ]; then exit 3; fi",
				"mkdir -p /opt/fluent-bit/bin /usr/local/bin",
				"install -m 0755 \"$bin\" /opt/fluent-bit/bin/fluent-bit",
				"ln -sf /opt/fluent-bit/bin/fluent-bit /usr/local/bin/fluent-bit",
			}, "; ")
		} else {
			continue
		}

		_, stderr, execErr := this.client.Exec(installCmd)
		if execErr != nil {
			lastErr = fmt.Errorf("install fluent-bit local package '%s' failed: %w, stderr: %s", fileName, execErr, stderr)
			continue
		}

		// A successful command is not enough: the binary must actually be discoverable.
		binPath, lookupErr := this.lookupFluentBitBinPath()
		switch {
		case lookupErr != nil:
			lastErr = lookupErr
		case binPath != "":
			return nil
		default:
			lastErr = errors.New("fluent-bit binary not found after local package install")
		}
	}

	if lastErr == nil {
		return errFluentBitLocalPackageNotFound
	}
	return lastErr
}
// detectRemoteLinuxArch runs `uname -m` on the remote host and returns the
// trimmed, lower-cased result. "x86_64"/"amd64" are reported as "amd64" and
// "aarch64"/"arm64" as "arm64"; any other value is returned unchanged.
func (this *BaseInstaller) detectRemoteLinuxArch() (string, error) {
	out, errOut, err := this.client.Exec("uname -m")
	if err != nil {
		return "", fmt.Errorf("detect remote arch failed: %w, stderr: %s", err, errOut)
	}

	// Aliases that map to the directory names used for the local packages.
	aliases := map[string]string{
		"x86_64":  "amd64",
		"amd64":   "amd64",
		"aarch64": "arm64",
		"arm64":   "arm64",
	}
	machine := strings.ToLower(strings.TrimSpace(out))
	if normalized, known := aliases[machine]; known {
		return normalized, nil
	}
	return machine, nil
}
// lookupFluentBitBinPath returns the path of the fluent-bit binary on the remote
// host: the one found on PATH if any, otherwise fluentBitDefaultBinPath when that
// file is executable. An empty string with a nil error means no binary was found.
func (this *BaseInstaller) lookupFluentBitBinPath() (string, error) {
	lookupCmd := "if command -v fluent-bit >/dev/null 2>&1; then command -v fluent-bit; " +
		"elif [ -x " + fluentBitDefaultBinPath + " ]; then echo " + fluentBitDefaultBinPath + "; fi"

	out, errOut, err := this.client.Exec(lookupCmd)
	if err == nil {
		return strings.TrimSpace(out), nil
	}
	return "", fmt.Errorf("lookup fluent-bit binary failed: %w, stderr: %s", err, errOut)
}
// copyFluentBitConfigIfMissing copies each staged file from tempDir to its final
// location on the remote host, skipping destinations that already exist.
//
// It reports whether at least one file was copied. On any error it returns false
// together with the error, even if earlier files were already copied.
func (this *BaseInstaller) copyFluentBitConfigIfMissing(tempDir string) (bool, error) {
	// Each pair is {staged file name, final destination}.
	pairs := [][2]string{
		{"fluent-bit.conf", fluentBitMainConfigFile},
		{"parsers.conf", fluentBitParsersFile},
		{"clickhouse-upstream.conf", fluentBitUpstreamFile},
		{"logrotate.conf", fluentBitLogrotateFile},
	}

	anyCopied := false
	for _, pair := range pairs {
		staged, dest := tempDir+"/"+pair[0], pair[1]

		present, err := this.remoteFileExists(dest)
		if err != nil {
			return false, err
		}
		if present {
			// Never overwrite a file that is already deployed.
			continue
		}

		if _, stderr, cpErr := this.client.Exec("cp -f " + staged + " " + dest); cpErr != nil {
			return false, fmt.Errorf("copy fluent-bit file to '%s' failed: %w, stderr: %s", dest, cpErr, stderr)
		}
		anyCopied = true
	}
	return anyCopied, nil
}
// validateExistingConfigForRole checks that the existing remote main config
// contains the log path required by role: the HTTP path for NodeRoleNode and the
// DNS path for NodeRoleDNS. Other roles have no requirement and always pass.
//
// It returns an error when the check itself fails or the path is missing.
func (this *BaseInstaller) validateExistingConfigForRole(role nodeconfigs.NodeRole) error {
	var required string
	switch role {
	case nodeconfigs.NodeRoleNode:
		required = fluentBitHTTPPathPattern
	case nodeconfigs.NodeRoleDNS:
		required = fluentBitDNSPathPattern
	default:
		return nil
	}

	found, err := this.remoteFileContains(fluentBitMainConfigFile, required)
	if err != nil {
		return err
	}
	if found {
		return nil
	}
	return fmt.Errorf("existing fluent-bit config '%s' does not contain required path '%s'; skip overwrite by design, please update config manually", fluentBitMainConfigFile, required)
}
// remoteFileExists reports whether path is a regular file on the remote host,
// using a shell `[ -f ... ]` test that prints 1 or 0.
func (this *BaseInstaller) remoteFileExists(path string) (bool, error) {
	probe := fmt.Sprintf("if [ -f \"%s\" ]; then echo 1; else echo 0; fi", path)
	out, errOut, err := this.client.Exec(probe)
	if err != nil {
		return false, fmt.Errorf("check remote file '%s' failed: %w, stderr: %s", path, err, errOut)
	}
	answer := strings.TrimSpace(out)
	return answer == "1", nil
}
// remoteFileContains reports whether the remote file at path contains pattern as
// a fixed string (grep -F). Any grep failure, including an unreadable file, is
// reported as "not contained" because grep's own output and errors are discarded.
func (this *BaseInstaller) remoteFileContains(path string, pattern string) (bool, error) {
	probe := fmt.Sprintf("if grep -F \"%s\" \"%s\" >/dev/null 2>&1; then echo 1; else echo 0; fi", pattern, path)
	out, errOut, err := this.client.Exec(probe)
	if err != nil {
		return false, fmt.Errorf("check remote file content '%s' failed: %w, stderr: %s", path, err, errOut)
	}
	answer := strings.TrimSpace(out)
	return answer == "1", nil
}
// ensureFluentBitService makes sure Fluent Bit is running on the remote host.
//
// With systemd: a minimal unit file is written when none exists, then the service
// is enabled and started. A service that is already active is restarted only when
// configCopied is true, i.e. new configuration was just put in place.
// Without systemd: fluent-bit is started in the background unless a matching
// process is already running.
//
// binPath is the fluent-bit binary used in the unit file and the background start.
func (this *BaseInstaller) ensureFluentBitService(binPath string, configCopied bool) error {
	// Best effort: the result of writing the unit file is deliberately ignored;
	// the start/restart command below is what reports failures to the caller.
	_, _, _ = this.client.Exec("if command -v systemctl >/dev/null 2>&1 && [ ! -f /etc/systemd/system/" + fluentBitServiceName + ".service ] && [ ! -f /lib/systemd/system/" + fluentBitServiceName + ".service ]; then " +
		"cat > /etc/systemd/system/" + fluentBitServiceName + ".service <<'EOF'\n" +
		"[Unit]\n" +
		"Description=Fluent Bit\n" +
		"After=network.target\n" +
		"\n" +
		"[Service]\n" +
		"ExecStart=" + binPath + " -c " + fluentBitMainConfigFile + "\n" +
		"Restart=always\n" +
		"RestartSec=5\n" +
		"\n" +
		"[Install]\n" +
		"WantedBy=multi-user.target\n" +
		"EOF\n" +
		"fi")

	stdout, stderr, err := this.client.Exec("if command -v systemctl >/dev/null 2>&1; then systemctl daemon-reload; systemctl enable " + fluentBitServiceName + " >/dev/null 2>&1 || true; if systemctl is-active " + fluentBitServiceName + " >/dev/null 2>&1; then " +
		"if [ \"" + boolToString(configCopied) + "\" = \"1\" ]; then systemctl restart " + fluentBitServiceName + "; fi; " +
		"else systemctl start " + fluentBitServiceName + "; fi; else echo no-systemctl; fi")
	if err != nil {
		return fmt.Errorf("ensure fluent-bit service failed: %w, stderr: %s", err, stderr)
	}
	if strings.TrimSpace(stdout) != "no-systemctl" {
		return nil
	}

	// No systemd: check for a running process ourselves. The leading "[f]" matches the
	// same processes as a plain "f", but keeps the pattern from matching the command
	// line of the shell that runs this very pgrep (pgrep -f matches full command lines
	// and only excludes itself), which would make the check always succeed.
	_, _, runningErr := this.client.Exec("pgrep -f \"[f]luent-bit.*fluent-bit.conf\" >/dev/null 2>&1")
	if runningErr == nil {
		return nil
	}

	_, stderr, err = this.client.Exec(binPath + " -c " + fluentBitMainConfigFile + " >/dev/null 2>&1 &")
	if err != nil {
		return fmt.Errorf("start fluent-bit without systemd failed: %w, stderr: %s", err, stderr)
	}
	return nil
}
// boolToString converts a bool into the "1"/"0" flag form used inside the
// generated shell commands.
func boolToString(v bool) string {
	flag := "0"
	if v {
		flag = "1"
	}
	return flag
}