Go工程师体系课 012【学习笔记】

Go 中集成 Elasticsearch

1. 客户端库选择

1.1 主流 Go ES 客户端

  • olivere/elastic:功能最全面,API 设计优雅,支持 ES 7.x/8.x
  • elastic/go-elasticsearch:官方客户端,轻量级,更接近原生 REST API
  • go-elasticsearch/elasticsearch:社区维护的官方客户端分支

1.2 推荐选择

olivere/elastic 是生产环境首选,原因:

  • 类型安全的查询构建器
  • 完善的错误处理
  • 支持所有 ES 功能(聚合、批量操作、索引管理等)
  • 活跃维护,版本更新及时

2. olivere/elastic 快速入门

2.1 安装依赖

go mod init your-project
go get github.com/olivere/elastic/v7

2.2 基础连接

package main

import (
    "context"
    "fmt"
    "log"

    "github.com/olivere/elastic/v7"
)

func main() {
    // 创建客户端
    client, err := elastic.NewClient(
        elastic.SetURL("http://localhost:9200"),
        elastic.SetSniff(false), // 单节点环境关闭嗅探
        elastic.SetHealthcheck(false), // 关闭健康检查
    )
    if err != nil {
        log.Fatal(err)
    }
    defer client.Stop()

    // 检查连接
    info, code, err := client.Ping("http://localhost:9200").Do(context.Background())
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("ES 版本: %s, 状态码: %d\n", info.Version.Number, code)
}

2.3 连接配置选项

client, err := elastic.NewClient(
    elastic.SetURL("http://localhost:9200", "http://localhost:9201"), // 多节点
    elastic.SetBasicAuth("username", "password"), // 认证
    elastic.SetSniff(true), // 自动发现节点
    elastic.SetHealthcheckInterval(10*time.Second), // 健康检查间隔
    elastic.SetMaxRetries(3), // 最大重试次数
    elastic.SetRetryStatusCodes(502, 503, 504), // 重试状态码
    elastic.SetGzip(true), // 启用压缩
    elastic.SetErrorLog(log.New(os.Stderr, "ES ", log.LstdFlags)), // 错误日志
    elastic.SetInfoLog(log.New(os.Stdout, "ES ", log.LstdFlags)), // 信息日志
)

3. 索引管理

3.1 创建索引

// 创建索引
createIndex, err := client.CreateIndex("products").
    BodyString(`{
        "settings": {
            "number_of_shards": 1,
            "number_of_replicas": 1
        },
        "mappings": {
            "properties": {
                "title": {
                    "type": "text",
                    "analyzer": "ik_smart",
                    "fields": {
                        "keyword": {
                            "type": "keyword"
                        }
                    }
                },
                "price": {"type": "double"},
                "status": {"type": "keyword"},
                "created_at": {"type": "date"}
            }
        }
    }`).
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}
fmt.Printf("索引创建结果: %v\n", createIndex.Acknowledged)

3.2 检查索引是否存在

exists, err := client.IndexExists("products").Do(context.Background())
if err != nil {
    log.Fatal(err)
}
fmt.Printf("索引存在: %v\n", exists)

3.3 删除索引

deleteIndex, err := client.DeleteIndex("products").Do(context.Background())
if err != nil {
    log.Fatal(err)
}
fmt.Printf("索引删除结果: %v\n", deleteIndex.Acknowledged)

4. 文档操作

4.1 定义文档结构

type Product struct {
    ID        string    `json:"id"`
    Title     string    `json:"title"`
    Price     float64   `json:"price"`
    Status    string    `json:"status"`
    CreatedAt time.Time `json:"created_at"`
    Tags      []string  `json:"tags"`
}

4.2 添加文档

// 添加单个文档
product := Product{
    ID:        "1",
    Title:     "iPhone 15 Pro",
    Price:     7999.0,
    Status:    "active",
    CreatedAt: time.Now(),
    Tags:      []string{"phone", "apple", "premium"},
}

put1, err := client.Index().
    Index("products").
    Id("1").
    BodyJson(product).
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}
fmt.Printf("文档索引结果: %s\n", put1.Result)

4.3 获取文档

// 根据 ID 获取文档
get1, err := client.Get().
    Index("products").
    Id("1").
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}

if get1.Found {
    var product Product
    err = json.Unmarshal(get1.Source, &product)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("获取到文档: %+v\n", product)
}

4.4 更新文档

// 部分更新
update, err := client.Update().
    Index("products").
    Id("1").
    Doc(map[string]interface{}{
        "price": 7599.0,
    }).
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}
fmt.Printf("更新结果: %s\n", update.Result)

4.5 删除文档

delete, err := client.Delete().
    Index("products").
    Id("1").
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}
fmt.Printf("删除结果: %s\n", delete.Result)

5. 批量操作

5.1 批量索引

bulkRequest := client.Bulk()

products := []Product{
    {ID: "2", Title: "Samsung Galaxy S24", Price: 5999.0, Status: "active", CreatedAt: time.Now()},
    {ID: "3", Title: "MacBook Pro M3", Price: 12999.0, Status: "active", CreatedAt: time.Now()},
    {ID: "4", Title: "iPad Air", Price: 3999.0, Status: "active", CreatedAt: time.Now()},
}

for _, product := range products {
    req := elastic.NewBulkIndexRequest().
        Index("products").
        Id(product.ID).
        Doc(product)
    bulkRequest = bulkRequest.Add(req)
}

bulkResponse, err := bulkRequest.Do(context.Background())
if err != nil {
    log.Fatal(err)
}

fmt.Printf("批量索引完成,处理了 %d 个请求\n", len(bulkResponse.Items))

5.2 批量更新

bulkRequest := client.Bulk()

// 批量更新价格
updates := map[string]float64{
    "2": 5799.0,
    "3": 11999.0,
    "4": 3799.0,
}

for id, price := range updates {
    req := elastic.NewBulkUpdateRequest().
        Index("products").
        Id(id).
        Doc(map[string]interface{}{"price": price})
    bulkRequest = bulkRequest.Add(req)
}

bulkResponse, err := bulkRequest.Do(context.Background())
if err != nil {
    log.Fatal(err)
}

6. 搜索查询

6.1 简单搜索

// 匹配所有文档
searchResult, err := client.Search().
    Index("products").
    Query(elastic.NewMatchAllQuery()).
    Size(10).
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}

fmt.Printf("找到 %d 个文档\n", searchResult.TotalHits())

6.2 匹配查询

// 文本匹配查询
searchResult, err := client.Search().
    Index("products").
    Query(elastic.NewMatchQuery("title", "iPhone")).
    Size(10).
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}

for _, hit := range searchResult.Hits.Hits {
    var product Product
    err := json.Unmarshal(hit.Source, &product)
    if err != nil {
        continue
    }
    fmt.Printf("文档 ID: %s, 标题: %s, 分数: %f\n",
        hit.Id, product.Title, *hit.Score)
}

6.3 复合查询

// 布尔查询
boolQuery := elastic.NewBoolQuery().
    Must(elastic.NewMatchQuery("title", "iPhone")).
    Filter(elastic.NewTermQuery("status", "active")).
    Filter(elastic.NewRangeQuery("price").Gte(1000).Lte(10000))

searchResult, err := client.Search().
    Index("products").
    Query(boolQuery).
    Sort("price", true). // 按价格升序
    Size(10).
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}

6.4 多字段搜索

// 多字段匹配
multiMatchQuery := elastic.NewMultiMatchQuery("苹果手机", "title", "tags").
    Type("best_fields").
    FieldWithBoost("title", 3.0)

searchResult, err := client.Search().
    Index("products").
    Query(multiMatchQuery).
    Highlight(elastic.NewHighlight().Field("title")).
    Size(10).
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}

7. 聚合分析

7.1 基础聚合

// 按状态分组统计
searchResult, err := client.Search().
    Index("products").
    Query(elastic.NewMatchAllQuery()).
    Aggregation("status_count", elastic.NewTermsAggregation().Field("status")).
    Size(0). // 不返回文档,只要聚合结果
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}

statusAgg, found := searchResult.Aggregations.Terms("status_count")
if found {
    for _, bucket := range statusAgg.Buckets {
        fmt.Printf("状态: %s, 数量: %d\n", bucket.Key, bucket.DocCount)
    }
}

7.2 数值统计聚合

// 价格统计
searchResult, err := client.Search().
    Index("products").
    Query(elastic.NewMatchAllQuery()).
    Aggregation("price_stats", elastic.NewStatsAggregation().Field("price")).
    Size(0).
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}

statsAgg, found := searchResult.Aggregations.Stats("price_stats")
if found {
    fmt.Printf("价格统计 - 最小值: %.2f, 最大值: %.2f, 平均值: %.2f, 总数: %.2f\n",
        statsAgg.Min, statsAgg.Max, statsAgg.Avg, statsAgg.Sum)
}

7.3 复合聚合

// 按状态分组,每组内统计价格
searchResult, err := client.Search().
    Index("products").
    Query(elastic.NewMatchAllQuery()).
    Aggregation("status_groups",
        elastic.NewTermsAggregation().Field("status").
            SubAggregation("price_stats", elastic.NewStatsAggregation().Field("price"))).
    Size(0).
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}

statusAgg, found := searchResult.Aggregations.Terms("status_groups")
if found {
    for _, bucket := range statusAgg.Buckets {
        priceStats, found := bucket.Stats("price_stats")
        if found {
            fmt.Printf("状态: %s, 平均价格: %.2f, 数量: %d\n",
                bucket.Key, priceStats.Avg, bucket.DocCount)
        }
    }
}

8. 分页与滚动

8.1 基础分页

// 分页查询
page := 1
size := 10
from := (page - 1) * size

searchResult, err := client.Search().
    Index("products").
    Query(elastic.NewMatchAllQuery()).
    From(from).
    Size(size).
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}

fmt.Printf("第 %d 页,共 %d 条记录\n", page, searchResult.TotalHits())

8.2 滚动查询(大数据量)

// 滚动查询,适合大数据量导出
scroll := client.Scroll("products").Size(100).KeepAlive("1m")
for {
    searchResult, err := scroll.Do(context.Background())
    if err == io.EOF {
        break // 数据读取完毕
    }
    if err != nil {
        log.Fatal(err)
    }

    for _, hit := range searchResult.Hits.Hits {
        var product Product
        err := json.Unmarshal(hit.Source, &product)
        if err != nil {
            continue
        }
        fmt.Printf("处理文档: %s\n", product.Title)
    }
}

9. 错误处理

9.1 常见错误处理

func handleESError(err error) {
    if err == nil {
        return
    }

    // 检查是否是 ES 错误
    if esErr, ok := err.(*elastic.Error); ok {
        switch esErr.Status {
        case 404:
            fmt.Println("文档或索引不存在")
        case 409:
            fmt.Println("版本冲突")
        case 400:
            fmt.Printf("请求错误: %s\n", esErr.Details)
        default:
            fmt.Printf("ES 错误: %d - %s\n", esErr.Status, esErr.Details)
        }
        return
    }

    // 网络或其他错误
    fmt.Printf("其他错误: %v\n", err)
}

10. 最佳实践

10.1 连接池配置

client, err := elastic.NewClient(
    elastic.SetURL("http://localhost:9200"),
    elastic.SetMaxRetries(3),
    elastic.SetRetryBackoff(func(i int) time.Duration {
        return time.Duration(i) * 100 * time.Millisecond
    }),
    elastic.SetHealthcheckInterval(30*time.Second),
    elastic.SetSniff(false), // 生产环境建议关闭
)

10.2 上下文管理

// 使用带超时的上下文
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

searchResult, err := client.Search().
    Index("products").
    Query(elastic.NewMatchAllQuery()).
    Do(ctx)

10.3 批量操作优化

// 批量大小控制
const batchSize = 1000

func bulkIndexProducts(products []Product) error {
    for i := 0; i < len(products); i += batchSize {
        end := i + batchSize
        if end > len(products) {
            end = len(products)
        }

        bulkRequest := client.Bulk()
        for j := i; j < end; j++ {
            req := elastic.NewBulkIndexRequest().
                Index("products").
                Id(products[j].ID).
                Doc(products[j])
            bulkRequest = bulkRequest.Add(req)
        }

        _, err := bulkRequest.Do(context.Background())
        if err != nil {
            return err
        }
    }
    return nil
}

11. 完整示例项目结构

project/
├── go.mod
├── go.sum
├── main.go
├── config/
│   └── es.go          # ES 配置
├── models/
│   └── product.go     # 数据模型
├── services/
│   └── es_service.go  # ES 服务层
└── handlers/
    └── product.go     # 业务处理

11.1 配置管理

// config/es.go
package config

import (
    "github.com/olivere/elastic/v7"
)

type ESConfig struct {
    URLs     []string
    Username string
    Password string
    Sniff    bool
}

func NewESClient(config ESConfig) (*elastic.Client, error) {
    options := []elastic.ClientOptionFunc{
        elastic.SetURL(config.URLs...),
        elastic.SetSniff(config.Sniff),
    }

    if config.Username != "" && config.Password != "" {
        options = append(options, elastic.SetBasicAuth(config.Username, config.Password))
    }

    return elastic.NewClient(options...)
}

11.2 服务层封装

// services/es_service.go
package services

import (
    "context"
    "encoding/json"

    "github.com/olivere/elastic/v7"
)

type ProductService struct {
    client *elastic.Client
    index  string
}

func NewProductService(client *elastic.Client) *ProductService {
    return &ProductService{
        client: client,
        index:  "products",
    }
}

func (s *ProductService) SearchProducts(query string, from, size int) ([]Product, int64, error) {
    searchResult, err := s.client.Search().
        Index(s.index).
        Query(elastic.NewMultiMatchQuery(query, "title", "tags")).
        From(from).
        Size(size).
        Do(context.Background())
    if err != nil {
        return nil, 0, err
    }

    var products []Product
    for _, hit := range searchResult.Hits.Hits {
        var product Product
        err := json.Unmarshal(hit.Source, &product)
        if err != nil {
            continue
        }
        products = append(products, product)
    }

    return products, searchResult.TotalHits(), nil
}

12. 总结

olivere/elastic 是 Go 语言中最成熟的 ES 客户端,提供了:

  • 类型安全:编译时检查查询语法
  • 功能完整:支持所有 ES 功能
  • 性能优化:连接池、批量操作、重试机制
  • 易于使用:链式 API 设计,代码可读性强

通过本文的示例,您可以快速在 Go 项目中集成 Elasticsearch,实现高效的搜索和分析功能。

主题测试文章,只做测试使用。发布者:Walker,转转请注明出处:https://walker-learn.xyz/archives/4785

(0)
Walker的头像Walker
上一篇 2025年11月25日 12:00
下一篇 2025年11月25日 10:00

相关推荐

  • Nuxt3_扫盲 入门与原理介绍【学习笔记】

    Nuxt 3 入门与原理介绍 💡 什么是 Nuxt 3? Nuxt 3 是基于 Vue 3 和 Vite 打造的全栈前端框架,支持: 服务端渲染(SSR) 静态站点生成(SSG) 单页应用(SPA) 构建全栈应用(支持 API) Nuxt 3 是 Vue 的“加强版”,帮你简化项目结构和开发流程。 🔧 核心原理 功能 Nuxt 如何处理 ✅ 页面路由 自动根…

    个人 2025年4月6日
    2.1K00
  • 深入理解ES6 011【学习笔记】

    Promise与异步编程 因为执行引擎是单线程的,所以需要跟踪即将运行的代码,那些代码被放在一个任务队列中,每当一段代码准备执行时,都会被添加到任务队列中,每当引擎中的一段代码结束执行,事件循环会执行队列中的一下个任务。 Promise相当于异步操作结果占位符,它不会去订阅一个事件,也不会传递一个回调函数给目标函数,而是让函数返回一个Promise,就像这样…

    个人 2025年3月8日
    1.1K00
  • Go工程师体系课 004【学习笔记】

    需求分析 后台管理系统 商品管理 商品列表 商品分类 品牌管理 品牌分类 订单管理 订单列表 用户信息管理 用户列表 用户地址 用户留言 轮播图管理 电商系统 登录页面 首页 商品搜索 商品分类导航 轮播图展示 推荐商品展示 商品详情页 商品图片展示 商品描述 商品规格选择 加入购物车 购物车 商品列表 数量调整 删除商品 结算功能 用户中心 订单中心 我的…

    2025年11月25日
    21600
  • Go工程师体系课 protoc-gen-validate【学习笔记】

    protoc-gen-validate 简介与使用指南 ✅ 什么是 protoc-gen-validate protoc-gen-validate(简称 PGV)是一个 Protocol Buffers 插件,用于在生成的 Go 代码中添加结构体字段的验证逻辑。 它通过在 .proto 文件中添加 validate 规则,自动为每个字段生成验证代码,避免你手…

    个人 2025年11月25日
    1.3K00
  • 深入理解ES6 003【学习笔记】

    函数 参数默认值,以及一些关于arguments对象,如何使用表达式作为参数、参数的临时死区。 以前设置默认值总是利用在含有逻辑或操作符的表达式中,前一个值是false时,总是返回后面一个的值,但如果我们给参数传0时,就有些麻烦。需要去验证一下类型 function makeRequest(url,timeout,callback){ timeout = t…

    个人 2025年3月8日
    1.2K00
简体中文 繁體中文 English
欢迎🌹 Coding never stops, keep learning! 💡💻 光临🌹