Files
2026-02-27 10:35:22 +08:00

228 lines
5.1 KiB
Go

package clickhouse
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"time"
)
type Client struct {
cfg *Config
httpCli *http.Client
}
func NewClient() *Client {
cfg := SharedConfig()
transport := &http.Transport{}
if cfg != nil && strings.EqualFold(cfg.Scheme, "https") {
transport.TLSClientConfig = &tls.Config{
InsecureSkipVerify: cfg.TLSSkipVerify,
ServerName: cfg.TLSServerName,
}
}
return &Client{
cfg: cfg,
httpCli: &http.Client{
Timeout: 30 * time.Second,
Transport: transport,
},
}
}
func (c *Client) IsConfigured() bool {
return c.cfg != nil && c.cfg.IsConfigured()
}
func (c *Client) Query(ctx context.Context, query string, dest interface{}) error {
if !c.IsConfigured() {
return fmt.Errorf("clickhouse: not configured")
}
q := strings.TrimSpace(query)
if !strings.HasSuffix(strings.ToUpper(q), "FORMAT JSONEACHROW") {
query = q + " FORMAT JSONEachRow"
}
u := c.buildURL(query)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil)
if err != nil {
return err
}
if c.cfg.User != "" || c.cfg.Password != "" {
req.SetBasicAuth(c.cfg.User, c.cfg.Password)
}
resp, err := c.httpCli.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("clickhouse HTTP %d: %s", resp.StatusCode, string(body))
}
dec := json.NewDecoder(resp.Body)
return decodeRows(dec, dest)
}
func (c *Client) QueryRow(ctx context.Context, query string, dest interface{}) error {
if !c.IsConfigured() {
return fmt.Errorf("clickhouse: not configured")
}
q := strings.TrimSpace(query)
if !strings.HasSuffix(strings.ToUpper(q), "FORMAT JSONEACHROW") {
query = q + " FORMAT JSONEachRow"
}
u := c.buildURL(query)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil)
if err != nil {
return err
}
if c.cfg.User != "" || c.cfg.Password != "" {
req.SetBasicAuth(c.cfg.User, c.cfg.Password)
}
resp, err := c.httpCli.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("clickhouse HTTP %d: %s", resp.StatusCode, string(body))
}
dec := json.NewDecoder(resp.Body)
return decodeOneRow(dec, dest)
}
func (c *Client) Execute(ctx context.Context, query string) error {
if !c.IsConfigured() {
return fmt.Errorf("clickhouse: not configured")
}
u := c.buildURL(strings.TrimSpace(query))
req, err := http.NewRequestWithContext(ctx, http.MethodPost, u, nil)
if err != nil {
return err
}
if c.cfg.User != "" || c.cfg.Password != "" {
req.SetBasicAuth(c.cfg.User, c.cfg.Password)
}
resp, err := c.httpCli.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("clickhouse HTTP %d: %s", resp.StatusCode, string(body))
}
return nil
}
func (c *Client) InsertJSONEachRow(ctx context.Context, insertSQL string, rows []map[string]interface{}) error {
if len(rows) == 0 {
return nil
}
if !c.IsConfigured() {
return fmt.Errorf("clickhouse: not configured")
}
query := strings.TrimSpace(insertSQL)
if !strings.HasSuffix(strings.ToUpper(query), "FORMAT JSONEACHROW") {
query += " FORMAT JSONEachRow"
}
var payload bytes.Buffer
for _, row := range rows {
if row == nil {
continue
}
data, err := json.Marshal(row)
if err != nil {
return err
}
payload.Write(data)
payload.WriteByte('\n')
}
u := c.buildURL(query)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, u, &payload)
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
if c.cfg.User != "" || c.cfg.Password != "" {
req.SetBasicAuth(c.cfg.User, c.cfg.Password)
}
resp, err := c.httpCli.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("clickhouse HTTP %d: %s", resp.StatusCode, string(body))
}
return nil
}
func (c *Client) buildURL(query string) string {
scheme := "http"
if c.cfg != nil && strings.EqualFold(c.cfg.Scheme, "https") {
scheme = "https"
}
return fmt.Sprintf("%s://%s:%d/?query=%s&database=%s",
scheme, c.cfg.Host, c.cfg.Port, url.QueryEscape(query), url.QueryEscape(c.cfg.Database))
}
func decodeRows(dec *json.Decoder, dest interface{}) error {
switch d := dest.(type) {
case *[]map[string]interface{}:
*d = (*d)[:0]
for {
var row map[string]interface{}
if err := dec.Decode(&row); err != nil {
if err == io.EOF {
return nil
}
return err
}
*d = append(*d, row)
}
default:
return fmt.Errorf("clickhouse: unsupported dest type for Query (use *[]map[string]interface{})")
}
}
func decodeOneRow(dec *json.Decoder, dest interface{}) error {
switch d := dest.(type) {
case *map[string]interface{}:
if err := dec.Decode(d); err != nil {
return err
}
return nil
default:
return fmt.Errorf("clickhouse: unsupported dest type for QueryRow (use *map[string]interface{})")
}
}