Go工程師體系課 012

Go 中集成 Elasticsearch

1. 客戶端庫選擇

1.1 主流 Go ES 客戶端

  • olivere/elastic:功能最全面,API 設計優雅,支持 ES 7.x/8.x
  • elastic/go-elasticsearch:官方客戶端,輕量級,更接近原生 REST API
  • go-elasticsearch/elasticsearch:社區維護的官方客戶端分支

1.2 推薦選擇

olivere/elastic 是生產環境首選,原因:

  • 類型安全的查詢構建器
  • 完善的錯誤處理
  • 支持所有 ES 功能(聚合、批量操作、索引管理等)
  • 活躍維護,版本更新及時

2. olivere/elastic 快速入門

2.1 安裝依賴

go mod init your-project
go get github.com/olivere/elastic/v7

2.2 基礎連接

package main

import (
    "context"
    "fmt"
    "log"

    "github.com/olivere/elastic/v7"
)

func main() {
    // 創建客戶端
    client, err := elastic.NewClient(
        elastic.SetURL("http://localhost:9200"),
        elastic.SetSniff(false), // 單節點環境關閉嗅探
        elastic.SetHealthcheck(false), // 關閉健康檢查
    )
    if err != nil {
        log.Fatal(err)
    }
    defer client.Stop()

    // 檢查連接
    info, code, err := client.Ping("http://localhost:9200").Do(context.Background())
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("ES 版本: %s, 狀態碼: %d\n", info.Version.Number, code)
}

2.3 連接配置選項

client, err := elastic.NewClient(
    elastic.SetURL("http://localhost:9200", "http://localhost:9201"), // 多節點
    elastic.SetBasicAuth("username", "password"), // 認證
    elastic.SetSniff(true), // 自動發現節點
    elastic.SetHealthcheckInterval(10*time.Second), // 健康檢查間隔
    elastic.SetMaxRetries(3), // 最大重試次數
    elastic.SetRetryStatusCodes(502, 503, 504), // 重試狀態碼
    elastic.SetGzip(true), // 啓用壓縮
    elastic.SetErrorLog(log.New(os.Stderr, "ES ", log.LstdFlags)), // 錯誤日誌
    elastic.SetInfoLog(log.New(os.Stdout, "ES ", log.LstdFlags)), // 信息日誌
)

3. 索引管理

3.1 創建索引

// 創建索引
createIndex, err := client.CreateIndex("products").
    BodyString(`{
        "settings": {
            "number_of_shards": 1,
            "number_of_replicas": 1
        },
        "mappings": {
            "properties": {
                "title": {
                    "type": "text",
                    "analyzer": "ik_smart",
                    "fields": {
                        "keyword": {
                            "type": "keyword"
                        }
                    }
                },
                "price": {"type": "double"},
                "status": {"type": "keyword"},
                "created_at": {"type": "date"}
            }
        }
    }`).
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}
fmt.Printf("索引創建結果: %v\n", createIndex.Acknowledged)

3.2 檢查索引是否存在

exists, err := client.IndexExists("products").Do(context.Background())
if err != nil {
    log.Fatal(err)
}
fmt.Printf("索引存在: %v\n", exists)

3.3 刪除索引

deleteIndex, err := client.DeleteIndex("products").Do(context.Background())
if err != nil {
    log.Fatal(err)
}
fmt.Printf("索引刪除結果: %v\n", deleteIndex.Acknowledged)

4. 文檔操作

4.1 定義文檔結構

type Product struct {
    ID        string    `json:"id"`
    Title     string    `json:"title"`
    Price     float64   `json:"price"`
    Status    string    `json:"status"`
    CreatedAt time.Time `json:"created_at"`
    Tags      []string  `json:"tags"`
}

4.2 添加文檔

// 添加單個文檔
product := Product{
    ID:        "1",
    Title:     "iPhone 15 Pro",
    Price:     7999.0,
    Status:    "active",
    CreatedAt: time.Now(),
    Tags:      []string{"phone", "apple", "premium"},
}

put1, err := client.Index().
    Index("products").
    Id("1").
    BodyJson(product).
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}
fmt.Printf("文檔索引結果: %s\n", put1.Result)

4.3 獲取文檔

// 根據 ID 獲取文檔
get1, err := client.Get().
    Index("products").
    Id("1").
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}

if get1.Found {
    var product Product
    err = json.Unmarshal(get1.Source, &product)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("獲取到文檔: %+v\n", product)
}

4.4 更新文檔

// 部分更新
update, err := client.Update().
    Index("products").
    Id("1").
    Doc(map[string]interface{}{
        "price": 7599.0,
    }).
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}
fmt.Printf("更新結果: %s\n", update.Result)

4.5 刪除文檔

delete, err := client.Delete().
    Index("products").
    Id("1").
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}
fmt.Printf("刪除結果: %s\n", delete.Result)

5. 批量操作

5.1 批量索引

bulkRequest := client.Bulk()

products := []Product{
    {ID: "2", Title: "Samsung Galaxy S24", Price: 5999.0, Status: "active", CreatedAt: time.Now()},
    {ID: "3", Title: "MacBook Pro M3", Price: 12999.0, Status: "active", CreatedAt: time.Now()},
    {ID: "4", Title: "iPad Air", Price: 3999.0, Status: "active", CreatedAt: time.Now()},
}

for _, product := range products {
    req := elastic.NewBulkIndexRequest().
        Index("products").
        Id(product.ID).
        Doc(product)
    bulkRequest = bulkRequest.Add(req)
}

bulkResponse, err := bulkRequest.Do(context.Background())
if err != nil {
    log.Fatal(err)
}

fmt.Printf("批量索引完成,處理了 %d 個請求\n", len(bulkResponse.Items))

5.2 批量更新

bulkRequest := client.Bulk()

// 批量更新價格
updates := map[string]float64{
    "2": 5799.0,
    "3": 11999.0,
    "4": 3799.0,
}

for id, price := range updates {
    req := elastic.NewBulkUpdateRequest().
        Index("products").
        Id(id).
        Doc(map[string]interface{}{"price": price})
    bulkRequest = bulkRequest.Add(req)
}

bulkResponse, err := bulkRequest.Do(context.Background())
if err != nil {
    log.Fatal(err)
}

6. 搜索查詢

6.1 簡單搜索

// 匹配所有文檔
searchResult, err := client.Search().
    Index("products").
    Query(elastic.NewMatchAllQuery()).
    Size(10).
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}

fmt.Printf("找到 %d 個文檔\n", searchResult.TotalHits())

6.2 匹配查詢

// 文本匹配查詢
searchResult, err := client.Search().
    Index("products").
    Query(elastic.NewMatchQuery("title", "iPhone")).
    Size(10).
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}

for _, hit := range searchResult.Hits.Hits {
    var product Product
    err := json.Unmarshal(hit.Source, &product)
    if err != nil {
        continue
    }
    fmt.Printf("文檔 ID: %s, 標題: %s, 分數: %f\n",
        hit.Id, product.Title, *hit.Score)
}

6.3 復合查詢

// 布爾查詢
boolQuery := elastic.NewBoolQuery().
    Must(elastic.NewMatchQuery("title", "iPhone")).
    Filter(elastic.NewTermQuery("status", "active")).
    Filter(elastic.NewRangeQuery("price").Gte(1000).Lte(10000))

searchResult, err := client.Search().
    Index("products").
    Query(boolQuery).
    Sort("price", true). // 按價格升序
    Size(10).
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}

6.4 多字段搜索

// 多字段匹配
multiMatchQuery := elastic.NewMultiMatchQuery("蘋果手機", "title", "tags").
    Type("best_fields").
    FieldWithBoost("title", 3.0)

searchResult, err := client.Search().
    Index("products").
    Query(multiMatchQuery).
    Highlight(elastic.NewHighlight().Field("title")).
    Size(10).
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}

7. 聚合分析

7.1 基礎聚合

// 按狀態分組統計
searchResult, err := client.Search().
    Index("products").
    Query(elastic.NewMatchAllQuery()).
    Aggregation("status_count", elastic.NewTermsAggregation().Field("status")).
    Size(0). // 不返回文檔,只要聚合結果
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}

statusAgg, found := searchResult.Aggregations.Terms("status_count")
if found {
    for _, bucket := range statusAgg.Buckets {
        fmt.Printf("狀態: %s, 數量: %d\n", bucket.Key, bucket.DocCount)
    }
}

7.2 數值統計聚合

// 價格統計
searchResult, err := client.Search().
    Index("products").
    Query(elastic.NewMatchAllQuery()).
    Aggregation("price_stats", elastic.NewStatsAggregation().Field("price")).
    Size(0).
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}

statsAgg, found := searchResult.Aggregations.Stats("price_stats")
if found {
    fmt.Printf("價格統計 - 最小值: %.2f, 最大值: %.2f, 平均值: %.2f, 總數: %.2f\n",
        statsAgg.Min, statsAgg.Max, statsAgg.Avg, statsAgg.Sum)
}

7.3 復合聚合

// 按狀態分組,每組內統計價格
searchResult, err := client.Search().
    Index("products").
    Query(elastic.NewMatchAllQuery()).
    Aggregation("status_groups",
        elastic.NewTermsAggregation().Field("status").
            SubAggregation("price_stats", elastic.NewStatsAggregation().Field("price"))).
    Size(0).
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}

statusAgg, found := searchResult.Aggregations.Terms("status_groups")
if found {
    for _, bucket := range statusAgg.Buckets {
        priceStats, found := bucket.Stats("price_stats")
        if found {
            fmt.Printf("狀態: %s, 平均價格: %.2f, 數量: %d\n",
                bucket.Key, priceStats.Avg, bucket.DocCount)
        }
    }
}

8. 分頁與滾動

8.1 基礎分頁

// 分頁查詢
page := 1
size := 10
from := (page - 1) * size

searchResult, err := client.Search().
    Index("products").
    Query(elastic.NewMatchAllQuery()).
    From(from).
    Size(size).
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}

fmt.Printf("第 %d 頁,共 %d 條記錄\n", page, searchResult.TotalHits())

8.2 滾動查詢(大數據量)

// 滾動查詢,適合大數據量導出
scroll := client.Scroll("products").Size(100).KeepAlive("1m")
for {
    searchResult, err := scroll.Do(context.Background())
    if err == io.EOF {
        break // 數據讀取完畢
    }
    if err != nil {
        log.Fatal(err)
    }

    for _, hit := range searchResult.Hits.Hits {
        var product Product
        err := json.Unmarshal(hit.Source, &product)
        if err != nil {
            continue
        }
        fmt.Printf("處理文檔: %s\n", product.Title)
    }
}

9. 錯誤處理

9.1 常見錯誤處理

func handleESError(err error) {
    if err == nil {
        return
    }

    // 檢查是否是 ES 錯誤
    if esErr, ok := err.(*elastic.Error); ok {
        switch esErr.Status {
        case 404:
            fmt.Println("文檔或索引不存在")
        case 409:
            fmt.Println("版本衝突")
        case 400:
            fmt.Printf("請求錯誤: %s\n", esErr.Details)
        default:
            fmt.Printf("ES 錯誤: %d - %s\n", esErr.Status, esErr.Details)
        }
        return
    }

    // 網絡或其他錯誤
    fmt.Printf("其他錯誤: %v\n", err)
}

10. 最佳實踐

10.1 連接池配置

client, err := elastic.NewClient(
    elastic.SetURL("http://localhost:9200"),
    elastic.SetMaxRetries(3),
    elastic.SetRetryBackoff(func(i int) time.Duration {
        return time.Duration(i) * 100 * time.Millisecond
    }),
    elastic.SetHealthcheckInterval(30*time.Second),
    elastic.SetSniff(false), // 生產環境建議關閉
)

10.2 上下文管理

// 使用帶超時的上下文
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

searchResult, err := client.Search().
    Index("products").
    Query(elastic.NewMatchAllQuery()).
    Do(ctx)

10.3 批量操作優化

// 批量大小控制
const batchSize = 1000

func bulkIndexProducts(products []Product) error {
    for i := 0; i < len(products); i += batchSize {
        end := i + batchSize
        if end > len(products) {
            end = len(products)
        }

        bulkRequest := client.Bulk()
        for j := i; j < end; j++ {
            req := elastic.NewBulkIndexRequest().
                Index("products").
                Id(products[j].ID).
                Doc(products[j])
            bulkRequest = bulkRequest.Add(req)
        }

        _, err := bulkRequest.Do(context.Background())
        if err != nil {
            return err
        }
    }
    return nil
}

11. 完整示例項目結構

project/
├── go.mod
├── go.sum
├── main.go
├── config/
│   └── es.go          # ES 配置
├── models/
│   └── product.go     # 數據模型
├── services/
│   └── es_service.go  # ES 服務層
└── handlers/
    └── product.go     # 業務處理

11.1 配置管理

// config/es.go
package config

import (
    "github.com/olivere/elastic/v7"
)

type ESConfig struct {
    URLs     []string
    Username string
    Password string
    Sniff    bool
}

func NewESClient(config ESConfig) (*elastic.Client, error) {
    options := []elastic.ClientOptionFunc{
        elastic.SetURL(config.URLs...),
        elastic.SetSniff(config.Sniff),
    }

    if config.Username != "" && config.Password != "" {
        options = append(options, elastic.SetBasicAuth(config.Username, config.Password))
    }

    return elastic.NewClient(options...)
}

11.2 服務層封裝

// services/es_service.go
package services

import (
    "context"
    "encoding/json"

    "github.com/olivere/elastic/v7"
)

type ProductService struct {
    client *elastic.Client
    index  string
}

func NewProductService(client *elastic.Client) *ProductService {
    return &ProductService{
        client: client,
        index:  "products",
    }
}

func (s *ProductService) SearchProducts(query string, from, size int) ([]Product, int64, error) {
    searchResult, err := s.client.Search().
        Index(s.index).
        Query(elastic.NewMultiMatchQuery(query, "title", "tags")).
        From(from).
        Size(size).
        Do(context.Background())
    if err != nil {
        return nil, 0, err
    }

    var products []Product
    for _, hit := range searchResult.Hits.Hits {
        var product Product
        err := json.Unmarshal(hit.Source, &product)
        if err != nil {
            continue
        }
        products = append(products, product)
    }

    return products, searchResult.TotalHits(), nil
}

12. 總結

olivere/elastic 是 Go 語言中最成熟的 ES 客戶端,提供了:

  • 類型安全:編譯時檢查查詢語法
  • 功能完整:支持所有 ES 功能
  • 性能優化:連接池、批量操作、重試機制
  • 易於使用:鏈式 API 設計,代碼可讀性強

通過本文的示例,您可以快速在 Go 項目中集成 Elasticsearch,實現高效的搜索和分析功能。

主題測試文章,只做測試使用。發佈者:Walker,轉轉請注明出處:https://walker-learn.xyz/archives/6778

(0)
Walker的頭像Walker
上一篇 12小時前
下一篇 2025年3月8日 12:52

相關推薦

  • Go工程師體系課 017

    限流、熔斷與降級入門(含 Sentinel 實戰) 結合課件第 3 章(3-1 ~ 3-9)的視頻要點,整理一套面向初學者的服務保護指南,幫助理解“為甚麼需要限流、熔斷和降級”,以及如何用 Sentinel 快速上手。 學習路線速覽 3-1 理解服務雪崩與限流、熔斷、降級的背景 3-2 Sentinel 與 Hystrix 對比,明確技術選型 3-3 Sen…

  • Go工程師體系課 009

    其它一些功能 個人中心 收藏 管理收貨地址(增刪改查) 留言 拷貝inventory_srv--> userop_srv 查詢替換所有的inventory Elasticsearch 深度解析文檔 1. 甚麼是Elasticsearch Elasticsearch是一個基於Apache Lucene構建的分布式、RESTful搜索和分析引擎,能夠快速地…

    後端開發 6小時前
    000
  • Go日積月累 電子書目錄與推薦

    Go 語言電子書精華整理與推薦 基於 48 份 Go 語言電子書資料,按主題提煉為 4 篇系統化精華文檔。整理時間:2026-03-06 精華文章導讀 以下 4 篇文章從 48 份電子書中提煉核心知識,按主題系統化整理,覆蓋 Go 語言從底層原理到企業實戰的完整知識體系。 1. Go 底層原理與源碼精華 知識來源:《Go 源碼剖析》(雨痕)、《Go 1.4 …

    後端開發 1天前
    1000
  • Go資深工程師講解(慕課) 006_函數式編程

    Go 函數式編程 對應視頻 Ch6(6-2 函數式編程例一),在 002.md 基礎上擴展更多函數式編程模式 1. 回顧:Go 中函數是一等公民 Go 不是純函數式語言,但函數可以作為:- 變量- 參數- 返回值- 存放在數據結構中 // 函數作為變量 var add = func(a, b int) int { return a + b } // 函數作為…

  • Go工程師體系課 004

    需求分析 後台管理系統 商品管理 商品列表 商品分類 品牌管理 品牌分類 訂單管理 訂單列表 用戶信息管理 用戶列表 用戶地址 用戶留言 輪播圖管理 電商系統 登錄頁面 首頁 商品搜索 商品分類導航 輪播圖展示 推薦商品展示 商品詳情頁 商品圖片展示 商品描述 商品規格選擇 加入購物車 購物車 商品列表 數量調整 刪除商品 結算功能 用戶中心 訂單中心 我的…

    11小時前
    100
簡體中文 繁體中文 English