Skip to content

Commit d49b462

Browse files
John Doak
authored and committed
Update diskmap/diskslice to use directio on linux. Fix filewatcher example
1 parent 316e0a2 commit d49b462

File tree

12 files changed

+287
-67
lines changed

12 files changed

+287
-67
lines changed

diskmap/diskmap.go

Lines changed: 38 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
/*
22
Package diskmap provides disk storage of key/value pairs. The data is immutable once written.
33
In addition the diskmap utilizes mmap on reads to make the random access faster.
4+
On Linux, diskmap uses directio to speed up writes.
45
56
Usage is simplistic:
67
@@ -81,14 +82,17 @@ Reading the file is simply:
8182
package diskmap
8283

8384
import (
85+
"bufio"
86+
"context"
8487
"encoding/binary"
8588
"fmt"
89+
"io"
8690
"os"
8791
"reflect"
8892
"sync"
8993
"unsafe"
9094

91-
"golang.org/x/net/context"
95+
"github.com/brk0v/directio"
9296
)
9397

9498
// reservedHeader is the size, in bytes, of the reserved header portion of the file.
@@ -157,42 +161,29 @@ type value struct {
157161

158162
// writer implements Writer.
159163
type writer struct {
164+
name string
160165
file *os.File
166+
dio *directio.DirectIO
167+
buf *bufio.Writer
161168
index index
162169
offset int64
163170
num int64
164171
sync.Mutex
165172
}
166173

167-
// New returns a new Writer that writes to file "p".
168-
func New(p string) (Writer, error) {
169-
f, err := os.Create(p)
170-
if err != nil {
171-
return nil, err
172-
}
173-
174-
if err := f.Chmod(0600); err != nil {
175-
return nil, err
176-
}
177-
178-
if _, err = f.Seek(reservedHeader, 0); err != nil {
179-
return nil, fmt.Errorf("was unable to seek %d bytes into the file: %q", reservedHeader, err)
180-
}
181-
182-
return &writer{
183-
file: f,
184-
offset: reservedHeader,
185-
index: make(index, 0, 1000),
186-
Mutex: sync.Mutex{},
187-
}, nil
188-
}
189-
190174
// Write implements Writer.Write().
191175
func (w *writer) Write(k, v []byte) error {
192176
w.Lock()
193177
defer w.Unlock()
194178

195-
if _, err := w.file.Write(v); err != nil {
179+
var writer io.Writer
180+
if w.buf != nil {
181+
writer = w.buf
182+
} else {
183+
writer = w.file
184+
}
185+
186+
if _, err := writer.Write(v); err != nil {
196187
return fmt.Errorf("problem writing key/value to disk: %q", err)
197188
}
198189

@@ -213,24 +204,42 @@ func (w *writer) Write(k, v []byte) error {
213204

214205
// Close implements Writer.Close().
215206
func (w *writer) Close() error {
207+
var writer io.Writer
208+
if w.buf != nil {
209+
writer = w.buf
210+
} else {
211+
writer = w.file
212+
}
213+
216214
// Write each data offset, then the data length, then the length of the key, then finally the key to disk (our index) for each entry.
217215
for _, entry := range w.index {
218-
if err := binary.Write(w.file, endian, entry.offset); err != nil {
216+
if err := binary.Write(writer, endian, entry.offset); err != nil {
219217
return fmt.Errorf("could not write offset value %d: %q", entry.offset, err)
220218
}
221219

222-
if err := binary.Write(w.file, endian, entry.length); err != nil {
220+
if err := binary.Write(writer, endian, entry.length); err != nil {
223221
return fmt.Errorf("could not write data length: %q", err)
224222
}
225223

226-
if err := binary.Write(w.file, endian, int64(len(entry.key))); err != nil {
224+
if err := binary.Write(writer, endian, int64(len(entry.key))); err != nil {
227225
return fmt.Errorf("could not write key length: %q", err)
228226
}
229227

230-
if _, err := w.file.Write(entry.key); err != nil {
228+
if _, err := writer.Write(entry.key); err != nil {
231229
return fmt.Errorf("could not write key to disk: %q", err)
232230
}
233231
}
232+
if w.buf != nil {
233+
w.buf.Flush()
234+
w.dio.Flush()
235+
236+
w.file.Close()
237+
f, err := os.OpenFile(w.name, os.O_RDWR, 0666)
238+
if err != nil {
239+
return err
240+
}
241+
w.file = f
242+
}
234243

235244
// Now that we've written all our data to the end of the file, we can go back to our reserved header
236245
// and write our offset to the index at the beginning of the file.

diskmap/diskmap_linux.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
//go:build linux
2+
3+
package diskmap
4+
5+
import (
6+
"bufio"
7+
"os"
8+
"sync"
9+
"syscall"
10+
11+
"github.com/brk0v/directio"
12+
)
13+
14+
// New returns a new Writer that writes to file "p".
15+
func New(p string) (Writer, error) {
16+
f, err := os.OpenFile(p, os.O_CREATE+os.O_WRONLY+syscall.O_DIRECT, 0666)
17+
if err != nil {
18+
return nil, err
19+
}
20+
21+
dio, err := directio.New(f)
22+
if err != nil {
23+
return nil, err
24+
}
25+
26+
w := bufio.NewWriterSize(dio, 67108864)
27+
header := [64]byte{}
28+
_, err = w.Write(header[:])
29+
if err != nil {
30+
return nil, err
31+
}
32+
33+
return &writer{
34+
name: p,
35+
file: f,
36+
dio: dio,
37+
buf: w,
38+
offset: reservedHeader,
39+
index: make(index, 0, 1000),
40+
Mutex: sync.Mutex{},
41+
}, nil
42+
}

diskmap/diskmap_other.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
//go:build !linux
2+
3+
package diskmap
4+
5+
import (
6+
"fmt"
7+
"os"
8+
"sync"
9+
)
10+
11+
// New returns a new Writer that writes to file "p".
12+
func New(p string) (Writer, error) {
13+
f, err := os.Create(p)
14+
if err != nil {
15+
return nil, err
16+
}
17+
18+
if err := f.Chmod(0600); err != nil {
19+
return nil, err
20+
}
21+
22+
if _, err = f.Seek(reservedHeader, 0); err != nil {
23+
return nil, fmt.Errorf("was unable to seek %d bytes into the file: %q", reservedHeader, err)
24+
}
25+
26+
return &writer{
27+
file: f,
28+
offset: reservedHeader,
29+
index: make(index, 0, 1000),
30+
Mutex: sync.Mutex{},
31+
}, nil
32+
}

diskmap/diskmap_test.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,31 @@ func TestDiskMap(t *testing.T) {
6565
}
6666
}
6767

68+
func BenchmarkDiskMap(b *testing.B) {
69+
b.ReportAllocs()
70+
71+
p := path.Join(os.TempDir(), nextSuffix())
72+
w, err := New(p)
73+
if err != nil {
74+
panic(err)
75+
}
76+
defer os.Remove(p)
77+
78+
b.ResetTimer()
79+
for i := 0; i < 10000; i++ {
80+
k := []byte(nextSuffix())
81+
v := randStringBytes()
82+
83+
if err := w.Write(k, v); err != nil {
84+
b.Fatalf("error writing:\nkey:%q\nvalue:%q\n", k, v)
85+
}
86+
}
87+
88+
if err := w.Close(); err != nil {
89+
b.Fatalf("error closing the Writer: %q", err)
90+
}
91+
}
92+
6893
func nextSuffix() string {
6994
r := uint32(time.Now().UnixNano() + int64(os.Getpid()))
7095

diskslice/diskslice.go

Lines changed: 37 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,21 @@ that can be managed, moved and trivially read. More complex use cases require mo
55
involving multiple files, lock files, etc.... This makes no attempt to provide that.
66
77
Read call without a cached index consists of:
8-
* A single seek to an index entry
9-
* Two 8 byte reads for data offset and len
10-
* A seek to the data
11-
* A single read of the value
8+
- A single seek to an index entry
9+
- Two 8 byte reads for data offset and len
10+
- A seek to the data
11+
- A single read of the value
12+
1213
Total: 2 seeks and 3 reads
1314
1415
Read call with cached index consits of:
15-
* A single read to the data
16+
- A single read to the data
1617
1718
If doing a range over a large file or lots of range calls, it is optimal to have the Reader cache
1819
the index. Every 131,072 entries consumes 1 MiB of cached memory.
1920
2021
File format is as follows:
22+
2123
<file>
2224
<reserve header space>
2325
[index offset]
@@ -38,13 +40,16 @@ File format is as follows:
3840
package diskslice
3941

4042
import (
43+
"bufio"
4144
"bytes"
4245
"context"
4346
"encoding/binary"
4447
"fmt"
4548
"io"
4649
"os"
4750
"sync"
51+
52+
"github.com/brk0v/directio"
4853
)
4954

5055
// reservedHeader is the size, in bytes, of the reserved header portion of the file.
@@ -77,7 +82,10 @@ type value struct {
7782
// Writer provides methods for writing an array of values to disk that can be read without
7883
// reading the file back into memory.
7984
type Writer struct {
85+
name string
8086
file *os.File
87+
dio *directio.DirectIO
88+
buf *bufio.Writer
8189
index index
8290
offset int64
8391
num int64
@@ -97,33 +105,6 @@ func WriteIntercept(interceptor func(dst io.Writer) io.WriteCloser) WriteOption
97105
}
98106
}
99107

100-
// New is the constructor for Writer.
101-
func New(fpath string, options ...WriteOption) (*Writer, error) {
102-
f, err := os.Create(fpath)
103-
if err != nil {
104-
return nil, err
105-
}
106-
107-
if err := f.Chmod(0600); err != nil {
108-
return nil, err
109-
}
110-
111-
if _, err = f.Seek(reservedHeader, 0); err != nil {
112-
return nil, fmt.Errorf("was unable to seek %d bytes into the file: %q", reservedHeader, err)
113-
}
114-
115-
w := &Writer{
116-
file: f,
117-
offset: reservedHeader,
118-
index: make(index, 0, 1000),
119-
mu: sync.Mutex{},
120-
}
121-
for _, option := range options {
122-
option(w)
123-
}
124-
return w, nil
125-
}
126-
127108
// Write writes a byte slice to the diskslice.
128109
func (w *Writer) Write(b []byte) error {
129110
if w.interceptor != nil {
@@ -141,7 +122,11 @@ func (w *Writer) Write(b []byte) error {
141122
w.mu.Lock()
142123
defer w.mu.Unlock()
143124

144-
if _, err := w.file.Write(b); err != nil {
125+
var writer io.Writer = w.file
126+
if w.buf != nil {
127+
writer = w.buf
128+
}
129+
if _, err := writer.Write(b); err != nil {
145130
return fmt.Errorf("problem writing value to disk: %q", err)
146131
}
147132

@@ -161,17 +146,34 @@ func (w *Writer) Write(b []byte) error {
161146

162147
// Close closes the file for writing and writes our index to the file.
163148
func (w *Writer) Close() error {
149+
var writer io.Writer = w.file
150+
if w.buf != nil {
151+
writer = w.buf
152+
}
153+
164154
// Write each data offset, then the data length, to disk (our index) for each entry.
165155
for _, entry := range w.index {
166-
if err := binary.Write(w.file, endian, entry.offset); err != nil {
156+
if err := binary.Write(writer, endian, entry.offset); err != nil {
167157
return fmt.Errorf("could not write offset value %d: %q", entry.offset, err)
168158
}
169159

170-
if err := binary.Write(w.file, endian, entry.length); err != nil {
160+
if err := binary.Write(writer, endian, entry.length); err != nil {
171161
return fmt.Errorf("could not write data length: %q", err)
172162
}
173163
}
174164

165+
if w.buf != nil {
166+
w.buf.Flush()
167+
w.dio.Flush()
168+
169+
w.file.Close()
170+
f, err := os.OpenFile(w.name, os.O_RDWR, 0666)
171+
if err != nil {
172+
return err
173+
}
174+
w.file = f
175+
}
176+
175177
// Now that we've written all our data to the end of the file, we can go back to our reserved header
176178
// and write our offset to the index at the beginning of the file.
177179
if _, err := w.file.Seek(0, 0); err != nil {

0 commit comments

Comments
 (0)