我们在前面的文章中创建了一个数据库,接下来就要对数据库进行操作了。

指令

为了方便后续的操作,我们沿着以下程序进行阅读

1
2
3
4
5
6
7
db.Update(func(tx *bolt.Tx) error {
	b, err := tx.CreateBucket([]byte("MyBucket"))
	if err != nil {
		return fmt.Errorf("create bucket: %s", err)
	}
	return nil
})

可以看到Update的参数是一个匿名函数,我们从Update开始。

Update

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func (db *DB) Update(fn func(*Tx) error) error {
	t, err := db.Begin(true)
	if err != nil {
		return err
	}

	// Make sure the transaction rolls back in the event of a panic.
	defer func() {
		if t.db != nil {
			t.rollback()
		}
	}()

	// Mark as a managed tx so that the inner function cannot manually commit.
	t.managed = true

	// If an error is returned from the function then rollback and return error.
	err = fn(t)
	t.managed = false
	if err != nil {
		_ = t.Rollback()
		return err
	}

	return t.Commit()
}

第一步开始一个事务,我们看下事务是如何开始的。

Begin
1
2
3
4
5
6
func (db *DB) Begin(writable bool) (*Tx, error) {
	if writable {
		return db.beginRWTx()
	}
	return db.beginTx()
}

根据我们之前执行的命令,我们是需要开一个可读写的事务。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
func (db *DB) beginRWTx() (*Tx, error) {
	// If the database was opened with Options.ReadOnly, return an error.
	if db.readOnly {
		return nil, ErrDatabaseReadOnly
	}

	// Obtain writer lock. This is released by the transaction when it closes.
	// This enforces only one writer transaction at a time.
	db.rwlock.Lock()

	// Once we have the writer lock then we can lock the meta pages so that
	// we can set up the transaction.
	db.metalock.Lock()
	defer db.metalock.Unlock()

	// Exit if the database is not open yet.
	if !db.opened {
		db.rwlock.Unlock()
		return nil, ErrDatabaseNotOpen
	}

	// Create a transaction associated with the database.
	t := &Tx{writable: true}
	t.init(db)
	db.rwtx = t

	// Free any pages associated with closed read-only transactions.
	var minid txid = 0xFFFFFFFFFFFFFFFF
	for _, t := range db.txs {
		if t.meta.txid < minid {
			minid = t.meta.txid
		}
	}
	if minid > 0 {
		db.freelist.release(minid - 1)
	}

	return t, nil
}

根据bolt的介绍,我们可以知道,bolt允许多个读操作,但在同一时刻,只能有一个写操作。 所以在程序开始之前,需要加上读写锁。 除此之外,我们还记得,在metapages中记录着交易序号txid,所以也需要对meta页面加锁,防止更新出错。 综合来说,这些都是为了保证在一个时间只有一个写交易。 (注:交易,事务,都是transaction的翻译,这里面我就混用了) 接下来就是创建交易,初始化交易,并且释放读交易所占用的页面。 我们需要看一下交易的定义了。

Tx

交易的定义比较简单

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
type Tx struct {
	writable       bool
	managed        bool
	db             *DB
	meta           *meta
	root           Bucket
	pages          map[pgid]*page
	stats          TxStats
	commitHandlers []func()

	WriteFlag int
}

writable是用来表明当前的tx是否可读,在我们这,tx是可读的。 managed表示当前这个tx是否通过db.Update()或者db.View()进行数据库操作。 db就是当前的db对象。 meta就是我们之前db中存储的meta信息。 root就是指的根bucket,所有的tx都要从根开始查起。 pages当前tx操作的page stats统计tx的操作内容 commitHandler回调函数,在commit的时候执行的函数 WriteFlag复制或者剪切数据库的时候文件的打开方式 明白了交易的内容,我们需要看下交易的初始化操作。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
func (tx *Tx) init(db *DB) {
	tx.db = db
	tx.pages = nil

	// Copy the meta page since it can be changed by the writer.
	tx.meta = &meta{}
	db.meta().copy(tx.meta)

	// Copy over the root bucket.
	tx.root = newBucket(tx)
	tx.root.bucket = &bucket{}
	*tx.root.bucket = tx.meta.root

	// Increment the transaction id and add a page cache for writable transactions.
	if tx.writable {
		tx.pages = make(map[pgid]*page)
		tx.meta.txid += txid(1)
	}
}

我们可以看到,tx首先初始化了db,并创建了空的pages。接下来把db中存储的meta复制给交易。 在最开始的时候,我们创建了两个meta,所以,复制的是哪一个呢?

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (db *DB) meta() *meta {
	// We have to return the meta with the highest txid which doesn't fail
	// validation. Otherwise, we can cause errors when in fact the database is
	// in a consistent state. metaA is the one with the higher txid.
	metaA := db.meta0
	metaB := db.meta1
	if db.meta1.txid > db.meta0.txid {
		metaA = db.meta1
		metaB = db.meta0
	}

	// Use higher meta page if valid. Otherwise fallback to previous, if valid.
	if err := metaA.validate(); err == nil {
		return metaA
	} else if err := metaB.validate(); err == nil {
		return metaB
	}

	// This should never be reached, because both meta1 and meta0 were validated
	// on mmap() and we do fsync() on every write.
	panic("bolt.DB.meta(): invalid meta pages")
}

可以看到,谁存储的txid大,就用哪个meta。我们还注意到,传递的是指针形式的meta,也就是meta信息会随着更新,这个tx中的meta也会更新。 接下来,为tx的root创建了一个包含本交易的bucket,并且是根bucket。此外,我们在meta中也记录了根bucket,这个地方我们使用meta中的bucket初始化了这个地方创建的bucket。 最后由于bolt是使用txid来标识数据库的状态信息,如果是可读写交易,就将txid加一,标识有了新的交易,随后会在commit中更新meta的txid。

执行

我们目前执行完了bolt的Update中的begin操作,后续还有一些容易理解的操作。 首先程序设置defer,如果执行出错,就进行rollback,否则执行我们手动传入的函数,并执行commit。 在这个地方有个有意思的内容

1
t.managed=true

正好对应了我们介绍tx的时候,对managed关键字的介绍。在使用update的时候,手动无法进行提交,因为已经将程序托管给update了。