换成单集群模式

This commit is contained in:
robin
2026-03-02 20:07:53 +08:00
parent 5d0b7c7e91
commit 2a76d1773d
432 changed files with 5681 additions and 5095 deletions

View File

@@ -33,11 +33,13 @@ const (
fluentBitServiceName = "fluent-bit"
fluentBitDefaultBinPath = "/opt/fluent-bit/bin/fluent-bit"
fluentBitLocalPackagesRoot = "packages"
fluentBitHTTPPathPattern = "/var/log/edge/edge-node/*.log"
fluentBitDNSPathPattern = "/var/log/edge/edge-dns/*.log"
fluentBitManagedMarker = "managed-by-edgeapi"
fluentBitRoleNode = "node"
fluentBitRoleDNS = "dns"
fluentBitHTTPPathPattern = "/var/log/edge/edge-node/*.log"
fluentBitDNSPathPattern = "/var/log/edge/edge-dns/*.log"
fluentBitHTTPDNSPathPattern = "/var/log/edge/edge-httpdns/*.log"
fluentBitManagedMarker = "managed-by-edgeapi"
fluentBitRoleNode = "node"
fluentBitRoleDNS = "dns"
fluentBitRoleHTTPDNS = "httpdns"
)
var errFluentBitLocalPackageNotFound = errors.New("fluent-bit local package not found")
@@ -57,10 +59,11 @@ type fluentBitManagedMeta struct {
}
type fluentBitDesiredConfig struct {
Roles []string
ClickHouse *systemconfigs.ClickHouseSetting
HTTPPathPattern string
DNSPathPattern string
Roles []string
ClickHouse *systemconfigs.ClickHouseSetting
HTTPPathPattern string
DNSPathPattern string
HTTPDNSPathPattern string
}
// SetupFluentBit 安装并托管 Fluent Bit 配置(离线包 + 平台渲染配置)。
@@ -344,7 +347,7 @@ func mapNodeRole(role nodeconfigs.NodeRole) (string, error) {
case nodeconfigs.NodeRoleDNS:
return fluentBitRoleDNS, nil
case nodeconfigs.NodeRoleHTTPDNS:
return fluentBitRoleDNS, nil
return fluentBitRoleHTTPDNS, nil
default:
return "", fmt.Errorf("unsupported fluent-bit role '%s'", role)
}
@@ -354,7 +357,7 @@ func normalizeRoles(rawRoles []string) []string {
roleSet := map[string]struct{}{}
for _, role := range rawRoles {
role = strings.ToLower(strings.TrimSpace(role))
if role != fluentBitRoleNode && role != fluentBitRoleDNS {
if role != fluentBitRoleNode && role != fluentBitRoleDNS && role != fluentBitRoleHTTPDNS {
continue
}
roleSet[role] = struct{}{}
@@ -420,6 +423,7 @@ func (this *BaseInstaller) buildDesiredFluentBitConfig(roles []string) (*fluentB
httpPathPattern := fluentBitHTTPPathPattern
dnsPathPattern := fluentBitDNSPathPattern
httpdnsPathPattern := fluentBitHTTPDNSPathPattern
publicPolicyPath, err := this.readPublicAccessLogPolicyPath()
if err != nil {
return nil, err
@@ -429,13 +433,15 @@ func (this *BaseInstaller) buildDesiredFluentBitConfig(roles []string) (*fluentB
pattern := strings.TrimRight(policyDir, "/") + "/*.log"
httpPathPattern = pattern
dnsPathPattern = pattern
httpdnsPathPattern = pattern
}
return &fluentBitDesiredConfig{
Roles: normalizeRoles(roles),
ClickHouse: ch,
HTTPPathPattern: httpPathPattern,
DNSPathPattern: dnsPathPattern,
Roles: normalizeRoles(roles),
ClickHouse: ch,
HTTPPathPattern: httpPathPattern,
DNSPathPattern: dnsPathPattern,
HTTPDNSPathPattern: httpdnsPathPattern,
}, nil
}
@@ -556,6 +562,7 @@ func renderManagedConfig(desired *fluentBitDesiredConfig) (string, error) {
insertHTTP := url.QueryEscape(fmt.Sprintf("INSERT INTO %s.logs_ingest FORMAT JSONEachRow", desired.ClickHouse.Database))
insertDNS := url.QueryEscape(fmt.Sprintf("INSERT INTO %s.dns_logs_ingest FORMAT JSONEachRow", desired.ClickHouse.Database))
insertHTTPDNS := url.QueryEscape(fmt.Sprintf("INSERT INTO %s.httpdns_access_logs_ingest FORMAT JSONEachRow", desired.ClickHouse.Database))
lines := []string{
"# " + fluentBitManagedMarker,
@@ -604,6 +611,23 @@ func renderManagedConfig(desired *fluentBitDesiredConfig) (string, error) {
)
}
if hasRole(desired.Roles, fluentBitRoleHTTPDNS) {
lines = append(lines,
"[INPUT]",
" Name tail",
" Path "+desired.HTTPDNSPathPattern,
" Tag app.httpdns.logs",
" Parser json",
" Refresh_Interval 2",
" Read_from_Head false",
" DB /var/lib/fluent-bit/httpdns-logs.db",
" storage.type filesystem",
" Mem_Buf_Limit 256MB",
" Skip_Long_Lines On",
"",
)
}
if hasRole(desired.Roles, fluentBitRoleNode) {
lines = append(lines,
"[OUTPUT]",
@@ -666,6 +690,37 @@ func renderManagedConfig(desired *fluentBitDesiredConfig) (string, error) {
lines = append(lines, "")
}
if hasRole(desired.Roles, fluentBitRoleHTTPDNS) {
lines = append(lines,
"[OUTPUT]",
" Name http",
" Match app.httpdns.logs",
" Host "+desired.ClickHouse.Host,
" Port "+strconv.Itoa(desired.ClickHouse.Port),
" URI /?query="+insertHTTPDNS,
" Format json_lines",
" http_user ${CH_USER}",
" http_passwd ${CH_PASSWORD}",
" json_date_key timestamp",
" json_date_format epoch",
" workers 2",
" net.keepalive On",
" Retry_Limit False",
)
if useTLS {
lines = append(lines, " tls On")
if desired.ClickHouse.TLSSkipVerify {
lines = append(lines, " tls.verify Off")
} else {
lines = append(lines, " tls.verify On")
}
if strings.TrimSpace(desired.ClickHouse.TLSServerName) != "" {
lines = append(lines, " tls.vhost "+strings.TrimSpace(desired.ClickHouse.TLSServerName))
}
}
lines = append(lines, "")
}
return strings.Join(lines, "\n"), nil
}

View File

@@ -13,10 +13,10 @@ import (
"github.com/TeaOSLab/EdgeAPI/internal/goman"
"github.com/TeaOSLab/EdgeAPI/internal/utils"
"github.com/TeaOSLab/EdgeAPI/internal/utils/numberutils"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/sslconfigs"
"github.com/iwind/TeaGo/logs"
"github.com/iwind/TeaGo/maps"
)
var sharedHTTPDNSNodeQueue = NewHTTPDNSNodeQueue()
@@ -98,9 +98,8 @@ func (q *HTTPDNSNodeQueue) InstallNode(nodeId int64, installStatus *models.NodeI
return errors.New("can not find cluster")
}
sshHost, sshPort, grantId, err := q.parseSSHInfo(node)
sshHost, sshPort, grantId, err := q.parseSSHInfo(node, installStatus)
if err != nil {
installStatus.ErrorCode = "EMPTY_SSH"
return err
}
@@ -287,35 +286,37 @@ func (q *HTTPDNSNodeQueue) resolveClusterTLSListenAddr(cluster *models.HTTPDNSCl
return defaultListenAddr, nil
}
func (q *HTTPDNSNodeQueue) parseSSHInfo(node *models.HTTPDNSNode) (string, int, int64, error) {
func (q *HTTPDNSNodeQueue) parseSSHInfo(node *models.HTTPDNSNode, installStatus *models.NodeInstallStatus) (string, int, int64, error) {
if node == nil {
return "", 0, 0, errors.New("node should not be nil")
}
if len(node.InstallStatus) == 0 {
return "", 0, 0, errors.New("ssh config should not be empty")
}
statusMap := maps.Map{}
err := json.Unmarshal(node.InstallStatus, &statusMap)
login, err := models.SharedNodeLoginDAO.FindEnabledNodeLoginWithNodeId(nil, nodeconfigs.NodeRoleHTTPDNS, int64(node.Id))
if err != nil {
return "", 0, 0, errors.New("invalid install status data")
return "", 0, 0, err
}
sshMap := statusMap.GetMap("ssh")
if sshMap == nil {
return "", 0, 0, errors.New("ssh config should not be empty")
if login == nil {
installStatus.ErrorCode = "EMPTY_SSH"
return "", 0, 0, errors.New("ssh login not found for node '" + numberutils.FormatInt64(int64(node.Id)) + "'")
}
host := sshMap.GetString("host")
port := sshMap.GetInt("port")
grantId := sshMap.GetInt64("grantId")
if len(host) == 0 {
sshParams, err := login.DecodeSSHParams()
if err != nil {
installStatus.ErrorCode = "EMPTY_SSH"
return "", 0, 0, err
}
if len(sshParams.Host) == 0 {
installStatus.ErrorCode = "EMPTY_SSH_HOST"
return "", 0, 0, errors.New("ssh host should not be empty")
}
if port <= 0 {
port = 22
if sshParams.Port <= 0 {
sshParams.Port = 22
}
if grantId <= 0 {
if sshParams.GrantId <= 0 {
installStatus.ErrorCode = "EMPTY_GRANT"
return "", 0, 0, errors.New("grant id should not be empty")
}
return host, port, grantId, nil
return sshParams.Host, sshParams.Port, sshParams.GrantId, nil
}