Badger 入门教程

官方网站:


1. 安装

使用 Badger 前,先安装 Go 1.23 或更高版本。运行如下命令获取该库。

go get github.com/dgraph-io/badger/v4

上面的命令将获取该库。

1.1. 安装 Badger 命令行工具

go install github.com/dgraph-io/badger/v4/badger@latest

上面的命令将 Badger 命令行工具安装到 $GOBIN。


2. 打开数据库

在 Badger 中顶级对象是 DB。它代表磁盘上特定目录中的多个文件,这些文件包含单个数据库的数据。

使用 badger.Open() 函数,以及适当的选项打开数据库。客户端必须指定 DirValueDir 选项。可以将它们设置为相同的值。

package main

import (
	"log"

	badger "github.com/dgraph-io/badger/v4"
)

func main() {
  // Open the Badger database located in the /tmp/badger directory.
  // It will be created if it doesn't exist.
  db, err := badger.Open(badger.DefaultOptions("/tmp/badger"))
  if err != nil {
	  log.Fatal(err)
  }
  defer db.Close()
  // Your code here…
}

注意,Badger 将在目录上获得锁,因此多个进程不能同时打开同一个数据库。

2.1. 内存模式/无磁盘模式

默认情况下,Badger 确保将所有数据持久化到磁盘。Badger 也支持纯内存模式。当以内存模式运行 Badger 时,所有数据存储在内存中。内存模式读写更快,但在崩溃或关闭时,存储在 Badger 中的所有数据都将丢失。设置 InMemory 选项,以内存模式打开 Badger。

opt := badger.DefaultOptions("").WithInMemory(true)

2.2. 加密模式

如果开启加密模式,那么需要设置索引缓存大小。

比如,设置 100 Mb 的缓存:

opts.IndexCache = 100 << 20 // 100 mb or some other size based on the amount of data

3. 事务

3.1. 只读事务

使用 DB.View() 方法开启只读事务:

err := db.View(func(txn *badger.Txn) error {
  // Your code here…
  return nil
})

在该事务内不能执行写或删除。Badger 确保在闭包内可以获取数据库的一致性视图。事务开始后,在其他地方发生的任何写入,对闭包内的调用都不可见。

3.2. 读写事务

使用 DB.Update() 方法开启读写事务:

err := db.Update(func(txn *badger.Txn) error {
  // Your code here…
  return nil
})

在读写事务内部允许所有数据库操作。

必须检查返回的错误值。如果在闭包中返回错误,那么它将被传递出去。

如果发生冲突,那么将返回 ErrConflict。如果收到该错误,那么根据应用程序的状态选择重试该操作。

当事务中挂起的写/删除的数量超过特定的限制时,那么将返回 ErrTxnTooBig 错误。在这种情况下,最好提交事务,然后立即开启新事务。如下所示(为简化起见,在某些地方未检查错误):

updates := make(map[string]string)
txn := db.NewTransaction(true)
for k,v := range updates {
  if err := txn.Set([]byte(k),[]byte(v)); err == badger.ErrTxnTooBig {
    _ = txn.Commit()
    txn = db.NewTransaction(true)
    _ = txn.Set([]byte(k),[]byte(v))
  }
}
_ = txn.Commit()

3.3. 手动管理事务

DB.View()DB.Update() 方法是 DB.NewTransaction()Txn.Commit() 方法(在只读事务中是 Txn.Discard())的包装器。这些辅助方法启动事务,执行函数,如果返回错误,那么安全地销毁事务。这是使用 Badger 事务的推荐方式。

然而,有时希望手动创建和提交事务。可以直接使用 DB.NewTransaction() 函数,它接受用于指定是否需要读写事务的布尔参数。对于读写事务,必须调用 Txn.Commit() 确保提交事务。对于只读事务,调用 Txn.Discard() 即可。Txn.Commit() 内部也调用 Txn.Discard() 清理事务,因此对于读写事务只调用 Txn.Commit() 即可。如果由于某种原因未调用 Txn.Commit() (比如过早地返回错误),那么确保在 defer 块中调用 Txn.Discard()。参考如下代码。

// Start a writable transaction.
txn := db.NewTransaction(true)
defer txn.Discard()

// Use the transaction...
err := txn.Set([]byte("answer"), []byte("42"))
if err != nil {
    return err
}

// Commit the transaction and check for error.
if err := txn.Commit(); err != nil {
    return err
}

DB.NewTransaction() 的第一个参数是指示事务是否可写的布尔值。

Badger 允许对 Txn.Commit() 方法提供可选的回调函数。通常情况下,可以将回调设置为 nil,该方法在所有写入操作成功后返回。然而,如果提供回调函数,那么 Txn.Commit() 方法在检查冲突后立即返回。实际的磁盘写入操作是异步发生的,一旦写入完成或发生错误,将调用回调函数。在某些情况下,这可以提高应用程序的吞吐量。但这也意味着直到使用 nil 错误值调用回调函数前,事务都不是持久的。


4. 使用 key/value 对

使用 Txn.Set() 方法保存 key/value 对:

err := db.Update(func(txn *badger.Txn) error {
  err := txn.Set([]byte("answer"), []byte("42"))
  return err
})

也可以通过先创建 Entry,再使用 Txn.SetEntry() 设置该 Entry 的方式保存 key/value 对。Entry 暴露设置属性的方法。

err := db.Update(func(txn *badger.Txn) error {
  e := badger.NewEntry([]byte("answer"), []byte("42"))
  err := txn.SetEntry(e)
  return err
})

上面的代码将 answer 键的值设置为 “42”。使用 Txn.Get() 方法获取该值。

err := db.View(func(txn *badger.Txn) error {
  item, err := txn.Get([]byte("answer"))
  handle(err)

  var valNot, valCopy []byte
  err := item.Value(func(val []byte) error {
    // This func with val would only be called if item.Value encounters no error.

    // Accessing val here is valid.
    fmt.Printf("The answer is: %s\n", val)

    // Copying or parsing val is valid.
    valCopy = append([]byte{}, val...)

    // Assigning val slice to another variable is NOT OK.
    valNot = val // Do not do this.
    return nil
  })
  handle(err)

  // DO NOT access val here. It is the most common cause of bugs.
  fmt.Printf("NEVER do this. %s\n", valNot)

  // You must copy it to use it outside item.Value(...).
  fmt.Printf("The answer is: %s\n", valCopy)

  // Alternatively, you could also use item.ValueCopy().
  valCopy, err = item.ValueCopy(nil)
  handle(err)
  fmt.Printf("The answer is: %s\n", valCopy)

  return nil
})

如果未找到值,那么 Txn.Get() 返回 ErrKeyNotFound

注意,Get() 返回的值仅在事务开启期间可用。如果需要在事务外部使用该值,那么必须使用 copy(),将其拷贝到另一个字节切片。

使用 Txn.Delete() 方法删除键。


5. 单调递增整数

使用 DB.GetSequence 方法获取具有强持久性的唯一的单调递增整数。该方法返回 Sequence 对象,它是线程安全的,并且可以被多个 goroutine 并发地使用。

Badger 使用提供给 DB.GetSequence 的带宽,租用一段整数,以便从内存中分配。磁盘写操作的频率由该租约带宽和 Next 调用的频率决定。设置过低的带宽将执行更多的磁盘写操作,设置过高的带宽将导致在 Badger 关闭或崩溃时浪费整数。为避免浪费整数,在关闭 Badger 之前调用 Release

seq, err := db.GetSequence(key, 1000)
defer seq.Release()
for {
  num, err := seq.Next()
}

6. 合并操作

Badger 提供对有序合并操作的支持。可以定义 MergeFunc 类型的函数,它接受现有的值,以及要与它合并的值。该函数返回新值,该值是合并操作的结果。所有值都以字节数组的形式指定。比如,下面的合并函数(add)将 []byte 值追加到现有的 []byte 值。

// Merge function to append one byte slice to another
func add(originalValue, newValue []byte) []byte {
  return append(originalValue, newValue...)
}

然后可以将该函数连同键和持续时间值一起传递给 DB.GetMergeOperator() 方法。持续时间指定合并函数在使用 MergeOperator.Add() 方法添加的值上运行的频率。

可以使用 MergeOperator.Get() 方法获取与合并操作相关联的键的增量值。

key := []byte("merge")

m := db.GetMergeOperator(key, add, 200*time.Millisecond)
defer m.Stop()

m.Add([]byte("A"))
m.Add([]byte("B"))
m.Add([]byte("C"))

res, _ := m.Get() // res should have value ABC encoded

示例:递增计数器的合并操作符

func uint64ToBytes(i uint64) []byte {
  var buf [8]byte
  binary.BigEndian.PutUint64(buf[:], i)
  return buf[:]
}

func bytesToUint64(b []byte) uint64 {
  return binary.BigEndian.Uint64(b)
}

// Merge function to add two uint64 numbers
func add(existing, new []byte) []byte {
  return uint64ToBytes(bytesToUint64(existing) + bytesToUint64(new))
}

可用作

key := []byte("merge")

m := db.GetMergeOperator(key, add, 200*time.Millisecond)
defer m.Stop()

m.Add(uint64ToBytes(1))
m.Add(uint64ToBytes(2))
m.Add(uint64ToBytes(3))

res, _ := m.Get() // res should have value 6 encoded

7. 在键上设置存活时间(TTL)和用户元数据

Badger 允许在键上设置可选的存活时间(TTL)值。一旦 TTL 过期,键将不再可检索,并且符合垃圾回收的条件。可以使用 Entry.WithTTL()Txn.SetEntry() API 方法将 TTL 设置为 time.Duration 值。

err := db.Update(func(txn *badger.Txn) error {
  e := badger.NewEntry([]byte("answer"), []byte("42")).WithTTL(time.Hour)
  err := txn.SetEntry(e)
  return err
})

可以在每个键上设置可选的用户元数据值。用户元数据值由单个字节表示。它可以用于与键一起设置特定的位,以帮助解释或解码 key-value 对。可以使用 Entry.WithMeta()Txn.SetEntry() API 方法设置用户元数据。

err := db.Update(func(txn *badger.Txn) error {
  e := badger.NewEntry([]byte("answer"), []byte("42")).WithMeta(byte(1))
  err := txn.SetEntry(e)
  return err
})

可以使用 Entry API 为同一个键添加用户元数据和 TTL。然后使用 Txn.SetEntry() 设置该 Entry

err := db.Update(func(txn *badger.Txn) error {
  e := badger.NewEntry([]byte("answer"), []byte("42")).WithMeta(byte(1)).WithTTL(time.Hour)
  err := txn.SetEntry(e)
  return err
})

8. 迭代键

可以通过 Txn.NewIterator() 方法获取 Iterator,然后使用 Iterator 迭代键,迭代按以字节为单位的字典序进行。

err := db.View(func(txn *badger.Txn) error {
  opts := badger.DefaultIteratorOptions
  opts.PrefetchSize = 10
  it := txn.NewIterator(opts)
  defer it.Close()
  for it.Rewind(); it.Valid(); it.Next() {
    item := it.Item()
    k := item.Key()
    err := item.Value(func(v []byte) error {
      fmt.Printf("key=%s, value=%s\n", k, v)
      return nil
    })
    if err != nil {
      return err
    }
  }
  return nil
})

迭代器支持移动到键列表中的特定点,并且每次向前或向后移动一个键。

默认情况下,Badger 预取后面 100 项的值。可以用 IteratorOptions.PrefetchSize 进行调整。但是,将其设置为高于 GOMAXPROCS 的值(建议设置为 128 或更高)不会带来任何额外的好处。也可以完全关闭值的获取。参阅下面关于仅键迭代的部分。

8.1. 前缀扫描

可以结合 Seek() 和 ValidForPrefix() 迭代键前缀。

db.View(func(txn *badger.Txn) error {
  it := txn.NewIterator(badger.DefaultIteratorOptions)
  defer it.Close()
  prefix := []byte("1234")
  for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
    item := it.Item()
    k := item.Key()
    err := item.Value(func(v []byte) error {
      fmt.Printf("key=%s, value=%s\n", k, v)
      return nil
    })
    if err != nil {
      return err
    }
  }
  return nil
})

8.2. 仅键迭代

Badger 支持被称为仅键迭代的独特迭代模式。它比常规迭代快几个数量级,因为它只涉及对 LSM 树的访问,而 LSM 树通常完全驻留在 RAM 中。将 IteratorOptions.PrefetchValues 字段设置为 false,启用仅键迭代。这也可以用于在迭代期间对选定的键进行稀疏读取,在需要时调用item.Value()

err := db.View(func(txn *badger.Txn) error {
  opts := badger.DefaultIteratorOptions
  opts.PrefetchValues = false
  it := txn.NewIterator(opts)
  defer it.Close()
  for it.Rewind(); it.Valid(); it.Next() {
    item := it.Item()
    k := item.Key()
    fmt.Printf("key=%s\n", k)
  }
  return nil
})

9. 流

Badger 提供 Stream 框架,它并发地遍历 DB 的全部或部分,将数据转换为自定义键值,并且将其串行地流出,以便通过网络发送,写入磁盘,甚至写回 Badger。该方式比使用单个迭代器遍历 Badger 快得多。Stream 支持管理模式和正常模式。

Stream 使用由 LSM 树中的 SSTable 创建的自然边界快速生成键范围。然后,每个协程选择一个范围,并且运行迭代器对其进行迭代。每个迭代器遍历值的所有版本,并且是从同一个事务创建的,因此工作在 DB 的快照上。每次遇到新键时,它调用 ChooseKey(item),然后调用 KeyToList(key, itr)。这允许用户选择或拒绝该键,如果选择,则将值版本转换为自定义键值。协程在将键值发送到管道前,将批量处理 4MB 的键值。另一个协程使用智能批处理算法进一步对来自该管道的数据进行批处理,并且串行地调用 Send

该框架为高吞吐量的键值迭代而设计,将迭代工作分散到许多协程中。DB.Backup 使用该框架提供快速的全量和增量备份。Dgraph 是该框架的重度用户。事实上,该框架在被移植到 Badger 前,是在 Dgraph 中开发和使用的。

stream := db.NewStream()
// db.NewStreamAt(readTs) for managed mode.

// -- Optional settings
stream.NumGo = 16                     // Set number of goroutines to use for iteration.
stream.Prefix = []byte("some-prefix") // Leave nil for iteration over the whole DB.
stream.LogPrefix = "Badger.Streaming" // For identifying stream logs. Outputs to Logger.

// ChooseKey is called concurrently for every key. If left nil, assumes true by default.
stream.ChooseKey = func(item *badger.Item) bool {
  return bytes.HasSuffix(item.Key(), []byte("er"))
}

// KeyToList is called concurrently for chosen keys. This can be used to convert
// Badger data into custom key-values. If nil, uses stream.ToList, a default
// implementation, which picks all valid key-values.
stream.KeyToList = nil

// -- End of optional settings.

// Send is called serially, while Stream.Orchestrate is running.
stream.Send = func(list *pb.KVList) error {
  return proto.MarshalText(w, list) // Write to w.
}

// Run the stream
if err := stream.Orchestrate(context.Background()); err != nil {
  return err
}
// Done.

10. 垃圾回收

Badger 值需要被垃圾回收,原因有两个:

Badger 依赖于客户端在其选择的时间执行垃圾回收。它提供以下方法,可以在适当的时机调用:

ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()
for range ticker.C {
again:
	err := db.RunValueLogGC(0.7)
	if err == nil {
        goto again
	}
}

11. 数据库备份

有两个公共 API 方法 DB.Backup()DB.Load() 可用于执行在线备份和恢复。Badger v0.9 提供 CLI 工具 badger,它可以执行脱机备份/恢复。确保 PATH 中有 $GOPATH/bin,才能使用该工具。

下面的命令将创建与版本无关的数据库备份,备份到当前工作目录中的文件 badger.bak 中。

badger backup --dir <path/to/badgerdb>

将当前工作目录中的 badger.bak 恢复到新数据库:

badger restore --dir <path/to/badgerdb>

查看 badger --help 获取更多细节。

对于使用 v0.8(或更低版本)创建的 Badger 数据库,可以使用 v0.8.1 提供的 badger_backup 工具,然后使用上面的命令恢复,以升级数据库,使用最新版本。

badger_backup --dir <path/to/badgerdb> --backup-file badger.bak

建议使用 BackupRestore API 和工具。但是,Badger 也是 rsync 友好的,因为除只能追加的最新值日志外,所有文件都是不可变的。因此,可以将 rsync 用作执行备份的基本方法。在下面的脚本中,重复 rsync 确保在执行完整的备份期间 LSM 树与清单文件保持一致。

#!/bin/bash
set -o history
set -o histexpand
# Makes a complete copy of a Badger database directory.
# Repeat rsync if the MANIFEST and SSTables are updated.
rsync -avz --delete db/ dst
while !! | grep -q "(MANIFEST\|\.sst)$"; do :; done

12. 内存使用

可以通过调整使用 DB.Open 打开数据库时传入的 Options 结构体中的若干选项管理 Badger 的内存占用。

如果想降低 Badger 实例的内存使用,那么调整这些选项(最好每次一个)直到达到期望的内存用量。