148 lines
4.9 KiB
Go
148 lines
4.9 KiB
Go
package setup
|
||
|
||
import (
|
||
"context"
|
||
"strings"
|
||
"time"
|
||
|
||
"github.com/TeaOSLab/EdgeAPI/internal/clickhouse"
|
||
)
|
||
|
||
// EnsureClickHouseTables 自动确保日志相关 ClickHouse 表存在。
|
||
// 仅做 CREATE TABLE IF NOT EXISTS,不会覆盖已有表结构。
|
||
func EnsureClickHouseTables() error {
|
||
client := clickhouse.NewClient()
|
||
if !client.IsConfigured() {
|
||
return nil
|
||
}
|
||
|
||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||
defer cancel()
|
||
|
||
sqls := []string{
|
||
`CREATE TABLE IF NOT EXISTS logs_ingest
|
||
(
|
||
timestamp DateTime CODEC(DoubleDelta, ZSTD(1)),
|
||
node_id UInt64,
|
||
cluster_id UInt64,
|
||
server_id UInt64,
|
||
host LowCardinality(String),
|
||
ip String,
|
||
method LowCardinality(String),
|
||
path String CODEC(ZSTD(1)),
|
||
status UInt16,
|
||
bytes_in UInt64 CODEC(Delta, ZSTD(1)),
|
||
bytes_out UInt64 CODEC(Delta, ZSTD(1)),
|
||
cost_ms UInt32 CODEC(Delta, ZSTD(1)),
|
||
ua String CODEC(ZSTD(1)),
|
||
referer String CODEC(ZSTD(1)),
|
||
log_type LowCardinality(String),
|
||
trace_id String,
|
||
firewall_policy_id UInt64 DEFAULT 0,
|
||
firewall_rule_group_id UInt64 DEFAULT 0,
|
||
firewall_rule_set_id UInt64 DEFAULT 0,
|
||
firewall_rule_id UInt64 DEFAULT 0,
|
||
request_headers String DEFAULT '' CODEC(ZSTD(3)),
|
||
request_body String DEFAULT '' CODEC(ZSTD(3)),
|
||
response_headers String DEFAULT '' CODEC(ZSTD(3)),
|
||
response_body String DEFAULT '' CODEC(ZSTD(3)),
|
||
attrs String DEFAULT '' CODEC(ZSTD(3)),
|
||
INDEX idx_trace_id trace_id TYPE bloom_filter(0.01) GRANULARITY 4,
|
||
INDEX idx_ip ip TYPE bloom_filter(0.01) GRANULARITY 4,
|
||
INDEX idx_host host TYPE tokenbf_v1(10240, 3, 0) GRANULARITY 4,
|
||
INDEX idx_fw_policy firewall_policy_id TYPE minmax GRANULARITY 4,
|
||
INDEX idx_status status TYPE minmax GRANULARITY 4
|
||
)
|
||
ENGINE = MergeTree
|
||
PARTITION BY toYYYYMMDD(timestamp)
|
||
ORDER BY (timestamp, node_id, server_id, trace_id)
|
||
SETTINGS index_granularity = 8192`,
|
||
`CREATE TABLE IF NOT EXISTS dns_logs_ingest
|
||
(
|
||
timestamp DateTime CODEC(DoubleDelta, ZSTD(1)),
|
||
request_id String,
|
||
node_id UInt64,
|
||
cluster_id UInt64,
|
||
domain_id UInt64,
|
||
record_id UInt64,
|
||
remote_addr String,
|
||
question_name String,
|
||
question_type LowCardinality(String),
|
||
record_name String,
|
||
record_type LowCardinality(String),
|
||
record_value String,
|
||
networking LowCardinality(String),
|
||
is_recursive UInt8,
|
||
error String CODEC(ZSTD(1)),
|
||
ns_route_codes Array(String),
|
||
content_json String DEFAULT '' CODEC(ZSTD(3)),
|
||
INDEX idx_request_id request_id TYPE bloom_filter(0.01) GRANULARITY 4,
|
||
INDEX idx_remote_addr remote_addr TYPE bloom_filter(0.01) GRANULARITY 4,
|
||
INDEX idx_question_name question_name TYPE tokenbf_v1(10240, 3, 0) GRANULARITY 4,
|
||
INDEX idx_domain_id domain_id TYPE minmax GRANULARITY 4
|
||
)
|
||
ENGINE = MergeTree
|
||
PARTITION BY toYYYYMMDD(timestamp)
|
||
ORDER BY (timestamp, request_id, node_id)
|
||
SETTINGS index_granularity = 8192`,
|
||
`CREATE TABLE IF NOT EXISTS httpdns_access_logs_ingest
|
||
(
|
||
request_id String,
|
||
cluster_id UInt64,
|
||
node_id UInt64,
|
||
app_id String,
|
||
app_name String,
|
||
domain String,
|
||
qtype LowCardinality(String),
|
||
client_ip String,
|
||
client_region String,
|
||
carrier String,
|
||
sdk_version String,
|
||
os LowCardinality(String),
|
||
result_ips String,
|
||
status LowCardinality(String),
|
||
error_code String,
|
||
cost_ms UInt32,
|
||
created_at UInt64,
|
||
day String,
|
||
summary String CODEC(ZSTD(1)),
|
||
INDEX idx_request_id request_id TYPE bloom_filter(0.01) GRANULARITY 4,
|
||
INDEX idx_cluster_id cluster_id TYPE minmax GRANULARITY 4,
|
||
INDEX idx_node_id node_id TYPE minmax GRANULARITY 4,
|
||
INDEX idx_app_id app_id TYPE tokenbf_v1(10240, 3, 0) GRANULARITY 4,
|
||
INDEX idx_domain domain TYPE tokenbf_v1(10240, 3, 0) GRANULARITY 4,
|
||
INDEX idx_status status TYPE minmax GRANULARITY 4
|
||
)
|
||
ENGINE = MergeTree
|
||
PARTITION BY day
|
||
ORDER BY (day, created_at, request_id, node_id)
|
||
SETTINGS index_granularity = 8192`,
|
||
}
|
||
|
||
for _, sql := range sqls {
|
||
stmt := strings.TrimSpace(sql)
|
||
if len(stmt) == 0 {
|
||
continue
|
||
}
|
||
if err := client.Execute(ctx, stmt); err != nil {
|
||
return err
|
||
}
|
||
}
|
||
|
||
// v1.5.1: 为 logs_ingest 添加 attrs 列(存储 cache.status 等扩展属性)
|
||
upgradeSQLs := []string{
|
||
`ALTER TABLE logs_ingest ADD COLUMN IF NOT EXISTS attrs String DEFAULT '' CODEC(ZSTD(3)) AFTER response_body`,
|
||
}
|
||
for _, sql := range upgradeSQLs {
|
||
stmt := strings.TrimSpace(sql)
|
||
if len(stmt) == 0 {
|
||
continue
|
||
}
|
||
if err := client.Execute(ctx, stmt); err != nil {
|
||
return err
|
||
}
|
||
}
|
||
|
||
return nil
|
||
}
|