引入lumberjack和fluentbit自动分发

This commit is contained in:
robin
2026-02-13 22:36:17 +08:00
parent c6da67db79
commit e9093baffb
47 changed files with 4589 additions and 317 deletions

View File

@@ -18,11 +18,9 @@ const (
envPassword = "CLICKHOUSE_PASSWORD"
envDatabase = "CLICKHOUSE_DATABASE"
envScheme = "CLICKHOUSE_SCHEME"
envTLSSkipVerify = "CLICKHOUSE_TLS_SKIP_VERIFY"
envTLSServerName = "CLICKHOUSE_TLS_SERVER_NAME"
defaultPort = 8123
defaultPort = 8443
defaultDB = "default"
defaultScheme = "http"
defaultScheme = "https"
)
var (
@@ -62,7 +60,7 @@ func ResetSharedConfig() {
}
func loadConfig() *Config {
cfg := &Config{Port: defaultPort, Database: defaultDB, Scheme: defaultScheme}
cfg := &Config{Port: defaultPort, Database: defaultDB, Scheme: defaultScheme, TLSSkipVerify: true}
// 1) 优先从后台页面配置DB读取
if models.SharedSysSettingDAO != nil {
if dbCfg, err := models.SharedSysSettingDAO.ReadClickHouseConfig(nil); err == nil && dbCfg != nil && dbCfg.Host != "" {
@@ -72,8 +70,8 @@ func loadConfig() *Config {
cfg.Password = dbCfg.Password
cfg.Database = dbCfg.Database
cfg.Scheme = normalizeScheme(dbCfg.Scheme)
cfg.TLSSkipVerify = dbCfg.TLSSkipVerify
cfg.TLSServerName = dbCfg.TLSServerName
cfg.TLSSkipVerify = true
cfg.TLSServerName = ""
if cfg.Port <= 0 {
cfg.Port = defaultPort
}
@@ -93,8 +91,8 @@ func loadConfig() *Config {
cfg.Password = ch.Password
cfg.Database = ch.Database
cfg.Scheme = normalizeScheme(ch.Scheme)
cfg.TLSSkipVerify = ch.TLSSkipVerify
cfg.TLSServerName = ch.TLSServerName
cfg.TLSSkipVerify = true
cfg.TLSServerName = ""
if cfg.Port <= 0 {
cfg.Port = defaultPort
}
@@ -112,17 +110,13 @@ func loadConfig() *Config {
cfg.Database = defaultDB
}
cfg.Scheme = normalizeScheme(os.Getenv(envScheme))
cfg.TLSServerName = os.Getenv(envTLSServerName)
cfg.TLSServerName = ""
if p := os.Getenv(envPort); p != "" {
if v, err := strconv.Atoi(p); err == nil {
cfg.Port = v
}
}
if v := os.Getenv(envTLSSkipVerify); v != "" {
if b, err := strconv.ParseBool(v); err == nil {
cfg.TLSSkipVerify = b
}
}
cfg.TLSSkipVerify = true
return cfg
}

View File

@@ -176,7 +176,9 @@ func (s *LogsIngestStore) List(ctx context.Context, f ListFilter) (rows []*LogsI
}
orderBy := fmt.Sprintf("timestamp %s, trace_id %s", orderDir, orderDir)
query := fmt.Sprintf("SELECT timestamp, node_id, cluster_id, server_id, host, ip, method, path, status, bytes_in, bytes_out, cost_ms, ua, referer, log_type, trace_id, firewall_policy_id, firewall_rule_group_id, firewall_rule_set_id, firewall_rule_id, request_headers, request_body, response_headers, response_body FROM %s WHERE %s ORDER BY %s LIMIT %d",
// 列表查询不 SELECT 大字段(request_headers / request_body / response_headers / response_body
// 避免每次翻页读取 GB 级数据。详情查看时通过 FindByTraceId 单独获取。
query := fmt.Sprintf("SELECT timestamp, node_id, cluster_id, server_id, host, ip, method, path, status, bytes_in, bytes_out, cost_ms, ua, referer, log_type, trace_id, firewall_policy_id, firewall_rule_group_id, firewall_rule_set_id, firewall_rule_id FROM %s WHERE %s ORDER BY %s LIMIT %d",
table, where, orderBy, limit+1)
var rawRows []map[string]interface{}

View File

@@ -122,8 +122,10 @@ func (s *NSLogsIngestStore) List(ctx context.Context, f NSListFilter) (rows []*N
limit = 1000
}
// 列表查询不 SELECT content_json 大字段,减少翻页时的数据传输量。
// 详情查看时通过 FindByRequestId 单独获取完整信息。
query := fmt.Sprintf(
"SELECT timestamp, request_id, node_id, cluster_id, domain_id, record_id, remote_addr, question_name, question_type, record_name, record_type, record_value, networking, is_recursive, error, ns_route_codes, content_json FROM %s WHERE %s ORDER BY timestamp %s, request_id %s LIMIT %d",
"SELECT timestamp, request_id, node_id, cluster_id, domain_id, record_id, remote_addr, question_name, question_type, record_name, record_type, record_value, networking, is_recursive, error, ns_route_codes FROM %s WHERE %s ORDER BY timestamp %s, request_id %s LIMIT %d",
table,
strings.Join(conditions, " AND "),
orderDir,

View File

@@ -20,3 +20,20 @@ func ParseHTTPAccessLogPolicyFilePath(policy *HTTPAccessLogPolicy) string {
return strings.TrimSpace(config.Path)
}
// ParseHTTPAccessLogPolicyRotateConfig 提取访问日志策略中的文件轮转配置(所有 file* 类型有效)。
func ParseHTTPAccessLogPolicyRotateConfig(policy *HTTPAccessLogPolicy) *serverconfigs.AccessLogRotateConfig {
if policy == nil || !serverconfigs.IsFileBasedStorageType(policy.Type) || len(policy.Options) == 0 {
return nil
}
config := &serverconfigs.AccessLogFileStorageConfig{}
if err := json.Unmarshal(policy.Options, config); err != nil {
return nil
}
if config.Rotate == nil {
return nil
}
return config.Rotate.Normalize()
}

View File

@@ -1176,6 +1176,7 @@ func (this *NodeDAO) ComposeNodeConfig(tx *dbs.Tx, nodeId int64, dataMap *shared
if publicPolicy != nil {
config.GlobalServerConfig.HTTPAccessLog.WriteTargets = serverconfigs.ParseWriteTargetsFromPolicy(publicPolicy.WriteTargets, publicPolicy.Type, publicPolicy.DisableDefaultDB)
config.GlobalServerConfig.HTTPAccessLog.FilePath = ParseHTTPAccessLogPolicyFilePath(publicPolicy)
config.GlobalServerConfig.HTTPAccessLog.Rotate = ParseHTTPAccessLogPolicyRotateConfig(publicPolicy)
}
}
}

View File

@@ -479,6 +479,7 @@ func (this *NSNodeDAO) ComposeNodeConfig(tx *dbs.Tx, nodeId int64) (*dnsconfigs.
if publicPolicy != nil {
config.AccessLogWriteTargets = serverconfigs.ParseWriteTargetsFromPolicy(publicPolicy.WriteTargets, publicPolicy.Type, publicPolicy.DisableDefaultDB)
config.AccessLogFilePath = ParseHTTPAccessLogPolicyFilePath(publicPolicy)
config.AccessLogRotate = ParseHTTPAccessLogPolicyRotateConfig(publicPolicy)
}
}
}

View File

@@ -270,8 +270,9 @@ func (this *SysSettingDAO) ReadClickHouseConfig(tx *dbs.Tx) (*systemconfigs.Clic
}
if len(valueJSON) == 0 {
return &systemconfigs.ClickHouseSetting{
Port: 8123,
Port: 8443,
Database: "default",
Scheme: "https",
}, nil
}
var config = &systemconfigs.ClickHouseSetting{}

View File

@@ -1,14 +1,23 @@
package installers
import (
"crypto/sha256"
"encoding/json"
"errors"
"fmt"
"net/url"
"os"
slashpath "path"
"path/filepath"
"sort"
"strconv"
"strings"
"time"
teaconst "github.com/TeaOSLab/EdgeAPI/internal/const"
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/systemconfigs"
"github.com/iwind/TeaGo/Tea"
)
@@ -17,19 +26,45 @@ const (
fluentBitStorageDir = "/var/lib/fluent-bit/storage"
fluentBitMainConfigFile = "/etc/fluent-bit/fluent-bit.conf"
fluentBitParsersFile = "/etc/fluent-bit/parsers.conf"
fluentBitUpstreamFile = "/etc/fluent-bit/clickhouse-upstream.conf"
fluentBitManagedMetaFile = "/etc/fluent-bit/.edge-managed.json"
fluentBitManagedEnvFile = "/etc/fluent-bit/.edge-managed.env"
fluentBitLogrotateFile = "/etc/logrotate.d/edge-goedge"
fluentBitDropInDir = "/etc/systemd/system/fluent-bit.service.d"
fluentBitDropInFile = "/etc/systemd/system/fluent-bit.service.d/edge-managed.conf"
fluentBitServiceName = "fluent-bit"
fluentBitDefaultBinPath = "/opt/fluent-bit/bin/fluent-bit"
fluentBitLocalPackagesRoot = "packages"
fluentBitHTTPPathPattern = "/var/log/edge/edge-node/*.log"
fluentBitDNSPathPattern = "/var/log/edge/edge-dns/*.log"
fluentBitManagedMarker = "managed-by-edgeapi"
fluentBitRoleNode = "node"
fluentBitRoleDNS = "dns"
)
var errFluentBitLocalPackageNotFound = errors.New("fluent-bit local package not found")
// SetupFluentBit 安装 Fluent Bit仅离线包并同步配置文件。
// 升级场景下不覆盖已有配置;若已有配置与节点角色不兼容,直接报错终止安装。
var fluentBitPackageFileMapping = map[string]string{
"ubuntu22.04-amd64": "fluent-bit_4.2.2_amd64.deb",
"ubuntu22.04-arm64": "fluent-bit_4.2.2_arm64.deb",
"amzn2023-amd64": "fluent-bit-4.2.2-1.x86_64.rpm",
"amzn2023-arm64": "fluent-bit-4.2.2-1.aarch64.rpm",
}
type fluentBitManagedMeta struct {
Roles []string `json:"roles"`
Hash string `json:"hash"`
UpdatedAt int64 `json:"updatedAt"`
SourceVersion string `json:"sourceVersion"`
}
type fluentBitDesiredConfig struct {
Roles []string
ClickHouse *systemconfigs.ClickHouseSetting
HTTPPathPattern string
DNSPathPattern string
}
// SetupFluentBit 安装并托管 Fluent Bit 配置(离线包 + 平台渲染配置)。
func (this *BaseInstaller) SetupFluentBit(role nodeconfigs.NodeRole) error {
if this.client == nil {
return errors.New("ssh client is nil")
@@ -41,55 +76,41 @@ func (this *BaseInstaller) SetupFluentBit(role nodeconfigs.NodeRole) error {
}
tempDir := strings.TrimRight(this.client.UserHome(), "/") + "/.edge-fluent-bit"
_, _, _ = this.client.Exec("mkdir -p " + tempDir)
_, _, _ = this.client.Exec("mkdir -p " + shQuote(tempDir))
defer func() {
_, _, _ = this.client.Exec("rm -rf " + tempDir)
_, _, _ = this.client.Exec("rm -rf " + shQuote(tempDir))
}()
// 统一使用 fluent-bit.conf已含 HTTP + DNS 两类 input避免同机 Node/DNS 冲突。
files := []struct {
Local string
Remote string
}{
{Local: "fluent-bit.conf", Remote: "fluent-bit.conf"},
{Local: "parsers.conf", Remote: "parsers.conf"},
{Local: "clickhouse-upstream.conf", Remote: "clickhouse-upstream.conf"},
{Local: "logrotate.conf", Remote: "logrotate.conf"},
}
for _, file := range files {
localPath := filepath.Join(Tea.Root, "deploy", "fluent-bit", file.Local)
if _, err := os.Stat(localPath); err != nil {
return fmt.Errorf("fluent-bit file '%s' not found: %w", localPath, err)
}
remotePath := tempDir + "/" + file.Remote
if err := this.client.Copy(localPath, remotePath, 0644); err != nil {
return fmt.Errorf("upload fluent-bit file '%s' failed: %w", file.Local, err)
}
}
if err := this.ensureFluentBitInstalled(tempDir); err != nil {
return err
}
_, stderr, err := this.client.Exec("mkdir -p " + fluentBitConfigDir + " " + fluentBitStorageDir + " /etc/logrotate.d")
_, stderr, err := this.client.Exec("mkdir -p " + shQuote(fluentBitConfigDir) + " " + shQuote(fluentBitStorageDir) + " /etc/logrotate.d")
if err != nil {
return fmt.Errorf("prepare fluent-bit directories failed: %w, stderr: %s", err, stderr)
}
exists, err := this.remoteFileExists(fluentBitMainConfigFile)
parserContent, err := this.readLocalParsersContent()
if err != nil {
return err
}
// 若已存在配置,先做角色兼容校验,不允许覆盖。
if exists {
if err := this.validateExistingConfigForRole(role); err != nil {
return err
}
existingMeta, err := this.readManagedMeta()
if err != nil {
return err
}
configCopied, err := this.copyFluentBitConfigIfMissing(tempDir)
mergedRoles, err := mergeManagedRoles(existingMeta, role)
if err != nil {
return err
}
desiredConfig, err := this.buildDesiredFluentBitConfig(mergedRoles)
if err != nil {
return err
}
configChanged, err := this.applyManagedConfig(tempDir, desiredConfig, parserContent, existingMeta)
if err != nil {
return err
}
@@ -99,7 +120,7 @@ func (this *BaseInstaller) SetupFluentBit(role nodeconfigs.NodeRole) error {
return err
}
if err := this.ensureFluentBitService(binPath, configCopied); err != nil {
if err := this.ensureFluentBitService(tempDir, binPath, configChanged); err != nil {
return err
}
@@ -112,102 +133,128 @@ func (this *BaseInstaller) ensureFluentBitInstalled(tempDir string) error {
return nil
}
if err := this.installFluentBitFromLocalPackage(tempDir); err != nil {
platformKey, packageName, arch, err := this.detectRemotePlatformAndPackage()
if err != nil {
return fmt.Errorf("detect fluent-bit platform failed: %w", err)
}
if err := this.installFluentBitFromLocalPackage(tempDir, arch, packageName); err != nil {
if errors.Is(err, errFluentBitLocalPackageNotFound) {
return fmt.Errorf("install fluent-bit failed: local package not found, expected in deploy/fluent-bit/%s/linux-<arch>", fluentBitLocalPackagesRoot)
expectedPath := filepath.Join("deploy", "fluent-bit", fluentBitLocalPackagesRoot, "linux-"+arch, packageName)
return fmt.Errorf("install fluent-bit failed: local package missing for platform '%s', expected '%s'", platformKey, expectedPath)
}
return fmt.Errorf("install fluent-bit from local package failed: %w", err)
}
binPath, err := this.lookupFluentBitBinPath()
binPath, err = this.lookupFluentBitBinPath()
if err != nil {
return err
}
if binPath == "" {
return errors.New("fluent-bit binary not found after local package install")
}
_, stderr, err := this.client.Exec(binPath + " --version")
if err != nil {
return fmt.Errorf("verify fluent-bit version failed: %w, stderr: %s", err, stderr)
}
return nil
}
func (this *BaseInstaller) installFluentBitFromLocalPackage(tempDir string) error {
arch, err := this.detectRemoteLinuxArch()
if err != nil {
return err
}
func (this *BaseInstaller) installFluentBitFromLocalPackage(tempDir string, arch string, packageName string) error {
packageDir := filepath.Join(Tea.Root, "deploy", "fluent-bit", fluentBitLocalPackagesRoot, "linux-"+arch)
entries, err := os.ReadDir(packageDir)
if err != nil {
localPackagePath := filepath.Join(packageDir, packageName)
if _, err := os.Stat(localPackagePath); err != nil {
if os.IsNotExist(err) {
return errFluentBitLocalPackageNotFound
}
return fmt.Errorf("read fluent-bit local package dir failed: %w", err)
return fmt.Errorf("check local package failed: %w", err)
}
packageFiles := make([]string, 0)
for _, entry := range entries {
if entry.IsDir() {
remotePackagePath := tempDir + "/" + filepath.Base(localPackagePath)
if err := this.client.Copy(localPackagePath, remotePackagePath, 0644); err != nil {
return fmt.Errorf("upload local package failed: %w", err)
}
var installCmd string
lowerName := strings.ToLower(localPackagePath)
switch {
case strings.HasSuffix(lowerName, ".deb"):
installCmd = "dpkg -i " + shQuote(remotePackagePath)
case strings.HasSuffix(lowerName, ".rpm"):
installCmd = "rpm -Uvh --force " + shQuote(remotePackagePath) + " || rpm -ivh --force " + shQuote(remotePackagePath)
case strings.HasSuffix(lowerName, ".tar.gz") || strings.HasSuffix(lowerName, ".tgz"):
extractDir := tempDir + "/extract"
installCmd = "rm -rf " + shQuote(extractDir) + "; mkdir -p " + shQuote(extractDir) + "; tar -xzf " + shQuote(remotePackagePath) + " -C " + shQuote(extractDir) + "; " +
"bin=$(find " + shQuote(extractDir) + " -type f -name fluent-bit | head -n 1); " +
"if [ -z \"$bin\" ]; then exit 3; fi; " +
"mkdir -p /opt/fluent-bit/bin /usr/local/bin; " +
"install -m 0755 \"$bin\" /opt/fluent-bit/bin/fluent-bit; " +
"ln -sf /opt/fluent-bit/bin/fluent-bit /usr/local/bin/fluent-bit"
default:
return fmt.Errorf("unsupported local package format: %s", packageName)
}
_, stderr, err := this.client.Exec(installCmd)
if err != nil {
return fmt.Errorf("install fluent-bit local package '%s' failed: %w, stderr: %s", filepath.Base(localPackagePath), err, stderr)
}
return nil
}
func (this *BaseInstaller) detectRemotePlatformAndPackage() (platformKey string, packageName string, arch string, err error) {
arch, err = this.detectRemoteLinuxArch()
if err != nil {
return "", "", "", err
}
releaseData, stderr, err := this.client.Exec("cat /etc/os-release")
if err != nil {
return "", "", "", fmt.Errorf("read /etc/os-release failed: %w, stderr: %s", err, stderr)
}
if strings.TrimSpace(releaseData) == "" {
return "", "", "", errors.New("/etc/os-release is empty")
}
releaseMap := parseOSRelease(releaseData)
id := strings.ToLower(strings.TrimSpace(releaseMap["ID"]))
versionID := strings.TrimSpace(releaseMap["VERSION_ID"])
var distro string
switch {
case id == "ubuntu" && strings.HasPrefix(versionID, "22.04"):
distro = "ubuntu22.04"
case id == "amzn" && strings.HasPrefix(versionID, "2023"):
distro = "amzn2023"
default:
return "", "", "", fmt.Errorf("unsupported linux platform id='%s' version='%s'", id, versionID)
}
platformKey = distro + "-" + arch
packageName, ok := fluentBitPackageFileMapping[platformKey]
if !ok {
return "", "", "", fmt.Errorf("no local package mapping for platform '%s'", platformKey)
}
return platformKey, packageName, arch, nil
}
func parseOSRelease(content string) map[string]string {
result := map[string]string{}
lines := strings.Split(content, "\n")
for _, line := range lines {
line = strings.TrimSpace(line)
if line == "" || strings.HasPrefix(line, "#") || !strings.Contains(line, "=") {
continue
}
name := strings.ToLower(entry.Name())
if strings.HasSuffix(name, ".deb") || strings.HasSuffix(name, ".rpm") || strings.HasSuffix(name, ".tar.gz") || strings.HasSuffix(name, ".tgz") {
packageFiles = append(packageFiles, filepath.Join(packageDir, entry.Name()))
}
parts := strings.SplitN(line, "=", 2)
key := strings.TrimSpace(parts[0])
value := strings.TrimSpace(parts[1])
value = strings.Trim(value, "\"")
result[key] = value
}
if len(packageFiles) == 0 {
return errFluentBitLocalPackageNotFound
}
sort.Strings(packageFiles)
var lastErr error
for _, localPackagePath := range packageFiles {
remotePackagePath := tempDir + "/" + filepath.Base(localPackagePath)
if err := this.client.Copy(localPackagePath, remotePackagePath, 0644); err != nil {
lastErr = fmt.Errorf("upload local package failed: %w", err)
continue
}
var installCmd string
lowerName := strings.ToLower(localPackagePath)
switch {
case strings.HasSuffix(lowerName, ".deb"):
installCmd = "dpkg -i " + remotePackagePath
case strings.HasSuffix(lowerName, ".rpm"):
installCmd = "rpm -Uvh --force " + remotePackagePath + " || rpm -ivh --force " + remotePackagePath
case strings.HasSuffix(lowerName, ".tar.gz") || strings.HasSuffix(lowerName, ".tgz"):
extractDir := tempDir + "/extract"
installCmd = "rm -rf " + extractDir + "; mkdir -p " + extractDir + "; tar -xzf " + remotePackagePath + " -C " + extractDir + "; " +
"bin=$(find " + extractDir + " -type f -name fluent-bit | head -n 1); " +
"if [ -z \"$bin\" ]; then exit 3; fi; " +
"mkdir -p /opt/fluent-bit/bin /usr/local/bin; " +
"install -m 0755 \"$bin\" /opt/fluent-bit/bin/fluent-bit; " +
"ln -sf /opt/fluent-bit/bin/fluent-bit /usr/local/bin/fluent-bit"
default:
continue
}
_, stderr, err := this.client.Exec(installCmd)
if err != nil {
lastErr = fmt.Errorf("install fluent-bit local package '%s' failed: %w, stderr: %s", filepath.Base(localPackagePath), err, stderr)
continue
}
binPath, err := this.lookupFluentBitBinPath()
if err == nil && binPath != "" {
return nil
}
if err != nil {
lastErr = err
} else {
lastErr = errors.New("fluent-bit binary not found after local package install")
}
}
if lastErr != nil {
return lastErr
}
return errFluentBitLocalPackageNotFound
return result
}
func (this *BaseInstaller) detectRemoteLinuxArch() (string, error) {
@@ -235,74 +282,456 @@ func (this *BaseInstaller) lookupFluentBitBinPath() (string, error) {
return strings.TrimSpace(stdout), nil
}
func (this *BaseInstaller) copyFluentBitConfigIfMissing(tempDir string) (bool, error) {
targets := []struct {
Src string
Dest string
}{
{Src: tempDir + "/fluent-bit.conf", Dest: fluentBitMainConfigFile},
{Src: tempDir + "/parsers.conf", Dest: fluentBitParsersFile},
{Src: tempDir + "/clickhouse-upstream.conf", Dest: fluentBitUpstreamFile},
{Src: tempDir + "/logrotate.conf", Dest: fluentBitLogrotateFile},
func (this *BaseInstaller) readLocalParsersContent() (string, error) {
parsersPath := filepath.Join(Tea.Root, "deploy", "fluent-bit", "parsers.conf")
data, err := os.ReadFile(parsersPath)
if err != nil {
return "", fmt.Errorf("read local parsers config failed: %w", err)
}
return string(data), nil
}
func (this *BaseInstaller) readManagedMeta() (*fluentBitManagedMeta, error) {
exists, err := this.remoteFileExists(fluentBitManagedMetaFile)
if err != nil {
return nil, err
}
if !exists {
return nil, nil
}
copied := false
for _, target := range targets {
exists, err := this.remoteFileExists(target.Dest)
content, stderr, err := this.client.Exec("cat " + shQuote(fluentBitManagedMetaFile))
if err != nil {
return nil, fmt.Errorf("read fluent-bit managed metadata failed: %w, stderr: %s", err, stderr)
}
if strings.TrimSpace(content) == "" {
return nil, nil
}
meta := &fluentBitManagedMeta{}
if err := json.Unmarshal([]byte(content), meta); err != nil {
return nil, fmt.Errorf("decode fluent-bit managed metadata failed: %w", err)
}
meta.Roles = normalizeRoles(meta.Roles)
return meta, nil
}
func mergeManagedRoles(meta *fluentBitManagedMeta, role nodeconfigs.NodeRole) ([]string, error) {
roleName, err := mapNodeRole(role)
if err != nil {
return nil, err
}
roleSet := map[string]struct{}{}
if meta != nil {
for _, r := range normalizeRoles(meta.Roles) {
roleSet[r] = struct{}{}
}
}
roleSet[roleName] = struct{}{}
roles := make([]string, 0, len(roleSet))
for r := range roleSet {
roles = append(roles, r)
}
sort.Strings(roles)
return roles, nil
}
func mapNodeRole(role nodeconfigs.NodeRole) (string, error) {
switch role {
case nodeconfigs.NodeRoleNode:
return fluentBitRoleNode, nil
case nodeconfigs.NodeRoleDNS:
return fluentBitRoleDNS, nil
default:
return "", fmt.Errorf("unsupported fluent-bit role '%s'", role)
}
}
func normalizeRoles(rawRoles []string) []string {
roleSet := map[string]struct{}{}
for _, role := range rawRoles {
role = strings.ToLower(strings.TrimSpace(role))
if role != fluentBitRoleNode && role != fluentBitRoleDNS {
continue
}
roleSet[role] = struct{}{}
}
roles := make([]string, 0, len(roleSet))
for role := range roleSet {
roles = append(roles, role)
}
sort.Strings(roles)
return roles
}
func hasRole(roles []string, role string) bool {
for _, one := range roles {
if one == role {
return true
}
}
return false
}
func (this *BaseInstaller) buildDesiredFluentBitConfig(roles []string) (*fluentBitDesiredConfig, error) {
if len(roles) == 0 {
return nil, errors.New("fluent-bit roles should not be empty")
}
ch, err := models.SharedSysSettingDAO.ReadClickHouseConfig(nil)
if err != nil {
return nil, fmt.Errorf("read clickhouse setting failed: %w", err)
}
if ch == nil {
ch = &systemconfigs.ClickHouseSetting{}
}
if strings.TrimSpace(ch.Host) == "" {
ch.Host = "127.0.0.1"
}
ch.Scheme = strings.ToLower(strings.TrimSpace(ch.Scheme))
if ch.Scheme == "" {
ch.Scheme = "https"
}
if ch.Scheme != "http" && ch.Scheme != "https" {
return nil, fmt.Errorf("unsupported clickhouse scheme '%s'", ch.Scheme)
}
if ch.Port <= 0 {
if ch.Scheme == "https" {
ch.Port = 8443
} else {
ch.Port = 8443
}
}
if strings.TrimSpace(ch.Database) == "" {
ch.Database = "default"
}
if strings.TrimSpace(ch.User) == "" {
ch.User = "default"
}
// 当前平台策略:后台固定跳过 ClickHouse TLS 证书校验,不暴露 ServerName 配置。
ch.TLSSkipVerify = true
ch.TLSServerName = ""
httpPathPattern := fluentBitHTTPPathPattern
dnsPathPattern := fluentBitDNSPathPattern
publicPolicyPath, err := this.readPublicAccessLogPolicyPath()
if err != nil {
return nil, err
}
policyDir := dirFromPolicyPath(publicPolicyPath)
if policyDir != "" {
pattern := strings.TrimRight(policyDir, "/") + "/*.log"
httpPathPattern = pattern
dnsPathPattern = pattern
}
return &fluentBitDesiredConfig{
Roles: normalizeRoles(roles),
ClickHouse: ch,
HTTPPathPattern: httpPathPattern,
DNSPathPattern: dnsPathPattern,
}, nil
}
func (this *BaseInstaller) readPublicAccessLogPolicyPath() (string, error) {
policyId, err := models.SharedHTTPAccessLogPolicyDAO.FindCurrentPublicPolicyId(nil)
if err != nil {
return "", fmt.Errorf("find current public access log policy failed: %w", err)
}
if policyId <= 0 {
return "", nil
}
policy, err := models.SharedHTTPAccessLogPolicyDAO.FindEnabledHTTPAccessLogPolicy(nil, policyId)
if err != nil {
return "", fmt.Errorf("read public access log policy failed: %w", err)
}
if policy == nil {
return "", nil
}
return strings.TrimSpace(models.ParseHTTPAccessLogPolicyFilePath(policy)), nil
}
func dirFromPolicyPath(policyPath string) string {
pathValue := strings.TrimSpace(policyPath)
if pathValue == "" {
return ""
}
pathValue = strings.ReplaceAll(pathValue, "\\", "/")
dir := slashpath.Dir(pathValue)
if dir == "." {
return ""
}
return strings.TrimRight(dir, "/")
}
func (this *BaseInstaller) applyManagedConfig(tempDir string, desired *fluentBitDesiredConfig, parserContent string, existingMeta *fluentBitManagedMeta) (bool, error) {
mainExists, err := this.remoteFileExists(fluentBitMainConfigFile)
if err != nil {
return false, err
}
if mainExists && existingMeta == nil {
containsMarker, err := this.remoteFileContains(fluentBitMainConfigFile, fluentBitManagedMarker)
if err != nil {
return false, err
}
if exists {
continue
if !containsMarker {
// Adopt unmanaged config by backing it up and replacing with managed config below.
}
_, stderr, err := this.client.Exec("cp -f " + target.Src + " " + target.Dest)
if err != nil {
return false, fmt.Errorf("copy fluent-bit file to '%s' failed: %w, stderr: %s", target.Dest, err, stderr)
}
copied = true
}
return copied, nil
configContent, err := renderManagedConfig(desired)
if err != nil {
return false, err
}
envContent := renderManagedEnv(desired.ClickHouse)
metaContent, newMeta, err := renderManagedMeta(desired, configContent, parserContent, envContent)
if err != nil {
return false, err
}
requiredFiles := []string{fluentBitMainConfigFile, fluentBitParsersFile, fluentBitManagedEnvFile, fluentBitManagedMetaFile}
if existingMeta != nil && existingMeta.Hash == newMeta.Hash {
allExists := true
for _, file := range requiredFiles {
exists, err := this.remoteFileExists(file)
if err != nil {
return false, err
}
if !exists {
allExists = false
break
}
}
if allExists {
return false, nil
}
}
if mainExists {
backup := fluentBitMainConfigFile + ".bak." + strconv.FormatInt(time.Now().Unix(), 10)
_, stderr, err := this.client.Exec("cp -f " + shQuote(fluentBitMainConfigFile) + " " + shQuote(backup))
if err != nil {
return false, fmt.Errorf("backup existing fluent-bit config failed: %w, stderr: %s", err, stderr)
}
}
if err := this.writeRemoteFileByTemp(tempDir, fluentBitMainConfigFile, configContent, 0644); err != nil {
return false, err
}
if err := this.writeRemoteFileByTemp(tempDir, fluentBitParsersFile, parserContent, 0644); err != nil {
return false, err
}
if err := this.writeRemoteFileByTemp(tempDir, fluentBitManagedEnvFile, envContent, 0600); err != nil {
return false, err
}
if err := this.writeRemoteFileByTemp(tempDir, fluentBitManagedMetaFile, metaContent, 0644); err != nil {
return false, err
}
localLogrotate := filepath.Join(Tea.Root, "deploy", "fluent-bit", "logrotate.conf")
if _, err := os.Stat(localLogrotate); err == nil {
if err := this.copyLocalFileToRemote(tempDir, localLogrotate, fluentBitLogrotateFile, 0644); err != nil {
return false, err
}
}
return true, nil
}
func (this *BaseInstaller) validateExistingConfigForRole(role nodeconfigs.NodeRole) error {
requiredPatterns := []string{}
switch role {
case nodeconfigs.NodeRoleNode:
requiredPatterns = append(requiredPatterns, fluentBitHTTPPathPattern)
case nodeconfigs.NodeRoleDNS:
requiredPatterns = append(requiredPatterns, fluentBitDNSPathPattern)
func renderManagedConfig(desired *fluentBitDesiredConfig) (string, error) {
if desired == nil || desired.ClickHouse == nil {
return "", errors.New("invalid fluent-bit desired config")
}
for _, pattern := range requiredPatterns {
ok, err := this.remoteFileContains(fluentBitMainConfigFile, pattern)
if err != nil {
return err
scheme := strings.ToLower(strings.TrimSpace(desired.ClickHouse.Scheme))
if scheme == "" {
scheme = "http"
}
if scheme != "http" && scheme != "https" {
return "", fmt.Errorf("unsupported clickhouse scheme '%s'", desired.ClickHouse.Scheme)
}
useTLS := scheme == "https"
insertHTTP := url.QueryEscape(fmt.Sprintf("INSERT INTO %s.logs_ingest FORMAT JSONEachRow", desired.ClickHouse.Database))
insertDNS := url.QueryEscape(fmt.Sprintf("INSERT INTO %s.dns_logs_ingest FORMAT JSONEachRow", desired.ClickHouse.Database))
lines := []string{
"# " + fluentBitManagedMarker,
"[SERVICE]",
" Flush 2",
" Log_Level info",
" Parsers_File " + fluentBitParsersFile,
" storage.path " + fluentBitStorageDir,
" storage.sync normal",
" storage.checksum off",
" storage.backlog.mem_limit 256MB",
"",
}
if hasRole(desired.Roles, fluentBitRoleNode) {
lines = append(lines,
"[INPUT]",
" Name tail",
" Path "+desired.HTTPPathPattern,
" Tag app.http.logs",
" Parser json",
" Refresh_Interval 2",
" Read_from_Head false",
" DB /var/lib/fluent-bit/http-logs.db",
" storage.type filesystem",
" Mem_Buf_Limit 128MB",
" Skip_Long_Lines On",
"",
)
}
if hasRole(desired.Roles, fluentBitRoleDNS) {
lines = append(lines,
"[INPUT]",
" Name tail",
" Path "+desired.DNSPathPattern,
" Tag app.dns.logs",
" Parser json",
" Refresh_Interval 2",
" Read_from_Head false",
" DB /var/lib/fluent-bit/dns-logs.db",
" storage.type filesystem",
" Mem_Buf_Limit 128MB",
" Skip_Long_Lines On",
"",
)
}
if hasRole(desired.Roles, fluentBitRoleNode) {
lines = append(lines,
"[OUTPUT]",
" Name http",
" Match app.http.logs",
" Host "+desired.ClickHouse.Host,
" Port "+strconv.Itoa(desired.ClickHouse.Port),
" URI /?query="+insertHTTP,
" Format json_lines",
" http_user ${CH_USER}",
" http_passwd ${CH_PASSWORD}",
" json_date_key timestamp",
" json_date_format epoch",
" workers 1",
" net.keepalive On",
" Retry_Limit False",
)
if useTLS {
lines = append(lines, " tls On")
if desired.ClickHouse.TLSSkipVerify {
lines = append(lines, " tls.verify Off")
} else {
lines = append(lines, " tls.verify On")
}
if strings.TrimSpace(desired.ClickHouse.TLSServerName) != "" {
lines = append(lines, " tls.vhost "+strings.TrimSpace(desired.ClickHouse.TLSServerName))
}
}
if !ok {
return fmt.Errorf("existing fluent-bit config '%s' does not contain required path '%s'; skip overwrite by design, please update config manually", fluentBitMainConfigFile, pattern)
lines = append(lines, "")
}
if hasRole(desired.Roles, fluentBitRoleDNS) {
lines = append(lines,
"[OUTPUT]",
" Name http",
" Match app.dns.logs",
" Host "+desired.ClickHouse.Host,
" Port "+strconv.Itoa(desired.ClickHouse.Port),
" URI /?query="+insertDNS,
" Format json_lines",
" http_user ${CH_USER}",
" http_passwd ${CH_PASSWORD}",
" json_date_key timestamp",
" json_date_format epoch",
" workers 1",
" net.keepalive On",
" Retry_Limit False",
)
if useTLS {
lines = append(lines, " tls On")
if desired.ClickHouse.TLSSkipVerify {
lines = append(lines, " tls.verify Off")
} else {
lines = append(lines, " tls.verify On")
}
if strings.TrimSpace(desired.ClickHouse.TLSServerName) != "" {
lines = append(lines, " tls.vhost "+strings.TrimSpace(desired.ClickHouse.TLSServerName))
}
}
lines = append(lines, "")
}
return strings.Join(lines, "\n"), nil
}
func renderManagedEnv(ch *systemconfigs.ClickHouseSetting) string {
user := "default"
password := ""
if ch != nil {
if strings.TrimSpace(ch.User) != "" {
user = strings.TrimSpace(ch.User)
}
password = ch.Password
}
return "CH_USER=" + strconv.Quote(user) + "\n" +
"CH_PASSWORD=" + strconv.Quote(password) + "\n"
}
func renderManagedMeta(desired *fluentBitDesiredConfig, configContent string, parserContent string, envContent string) (string, *fluentBitManagedMeta, error) {
hashInput := configContent + "\n---\n" + parserContent + "\n---\n" + envContent + "\n---\n" + strings.Join(desired.Roles, ",")
hashBytes := sha256.Sum256([]byte(hashInput))
hash := fmt.Sprintf("%x", hashBytes[:])
meta := &fluentBitManagedMeta{
Roles: desired.Roles,
Hash: hash,
UpdatedAt: time.Now().Unix(),
SourceVersion: teaconst.Version,
}
data, err := json.MarshalIndent(meta, "", " ")
if err != nil {
return "", nil, fmt.Errorf("encode fluent-bit managed metadata failed: %w", err)
}
return string(data) + "\n", meta, nil
}
func (this *BaseInstaller) copyLocalFileToRemote(tempDir string, localPath string, remotePath string, mode os.FileMode) error {
tempFile := tempDir + "/" + filepath.Base(remotePath)
if err := this.client.Copy(localPath, tempFile, mode); err != nil {
return fmt.Errorf("upload fluent-bit file '%s' failed: %w", localPath, err)
}
_, stderr, err := this.client.Exec("cp -f " + shQuote(tempFile) + " " + shQuote(remotePath) + " && chmod " + fmt.Sprintf("%04o", mode) + " " + shQuote(remotePath))
if err != nil {
return fmt.Errorf("install remote fluent-bit file '%s' failed: %w, stderr: %s", remotePath, err, stderr)
}
return nil
}
func (this *BaseInstaller) remoteFileExists(path string) (bool, error) {
stdout, stderr, err := this.client.Exec("if [ -f \"" + path + "\" ]; then echo 1; else echo 0; fi")
if err != nil {
return false, fmt.Errorf("check remote file '%s' failed: %w, stderr: %s", path, err, stderr)
func (this *BaseInstaller) writeRemoteFileByTemp(tempDir string, remotePath string, content string, mode os.FileMode) error {
tempFile := tempDir + "/" + filepath.Base(remotePath) + ".tmp"
if _, err := this.client.WriteFile(tempFile, []byte(content)); err != nil {
return fmt.Errorf("write temp fluent-bit file '%s' failed: %w", tempFile, err)
}
return strings.TrimSpace(stdout) == "1", nil
_, stderr, err := this.client.Exec("cp -f " + shQuote(tempFile) + " " + shQuote(remotePath) + " && chmod " + fmt.Sprintf("%04o", mode) + " " + shQuote(remotePath))
if err != nil {
return fmt.Errorf("write managed fluent-bit file '%s' failed: %w, stderr: %s", remotePath, err, stderr)
}
return nil
}
func (this *BaseInstaller) remoteFileContains(path string, pattern string) (bool, error) {
stdout, stderr, err := this.client.Exec("if grep -F \"" + pattern + "\" \"" + path + "\" >/dev/null 2>&1; then echo 1; else echo 0; fi")
if err != nil {
return false, fmt.Errorf("check remote file content '%s' failed: %w, stderr: %s", path, err, stderr)
}
return strings.TrimSpace(stdout) == "1", nil
}
func (this *BaseInstaller) ensureFluentBitService(binPath string, configCopied bool) error {
func (this *BaseInstaller) ensureFluentBitService(tempDir string, binPath string, configChanged bool) error {
_, _, _ = this.client.Exec("if command -v systemctl >/dev/null 2>&1 && [ ! -f /etc/systemd/system/" + fluentBitServiceName + ".service ] && [ ! -f /lib/systemd/system/" + fluentBitServiceName + ".service ]; then " +
"cat > /etc/systemd/system/" + fluentBitServiceName + ".service <<'EOF'\n" +
"[Unit]\n" +
@@ -319,26 +748,90 @@ func (this *BaseInstaller) ensureFluentBitService(binPath string, configCopied b
"EOF\n" +
"fi")
stdout, stderr, err := this.client.Exec("if command -v systemctl >/dev/null 2>&1; then systemctl daemon-reload; systemctl enable " + fluentBitServiceName + " >/dev/null 2>&1 || true; if systemctl is-active " + fluentBitServiceName + " >/dev/null 2>&1; then " +
"if [ \"" + boolToString(configCopied) + "\" = \"1\" ]; then systemctl restart " + fluentBitServiceName + "; fi; " +
"else systemctl start " + fluentBitServiceName + "; fi; else echo no-systemctl; fi")
stdout, _, err := this.client.Exec("if command -v systemctl >/dev/null 2>&1; then echo 1; else echo 0; fi")
if err != nil {
return fmt.Errorf("ensure fluent-bit service failed: %w, stderr: %s", err, stderr)
return fmt.Errorf("check systemctl failed: %w", err)
}
if strings.TrimSpace(stdout) == "no-systemctl" {
_, _, runningErr := this.client.Exec("pgrep -f \"fluent-bit.*fluent-bit.conf\" >/dev/null 2>&1")
if runningErr != nil {
_, stderr, err = this.client.Exec(binPath + " -c " + fluentBitMainConfigFile + " >/dev/null 2>&1 &")
if err != nil {
return fmt.Errorf("start fluent-bit without systemd failed: %w, stderr: %s", err, stderr)
}
if strings.TrimSpace(stdout) == "1" {
dropInChanged, err := this.ensureServiceDropIn(tempDir, binPath)
if err != nil {
return err
}
restartRequired := configChanged || dropInChanged
_, stderr, err := this.client.Exec("systemctl daemon-reload; systemctl enable " + fluentBitServiceName + " >/dev/null 2>&1 || true; " +
"if systemctl is-active " + fluentBitServiceName + " >/dev/null 2>&1; then " +
"if [ \"" + boolToString(restartRequired) + "\" = \"1\" ]; then systemctl restart " + fluentBitServiceName + "; fi; " +
"else systemctl start " + fluentBitServiceName + "; fi")
if err != nil {
return fmt.Errorf("ensure fluent-bit service failed: %w, stderr: %s", err, stderr)
}
return nil
}
if configChanged {
_, _, _ = this.client.Exec("pkill -f \"fluent-bit.*fluent-bit.conf\" >/dev/null 2>&1 || true")
}
_, _, runningErr := this.client.Exec("pgrep -f \"fluent-bit.*fluent-bit.conf\" >/dev/null 2>&1")
if runningErr != nil {
startCmd := "set -a; [ -f " + shQuote(fluentBitManagedEnvFile) + " ] && . " + shQuote(fluentBitManagedEnvFile) + "; set +a; " +
shQuote(binPath) + " -c " + shQuote(fluentBitMainConfigFile) + " >/dev/null 2>&1 &"
_, stderr, err := this.client.Exec(startCmd)
if err != nil {
return fmt.Errorf("start fluent-bit without systemd failed: %w, stderr: %s", err, stderr)
}
}
return nil
}
func (this *BaseInstaller) ensureServiceDropIn(tempDir string, binPath string) (bool, error) {
_, stderr, err := this.client.Exec("mkdir -p " + shQuote(fluentBitDropInDir))
if err != nil {
return false, fmt.Errorf("prepare fluent-bit drop-in dir failed: %w, stderr: %s", err, stderr)
}
content := "[Service]\n" +
"EnvironmentFile=-" + fluentBitManagedEnvFile + "\n" +
"ExecStart=\n" +
"ExecStart=" + binPath + " -c " + fluentBitMainConfigFile + "\n"
existing, _, _ := this.client.Exec("if [ -f " + shQuote(fluentBitDropInFile) + " ]; then cat " + shQuote(fluentBitDropInFile) + "; fi")
if existing == content {
return false, nil
}
if err := this.writeRemoteFileByTemp(tempDir, fluentBitDropInFile, content, 0644); err != nil {
return false, err
}
return true, nil
}
func (this *BaseInstaller) remoteFileExists(path string) (bool, error) {
stdout, stderr, err := this.client.Exec("if [ -f " + shQuote(path) + " ]; then echo 1; else echo 0; fi")
if err != nil {
return false, fmt.Errorf("check remote file '%s' failed: %w, stderr: %s", path, err, stderr)
}
return strings.TrimSpace(stdout) == "1", nil
}
func (this *BaseInstaller) remoteFileContains(path string, pattern string) (bool, error) {
stdout, stderr, err := this.client.Exec("if grep -F " + shQuote(pattern) + " " + shQuote(path) + " >/dev/null 2>&1; then echo 1; else echo 0; fi")
if err != nil {
return false, fmt.Errorf("check remote file content '%s' failed: %w, stderr: %s", path, err, stderr)
}
return strings.TrimSpace(stdout) == "1", nil
}
func shQuote(value string) string {
if value == "" {
return "''"
}
return "'" + strings.ReplaceAll(value, "'", "'\"'\"'") + "'"
}
func boolToString(v bool) string {
if v {
return "1"