Go Engineer Systematic Course 012

Integrating Elasticsearch in Go

1. Client Library Selection

1.1 Mainstream Go ES Clients

  • olivere/elastic: Most comprehensive features, elegant API design; the v7 module used in this article targets ES 7.x (there is no v8 module, so use the official client for ES 8.x)
  • elastic/go-elasticsearch: Official client, lightweight, closer to native REST API
  • go-elasticsearch/elasticsearch: Community-maintained branch of the official client

1.2 Recommended Choice

olivere/elastic is the preferred choice for production environments due to the following reasons:

  • Type-safe query builder
  • Comprehensive error handling
  • Supports all ES features (aggregations, bulk operations, index management, etc.)
  • Actively maintained, timely version updates

2. olivere/elastic Quick Start

2.1 Install Dependencies

go mod init your-project
go get github.com/olivere/elastic/v7

2.2 Basic Connection

package main

import (
    "context"
    "fmt"
    "log"

    "github.com/olivere/elastic/v7"
)

// main connects to a single local Elasticsearch node, pings it, and prints
// the reported server version together with the HTTP status code.
func main() {
    const nodeURL = "http://localhost:9200"

    // Sniffing and health checks are switched off for this single-node setup.
    options := []elastic.ClientOptionFunc{
        elastic.SetURL(nodeURL),
        elastic.SetSniff(false),
        elastic.SetHealthcheck(false),
    }
    client, err := elastic.NewClient(options...)
    if err != nil {
        log.Fatal(err)
    }
    defer client.Stop()

    // Ping the node to verify that it is reachable.
    pingService := client.Ping(nodeURL)
    pingResult, statusCode, err := pingService.Do(context.Background())
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("ES Version: %s, Status Code: %d\n", pingResult.Version.Number, statusCode)
}

2.3 Connection Configuration Options

client, err := elastic.NewClient(
    elastic.SetURL("http://localhost:9200", "http://localhost:9201"), // Multiple nodes
    elastic.SetBasicAuth("username", "password"), // Authentication
    elastic.SetSniff(true), // Auto-discover nodes
    elastic.SetHealthcheckInterval(10*time.Second), // Health check interval
    elastic.SetMaxRetries(3), // Max retries
    elastic.SetRetryStatusCodes(502, 503, 504), // Retry status codes
    elastic.SetGzip(true), // Enable compression
    elastic.SetErrorLog(log.New(os.Stderr, "ES ", log.LstdFlags)), // Error log
    elastic.SetInfoLog(log.New(os.Stdout, "ES ", log.LstdFlags)), // Info log
)

3. Index Management

3.1 Create Index

// Create index
createIndex, err := client.CreateIndex("products").
    BodyString(`{
        "settings": {
            "number_of_shards": 1,
            "number_of_replicas": 1
        },
        "mappings": {
            "properties": {
                "title": {
                    "type": "text",
                    "analyzer": "ik_smart",
                    "fields": {
                        "keyword": {
                            "type": "keyword"
                        }
                    }
                },
                "price": {"type": "double"},
                "status": {"type": "keyword"},
                "created_at": {"type": "date"}
            }
        }
    }`).
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}
fmt.Printf("Index creation result: %v\n", createIndex.Acknowledged)

3.2 Check if Index Exists

exists, err := client.IndexExists("products").Do(context.Background())
if err != nil {
    log.Fatal(err)
}
fmt.Printf("Index exists: %v\n", exists)

3.3 Delete Index

deleteIndex, err := client.DeleteIndex("products").Do(context.Background())
if err != nil {
    log.Fatal(err)
}
fmt.Printf("Index deletion result: %v\n", deleteIndex.Acknowledged)

4. Document Operations

4.1 Define Document Structure

// Product is the document stored in the "products" index. The json tags
// define the field names used when a Product is marshalled into, or
// unmarshalled from, a document source.
type Product struct {
    // ID is serialized as "id" inside the document body.
    ID        string    `json:"id"`
    // Title is serialized as "title".
    Title     string    `json:"title"`
    // Price is serialized as "price".
    Price     float64   `json:"price"`
    // Status is serialized as "status".
    Status    string    `json:"status"`
    // CreatedAt is serialized as "created_at".
    CreatedAt time.Time `json:"created_at"`
    // Tags is serialized as "tags"; a nil slice is encoded as JSON null.
    Tags      []string  `json:"tags"`
}

4.2 Add Document

// Add a single document
product := Product{
    ID:        "1",
    Title:     "iPhone 15 Pro",
    Price:     7999.0,
    Status:    "active",
    CreatedAt: time.Now(),
    Tags:      []string{"phone", "apple", "premium"},
}

put1, err := client.Index().
    Index("products").
    Id("1").
    BodyJson(product).
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}
fmt.Printf("Document indexing result: %s\n", put1.Result)

4.3 Get Document

// Get document by ID.
//
// The client reports a missing document as a 404 error rather than as a
// response with Found == false, so that case is checked explicitly instead
// of being treated as fatal.
get1, err := client.Get().
    Index("products").
    Id("1").
    Do(context.Background())
switch {
case elastic.IsNotFound(err):
    fmt.Println("Document not found")
case err != nil:
    log.Fatal(err)
case get1.Found:
    // Decode the raw document source into the Product model.
    var product Product
    if err := json.Unmarshal(get1.Source, &product); err != nil {
        log.Fatal(err)
    }
    fmt.Printf("Retrieved document: %+v\n", product)
}

4.4 Update Document

// Partial update
update, err := client.Update().
    Index("products").
    Id("1").
    Doc(map[string]interface{}{
        "price": 7599.0,
    }).
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}
fmt.Printf("Update result: %s\n", update.Result)

4.5 Delete Document

delete, err := client.Delete().
    Index("products").
    Id("1").
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}
fmt.Printf("Deletion result: %s\n", delete.Result)

5. Bulk Operations

5.1 Bulk Indexing

// Collect one index request per product and send them in a single bulk call.
bulkRequest := client.Bulk()

products := []Product{
    {ID: "2", Title: "Samsung Galaxy S24", Price: 5999.0, Status: "active", CreatedAt: time.Now()},
    {ID: "3", Title: "MacBook Pro M3", Price: 12999.0, Status: "active", CreatedAt: time.Now()},
    {ID: "4", Title: "iPad Air", Price: 3999.0, Status: "active", CreatedAt: time.Now()},
}

for _, product := range products {
    req := elastic.NewBulkIndexRequest().
        Index("products").
        Id(product.ID).
        Doc(product)
    bulkRequest = bulkRequest.Add(req)
}

bulkResponse, err := bulkRequest.Do(context.Background())
if err != nil {
    log.Fatal(err)
}

// A nil error only means the bulk request as a whole was accepted;
// individual items can still have been rejected, so inspect them.
if failed := bulkResponse.Failed(); len(failed) > 0 {
    for _, item := range failed {
        reason := "unknown"
        if item.Error != nil {
            reason = item.Error.Reason
        }
        log.Printf("Bulk item %s failed with status %d: %s", item.Id, item.Status, reason)
    }
    log.Fatalf("Bulk indexing failed for %d of %d requests", len(failed), len(bulkResponse.Items))
}

fmt.Printf("Bulk indexing completed, processed %d requests\n", len(bulkResponse.Items))

5.2 Bulk Update

bulkRequest := client.Bulk()

// Bulk update prices: document ID -> new price.
updates := map[string]float64{
    "2": 5799.0,
    "3": 11999.0,
    "4": 3799.0,
}

for id, price := range updates {
    req := elastic.NewBulkUpdateRequest().
        Index("products").
        Id(id).
        Doc(map[string]interface{}{"price": price})
    bulkRequest = bulkRequest.Add(req)
}

bulkResponse, err := bulkRequest.Do(context.Background())
if err != nil {
    log.Fatal(err)
}

// Per-item failures (for example an update of a missing document) do not
// surface through err, so check the individual results explicitly.
if failed := bulkResponse.Failed(); len(failed) > 0 {
    log.Fatalf("Bulk update failed for %d of %d requests", len(failed), len(bulkResponse.Items))
}

fmt.Printf("Bulk update completed, processed %d requests\n", len(bulkResponse.Items))

6. Search Queries

6.1 Simple Search

// Match all documents
searchResult, err := client.Search().
    Index("products").
    Query(elastic.NewMatchAllQuery()).
    Size(10).
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}

fmt.Printf("Found %d documents\n", searchResult.TotalHits())

6.2 Match Query

// Text match query on the title field, returning at most 10 hits.
searchResult, err := client.Search().
    Index("products").
    Query(elastic.NewMatchQuery("title", "iPhone")).
    Size(10).
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}

for _, hit := range searchResult.Hits.Hits {
    var product Product
    if err := json.Unmarshal(hit.Source, &product); err != nil {
        // Report the undecodable hit instead of dropping it silently.
        log.Printf("Skipping hit %s: %v", hit.Id, err)
        continue
    }

    // Score is a pointer and can be nil (Elasticsearch returns a null
    // score when results are sorted by a field), so guard the dereference.
    var score float64
    if hit.Score != nil {
        score = *hit.Score
    }
    fmt.Printf("Document ID: %s, Title: %s, Score: %f\n",
        hit.Id, product.Title, score)
}

6.3 Compound Query

// Boolean query: the title must match "iPhone", and results are filtered to
// active products priced between 1000 and 10000 (inclusive).
boolQuery := elastic.NewBoolQuery().
    Must(elastic.NewMatchQuery("title", "iPhone")).
    Filter(elastic.NewTermQuery("status", "active")).
    Filter(elastic.NewRangeQuery("price").Gte(1000).Lte(10000))

searchResult, err := client.Search().
    Index("products").
    Query(boolQuery).
    Sort("price", true). // Sort by price ascending
    Size(10).
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}

// Use the result so the snippet compiles inside a function body.
fmt.Printf("Found %d documents\n", searchResult.TotalHits())

6.4 Multi-Field Search

// Multi-field match over tags and title, with title boosted by a factor of 3.
// FieldWithBoost registers the field itself, so "title" is not also passed
// to the constructor; doing both would list it twice in the request.
multiMatchQuery := elastic.NewMultiMatchQuery("Apple phone", "tags").
    FieldWithBoost("title", 3.0).
    Type("best_fields")

searchResult, err := client.Search().
    Index("products").
    Query(multiMatchQuery).
    Highlight(elastic.NewHighlight().Field("title")).
    Size(10).
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}

// Print the highlighted title fragments returned for each hit.
for _, hit := range searchResult.Hits.Hits {
    fmt.Printf("Document ID: %s, Highlights: %v\n", hit.Id, hit.Highlight["title"])
}

7. Aggregation Analysis

7.1 Basic Aggregation

// Group and count by status
searchResult, err := client.Search().
    Index("products").
    Query(elastic.NewMatchAllQuery()).
    Aggregation("status_count", elastic.NewTermsAggregation().Field("status")).
    Size(0). // Don't return documents, only aggregation results
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}

statusAgg, found := searchResult.Aggregations.Terms("status_count")
if found {
    for _, bucket := range statusAgg.Buckets {
        fmt.Printf("Status: %s, Count: %d\n", bucket.Key, bucket.DocCount)
    }
}

7.2 Numeric Statistics Aggregation

// Price statistics over all products; Size(0) returns only the aggregation.
searchResult, err := client.Search().
    Index("products").
    Query(elastic.NewMatchAllQuery()).
    Aggregation("price_stats", elastic.NewStatsAggregation().Field("price")).
    Size(0).
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}

// The stats fields are *float64 and are nil when Elasticsearch reports no
// value (for example on an empty index), so they must be dereferenced
// before formatting; a missing value is printed as 0.
valueOf := func(v *float64) float64 {
    if v == nil {
        return 0
    }
    return *v
}

statsAgg, found := searchResult.Aggregations.Stats("price_stats")
if found {
    fmt.Printf("Price Statistics - Min: %.2f, Max: %.2f, Avg: %.2f, Sum: %.2f\n",
        valueOf(statsAgg.Min), valueOf(statsAgg.Max), valueOf(statsAgg.Avg), valueOf(statsAgg.Sum))
}

7.3 Nested Aggregation (Terms with Stats Sub-Aggregation)

// Group by status, then calculate price statistics within each group
searchResult, err := client.Search().
    Index("products").
    Query(elastic.NewMatchAllQuery()).
    Aggregation("status_groups",
        elastic.NewTermsAggregation().Field("status").
            SubAggregation("price_stats", elastic.NewStatsAggregation().Field("price"))).
    Size(0).
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}

statusAgg, found := searchResult.Aggregations.Terms("status_groups")
if found {
    for _, bucket := range statusAgg.Buckets {
        priceStats, found := bucket.Stats("price_stats")
        if found {
            fmt.Printf("Status: %s, Average Price: %.2f, Count: %d\n",
                bucket.Key, priceStats.Avg, bucket.DocCount)
        }
    }
}

8. Pagination and Scrolling

8.1 Basic Pagination

// Paginated query: pages are 1-based, so the offset of the first hit is
// (page - 1) * size.
page, size := 1, 10
offset := (page - 1) * size

pageService := client.Search().
    Index("products").
    Query(elastic.NewMatchAllQuery()).
    From(offset).
    Size(size)
searchResult, err := pageService.Do(context.Background())
if err != nil {
    log.Fatal(err)
}

fmt.Printf("Page %d, Total %d records\n", page, searchResult.TotalHits())

8.2 Scroll Query (Large Datasets)

// Scroll query, suitable for exporting large datasets: fetch 100 documents
// per round trip and keep the scroll context alive for one minute between
// requests.
scroll := client.Scroll("products").Size(100).KeepAlive("1m")
for {
    searchResult, err := scroll.Do(context.Background())
    if err == io.EOF {
        break // Data read complete
    }
    if err != nil {
        log.Fatal(err)
    }

    for _, hit := range searchResult.Hits.Hits {
        var product Product
        if err := json.Unmarshal(hit.Source, &product); err != nil {
            // Report the undecodable hit instead of dropping it silently.
            log.Printf("Skipping hit %s: %v", hit.Id, err)
            continue
        }
        fmt.Printf("Processing document: %s\n", product.Title)
    }
}

// Release the scroll context on the server right away instead of leaving it
// open until the keep-alive expires.
if err := scroll.Clear(context.Background()); err != nil {
    log.Printf("Failed to clear scroll: %v", err)
}

9. Error Handling

9.1 Common Error Handling

// handleESError prints a human-readable description of err to stdout.
//
// A nil error is ignored. Errors of type *elastic.Error are classified by
// their HTTP status code; every other error (network failures and the like)
// is printed as-is.
func handleESError(err error) {
    if err == nil {
        return
    }

    esErr, ok := err.(*elastic.Error)
    if !ok {
        // Network or other errors
        fmt.Printf("Other error: %v\n", err)
        return
    }

    // Details is a pointer to a struct and may be nil, so it cannot be
    // formatted with %s directly. Prefer its Reason text and fall back to
    // the error's own message when no reason is available.
    reason := esErr.Error()
    if esErr.Details != nil && esErr.Details.Reason != "" {
        reason = esErr.Details.Reason
    }

    switch esErr.Status {
    case 404:
        fmt.Println("Document or index not found")
    case 409:
        fmt.Println("Version conflict")
    case 400:
        fmt.Printf("Request error: %s\n", reason)
    default:
        fmt.Printf("ES error: %d - %s\n", esErr.Status, reason)
    }
}

10. Best Practices

10.1 Connection Pool Configuration

// Retry behaviour is configured through a Retrier: olivere/elastic has no
// SetRetryBackoff option. The simple backoff below allows up to three
// retries, waiting roughly 100ms, 200ms and 300ms (plus jitter) before each
// one. SetMaxRetries is not combined with it because both options set the
// client's retrier and the later one would win.
client, err := elastic.NewClient(
    elastic.SetURL("http://localhost:9200"),
    elastic.SetRetrier(elastic.NewBackoffRetrier(elastic.NewSimpleBackoff(100, 200, 300))),
    elastic.SetHealthcheckInterval(30*time.Second),
    elastic.SetSniff(false), // Recommended to disable in production
)
if err != nil {
    log.Fatal(err)
}

10.2 Context Management

// Use a context with a timeout so the request is abandoned after 30 seconds.
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

searchResult, err := client.Search().
    Index("products").
    Query(elastic.NewMatchAllQuery()).
    Do(ctx)
if err != nil {
    // A timeout or cancellation also ends up here.
    log.Fatal(err)
}
fmt.Printf("Found %d documents\n", searchResult.TotalHits())

10.3 Bulk Operation Optimization

// Bulk size control: maximum number of documents sent in one bulk request.
const batchSize = 1000

// bulkIndexProducts indexes products into the "products" index in batches of
// at most batchSize documents, using each product's ID as the document ID.
//
// It stops at the first batch that fails and returns the error. A batch
// fails either when the bulk request itself returns an error or when
// Elasticsearch rejects at least one item in it; in the latter case the
// first rejected item is returned as an *elastic.Error. An empty slice is a
// no-op and returns nil.
func bulkIndexProducts(products []Product) error {
    for start := 0; start < len(products); start += batchSize {
        end := start + batchSize
        if end > len(products) {
            end = len(products)
        }

        bulkRequest := client.Bulk()
        for j := start; j < end; j++ {
            req := elastic.NewBulkIndexRequest().
                Index("products").
                Id(products[j].ID).
                Doc(products[j])
            bulkRequest = bulkRequest.Add(req)
        }

        bulkResponse, err := bulkRequest.Do(context.Background())
        if err != nil {
            return err
        }

        // A nil error only covers the request as a whole; rejected items
        // are reported inside the response and must be checked separately.
        if failed := bulkResponse.Failed(); len(failed) > 0 {
            first := failed[0]
            return &elastic.Error{Status: first.Status, Details: first.Error}
        }
    }
    return nil
}

11. Complete Example Project Structure

project/
├── go.mod
├── go.sum
├── main.go
├── config/
│   └── es.go          # ES Configuration
├── models/
│   └── product.go     # Data Models
├── services/
│   └── es_service.go  # ES Service Layer
└── handlers/
    └── product.go     # Business Logic

11.1 Configuration Management

// config/es.go
package config

import (
    "github.com/olivere/elastic/v7"
)

// ESConfig holds the settings needed to build an Elasticsearch client.
type ESConfig struct {
    // URLs lists the node addresses handed to the client.
    URLs []string
    // Username and Password are the basic-auth credentials; authentication
    // is only configured when both are non-empty.
    Username, Password string
    // Sniff is passed through as the client's sniffing setting.
    Sniff bool
}

// NewESClient builds an Elasticsearch client from config.
//
// The node URLs and the sniffing flag are always applied. Basic
// authentication is added only when both Username and Password are set.
// The client, or the error from elastic.NewClient, is returned unchanged.
func NewESClient(config ESConfig) (*elastic.Client, error) {
    opts := make([]elastic.ClientOptionFunc, 0, 3)
    opts = append(opts,
        elastic.SetURL(config.URLs...),
        elastic.SetSniff(config.Sniff),
    )

    hasCredentials := config.Username != "" && config.Password != ""
    if hasCredentials {
        opts = append(opts, elastic.SetBasicAuth(config.Username, config.Password))
    }

    return elastic.NewClient(opts...)
}

11.2 Service Layer Encapsulation

// services/es_service.go
package services

import (
    "context"
    "encoding/json"

    "github.com/olivere/elastic/v7"
)

// ProductService runs product queries against a single Elasticsearch index.
type ProductService struct {
    // client is the Elasticsearch client used for every request.
    client *elastic.Client
    // index is the name of the index that requests are sent to.
    index  string
}

// NewProductService returns a ProductService that uses client and targets
// the "products" index.
func NewProductService(client *elastic.Client) *ProductService {
    svc := new(ProductService)
    svc.client = client
    svc.index = "products"
    return svc
}

// SearchProducts runs a multi-match query for query over the "title" and
// "tags" fields of the service's index and returns one page of results.
//
// from is the offset of the first hit and size the maximum number of hits.
// It returns the decoded products of the page, the total hit count reported
// by Elasticsearch, and an error if the search fails or a hit cannot be
// decoded into a Product. When the page contains no hits the returned slice
// is nil.
func (s *ProductService) SearchProducts(query string, from, size int) ([]Product, int64, error) {
    searchResult, err := s.client.Search().
        Index(s.index).
        Query(elastic.NewMultiMatchQuery(query, "title", "tags")).
        From(from).
        Size(size).
        Do(context.Background())
    if err != nil {
        return nil, 0, err
    }

    // Hits is a pointer and can be absent from the response; TotalHits
    // handles that case itself, so only the hit list needs guarding.
    if searchResult.Hits == nil || len(searchResult.Hits.Hits) == 0 {
        return nil, searchResult.TotalHits(), nil
    }

    hits := searchResult.Hits.Hits
    products := make([]Product, 0, len(hits))
    for _, hit := range hits {
        var product Product
        // Fail instead of skipping: dropping a hit silently would return a
        // page that disagrees with the reported total.
        if err := json.Unmarshal(hit.Source, &product); err != nil {
            return nil, 0, err
        }
        products = append(products, product)
    }

    return products, searchResult.TotalHits(), nil
}

12. Summary

olivere/elastic is the most mature ES client in Go, offering:

  • Type safety: Compile-time checking of query syntax
  • Full functionality: Supports all ES features
  • Performance optimization: Connection pooling, bulk operations, retry mechanisms
  • Ease of use: Chained API design, strong code readability

With the examples in this article, you can quickly integrate Elasticsearch into your Go projects to achieve efficient search and analysis capabilities.

主题测试文章,只做测试使用。发布者:Walker,转载请注明出处:https://walker-learn.xyz/archives/6778

(0)
Walker的头像Walker
上一篇 14 hours ago
下一篇 Mar 8, 2025 12:52

Related Posts

EN
简体中文 繁體中文 English