Files
waf-platform/EdgeAPI/internal/setup/clickhouse_upgrade.go
2026-03-22 17:37:40 +08:00

148 lines
4.9 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package setup
import (
	"context"
	"fmt"
	"strings"
	"time"

	"github.com/TeaOSLab/EdgeAPI/internal/clickhouse"
)
// EnsureClickHouseTables makes sure the log-related ClickHouse tables exist
// and carry the columns added by later releases.
//
// It first issues CREATE TABLE IF NOT EXISTS for logs_ingest, dns_logs_ingest
// and httpdns_access_logs_ingest, then applies additive
// ALTER TABLE ... ADD COLUMN IF NOT EXISTS upgrades. Existing table
// definitions are never dropped or replaced.
//
// When the ClickHouse client is not configured the function does nothing and
// returns nil. All statements share a single 30-second deadline. Execution
// stops at the first failing statement; the returned error wraps the
// underlying error together with the first line of that statement.
func EnsureClickHouseTables() error {
	client := clickhouse.NewClient()
	if !client.IsConfigured() {
		return nil
	}

	// One deadline covers every statement issued below.
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// runAll executes the given statements in order, skipping blank entries,
	// and stops at the first failure.
	runAll := func(stmts []string) error {
		for _, raw := range stmts {
			stmt := strings.TrimSpace(raw)
			if len(stmt) == 0 {
				continue
			}
			if err := client.Execute(ctx, stmt); err != nil {
				// The first line ("CREATE TABLE IF NOT EXISTS <name>" or the
				// whole single-line ALTER) is enough to identify the
				// statement without dumping the full DDL into the error.
				head := stmt
				if nl := strings.IndexByte(stmt, '\n'); nl >= 0 {
					head = stmt[:nl]
				}
				return fmt.Errorf("clickhouse setup: executing %q: %w", head, err)
			}
		}
		return nil
	}

	createSQLs := []string{
		`CREATE TABLE IF NOT EXISTS logs_ingest
(
timestamp DateTime CODEC(DoubleDelta, ZSTD(1)),
node_id UInt64,
cluster_id UInt64,
server_id UInt64,
host LowCardinality(String),
ip String,
method LowCardinality(String),
path String CODEC(ZSTD(1)),
status UInt16,
bytes_in UInt64 CODEC(Delta, ZSTD(1)),
bytes_out UInt64 CODEC(Delta, ZSTD(1)),
cost_ms UInt32 CODEC(Delta, ZSTD(1)),
ua String CODEC(ZSTD(1)),
referer String CODEC(ZSTD(1)),
log_type LowCardinality(String),
trace_id String,
firewall_policy_id UInt64 DEFAULT 0,
firewall_rule_group_id UInt64 DEFAULT 0,
firewall_rule_set_id UInt64 DEFAULT 0,
firewall_rule_id UInt64 DEFAULT 0,
request_headers String DEFAULT '' CODEC(ZSTD(3)),
request_body String DEFAULT '' CODEC(ZSTD(3)),
response_headers String DEFAULT '' CODEC(ZSTD(3)),
response_body String DEFAULT '' CODEC(ZSTD(3)),
attrs String DEFAULT '' CODEC(ZSTD(3)),
INDEX idx_trace_id trace_id TYPE bloom_filter(0.01) GRANULARITY 4,
INDEX idx_ip ip TYPE bloom_filter(0.01) GRANULARITY 4,
INDEX idx_host host TYPE tokenbf_v1(10240, 3, 0) GRANULARITY 4,
INDEX idx_fw_policy firewall_policy_id TYPE minmax GRANULARITY 4,
INDEX idx_status status TYPE minmax GRANULARITY 4
)
ENGINE = MergeTree
PARTITION BY toYYYYMMDD(timestamp)
ORDER BY (timestamp, node_id, server_id, trace_id)
SETTINGS index_granularity = 8192`,
		`CREATE TABLE IF NOT EXISTS dns_logs_ingest
(
timestamp DateTime CODEC(DoubleDelta, ZSTD(1)),
request_id String,
node_id UInt64,
cluster_id UInt64,
domain_id UInt64,
record_id UInt64,
remote_addr String,
question_name String,
question_type LowCardinality(String),
record_name String,
record_type LowCardinality(String),
record_value String,
networking LowCardinality(String),
is_recursive UInt8,
error String CODEC(ZSTD(1)),
ns_route_codes Array(String),
content_json String DEFAULT '' CODEC(ZSTD(3)),
INDEX idx_request_id request_id TYPE bloom_filter(0.01) GRANULARITY 4,
INDEX idx_remote_addr remote_addr TYPE bloom_filter(0.01) GRANULARITY 4,
INDEX idx_question_name question_name TYPE tokenbf_v1(10240, 3, 0) GRANULARITY 4,
INDEX idx_domain_id domain_id TYPE minmax GRANULARITY 4
)
ENGINE = MergeTree
PARTITION BY toYYYYMMDD(timestamp)
ORDER BY (timestamp, request_id, node_id)
SETTINGS index_granularity = 8192`,
		`CREATE TABLE IF NOT EXISTS httpdns_access_logs_ingest
(
request_id String,
cluster_id UInt64,
node_id UInt64,
app_id String,
app_name String,
domain String,
qtype LowCardinality(String),
client_ip String,
client_region String,
carrier String,
sdk_version String,
os LowCardinality(String),
result_ips String,
status LowCardinality(String),
error_code String,
cost_ms UInt32,
created_at UInt64,
day String,
summary String CODEC(ZSTD(1)),
INDEX idx_request_id request_id TYPE bloom_filter(0.01) GRANULARITY 4,
INDEX idx_cluster_id cluster_id TYPE minmax GRANULARITY 4,
INDEX idx_node_id node_id TYPE minmax GRANULARITY 4,
INDEX idx_app_id app_id TYPE tokenbf_v1(10240, 3, 0) GRANULARITY 4,
INDEX idx_domain domain TYPE tokenbf_v1(10240, 3, 0) GRANULARITY 4,
INDEX idx_status status TYPE minmax GRANULARITY 4
)
ENGINE = MergeTree
PARTITION BY day
ORDER BY (day, created_at, request_id, node_id)
SETTINGS index_granularity = 8192`,
	}
	if err := runAll(createSQLs); err != nil {
		return err
	}

	// v1.5.1: add the attrs column to logs_ingest (stores extended attributes
	// such as cache.status). Tables created before that release lack the
	// column because CREATE TABLE IF NOT EXISTS leaves them untouched.
	upgradeSQLs := []string{
		`ALTER TABLE logs_ingest ADD COLUMN IF NOT EXISTS attrs String DEFAULT '' CODEC(ZSTD(3)) AFTER response_body`,
	}
	return runAll(upgradeSQLs)
}