serialize_reads: add read serialization logic
Due to kernel readahead, we usually get multiple read requests at the same time. These get submitted to the backing storage in random order, which is a problem if seeking is very expensive. Details: https://github.com/rfjakob/gocryptfs/issues/92
This commit is contained in:
parent
14038a1644
commit
00df0771e3
|
@ -18,7 +18,7 @@ type argContainer struct {
|
||||||
debug, init, zerokey, fusedebug, openssl, passwd, fg, version,
|
debug, init, zerokey, fusedebug, openssl, passwd, fg, version,
|
||||||
plaintextnames, quiet, nosyslog, wpanic,
|
plaintextnames, quiet, nosyslog, wpanic,
|
||||||
longnames, allow_other, ro, reverse, aessiv, nonempty, raw64,
|
longnames, allow_other, ro, reverse, aessiv, nonempty, raw64,
|
||||||
noprealloc, speed, hkdf bool
|
noprealloc, speed, hkdf, serialize_reads bool
|
||||||
masterkey, mountpoint, cipherdir, cpuprofile, extpass,
|
masterkey, mountpoint, cipherdir, cpuprofile, extpass,
|
||||||
memprofile, ko, passfile, ctlsock, fsname string
|
memprofile, ko, passfile, ctlsock, fsname string
|
||||||
// Configuration file name override
|
// Configuration file name override
|
||||||
|
@ -112,6 +112,7 @@ func parseCliOpts() (args argContainer) {
|
||||||
flagSet.BoolVar(&args.noprealloc, "noprealloc", false, "Disable preallocation before writing")
|
flagSet.BoolVar(&args.noprealloc, "noprealloc", false, "Disable preallocation before writing")
|
||||||
flagSet.BoolVar(&args.speed, "speed", false, "Run crypto speed test")
|
flagSet.BoolVar(&args.speed, "speed", false, "Run crypto speed test")
|
||||||
flagSet.BoolVar(&args.hkdf, "hkdf", true, "Use HKDF as an additional key derivation step")
|
flagSet.BoolVar(&args.hkdf, "hkdf", true, "Use HKDF as an additional key derivation step")
|
||||||
|
flagSet.BoolVar(&args.serialize_reads, "serialize_reads", false, "Try to serialize read operations")
|
||||||
flagSet.StringVar(&args.masterkey, "masterkey", "", "Mount with explicit master key")
|
flagSet.StringVar(&args.masterkey, "masterkey", "", "Mount with explicit master key")
|
||||||
flagSet.StringVar(&args.cpuprofile, "cpuprofile", "", "Write cpu profile to specified file")
|
flagSet.StringVar(&args.cpuprofile, "cpuprofile", "", "Write cpu profile to specified file")
|
||||||
flagSet.StringVar(&args.memprofile, "memprofile", "", "Write memory profile to specified file")
|
flagSet.StringVar(&args.memprofile, "memprofile", "", "Write memory profile to specified file")
|
||||||
|
|
|
@ -27,4 +27,6 @@ type Args struct {
|
||||||
// Use HKDF key derivation.
|
// Use HKDF key derivation.
|
||||||
// Corresponds to the HKDF feature flag introduced in gocryptfs v1.3.
|
// Corresponds to the HKDF feature flag introduced in gocryptfs v1.3.
|
||||||
HKDF bool
|
HKDF bool
|
||||||
|
// Try to serialize read operations, "-serialize_reads"
|
||||||
|
SerializeReads bool
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,6 +17,7 @@ import (
|
||||||
"github.com/hanwen/go-fuse/fuse/nodefs"
|
"github.com/hanwen/go-fuse/fuse/nodefs"
|
||||||
|
|
||||||
"github.com/rfjakob/gocryptfs/internal/contentenc"
|
"github.com/rfjakob/gocryptfs/internal/contentenc"
|
||||||
|
"github.com/rfjakob/gocryptfs/internal/serialize_reads"
|
||||||
"github.com/rfjakob/gocryptfs/internal/syscallcompat"
|
"github.com/rfjakob/gocryptfs/internal/syscallcompat"
|
||||||
"github.com/rfjakob/gocryptfs/internal/tlog"
|
"github.com/rfjakob/gocryptfs/internal/tlog"
|
||||||
)
|
)
|
||||||
|
@ -176,6 +177,7 @@ func (f *file) doRead(off uint64, length uint64) ([]byte, fuse.Status) {
|
||||||
alignedOffset, alignedLength := blocks[0].JointCiphertextRange(blocks)
|
alignedOffset, alignedLength := blocks[0].JointCiphertextRange(blocks)
|
||||||
skip := blocks[0].Skip
|
skip := blocks[0].Skip
|
||||||
tlog.Debug.Printf("JointCiphertextRange(%d, %d) -> %d, %d, %d", off, length, alignedOffset, alignedLength, skip)
|
tlog.Debug.Printf("JointCiphertextRange(%d, %d) -> %d, %d, %d", off, length, alignedOffset, alignedLength, skip)
|
||||||
|
|
||||||
ciphertext := make([]byte, int(alignedLength))
|
ciphertext := make([]byte, int(alignedLength))
|
||||||
n, err := f.fd.ReadAt(ciphertext, int64(alignedOffset))
|
n, err := f.fd.ReadAt(ciphertext, int64(alignedOffset))
|
||||||
// We don't care if the file ID changes after we have read the data. Drop the lock.
|
// We don't care if the file ID changes after we have read the data. Drop the lock.
|
||||||
|
@ -224,8 +226,17 @@ func (f *file) Read(buf []byte, off int64) (resultData fuse.ReadResult, code fus
|
||||||
return nil, fuse.EBADF
|
return nil, fuse.EBADF
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if f.fs.args.SerializeReads {
|
||||||
|
serialize_reads.Wait(off, len(buf))
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf("%02d\n", off/131072)
|
||||||
out, status := f.doRead(uint64(off), uint64(len(buf)))
|
out, status := f.doRead(uint64(off), uint64(len(buf)))
|
||||||
|
|
||||||
|
if f.fs.args.SerializeReads {
|
||||||
|
serialize_reads.Done()
|
||||||
|
}
|
||||||
|
|
||||||
if status == fuse.EIO {
|
if status == fuse.EIO {
|
||||||
tlog.Warn.Printf("ino%d: Read: returning EIO, offset=%d, length=%d", f.devIno.ino, len(buf), off)
|
tlog.Warn.Printf("ino%d: Read: returning EIO, offset=%d, length=%d", f.devIno.ino, len(buf), off)
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,6 +17,7 @@ import (
|
||||||
"github.com/rfjakob/gocryptfs/internal/contentenc"
|
"github.com/rfjakob/gocryptfs/internal/contentenc"
|
||||||
"github.com/rfjakob/gocryptfs/internal/cryptocore"
|
"github.com/rfjakob/gocryptfs/internal/cryptocore"
|
||||||
"github.com/rfjakob/gocryptfs/internal/nametransform"
|
"github.com/rfjakob/gocryptfs/internal/nametransform"
|
||||||
|
"github.com/rfjakob/gocryptfs/internal/serialize_reads"
|
||||||
"github.com/rfjakob/gocryptfs/internal/syscallcompat"
|
"github.com/rfjakob/gocryptfs/internal/syscallcompat"
|
||||||
"github.com/rfjakob/gocryptfs/internal/tlog"
|
"github.com/rfjakob/gocryptfs/internal/tlog"
|
||||||
)
|
)
|
||||||
|
@ -43,6 +44,10 @@ func NewFS(args Args) *FS {
|
||||||
contentEnc := contentenc.New(cryptoCore, contentenc.DefaultBS)
|
contentEnc := contentenc.New(cryptoCore, contentenc.DefaultBS)
|
||||||
nameTransform := nametransform.New(cryptoCore.EMECipher, args.LongNames, args.Raw64)
|
nameTransform := nametransform.New(cryptoCore.EMECipher, args.LongNames, args.Raw64)
|
||||||
|
|
||||||
|
if args.SerializeReads {
|
||||||
|
serialize_reads.Init()
|
||||||
|
}
|
||||||
|
|
||||||
return &FS{
|
return &FS{
|
||||||
FileSystem: pathfs.NewLoopbackFileSystem(args.Cipherdir),
|
FileSystem: pathfs.NewLoopbackFileSystem(args.Cipherdir),
|
||||||
args: args,
|
args: args,
|
||||||
|
|
|
@ -0,0 +1,149 @@
|
||||||
|
package serialize_reads
|
||||||
|
|
||||||
|
import (
|
||||||
|
"log"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/rfjakob/gocryptfs/internal/tlog"
|
||||||
|
)
|
||||||
|
|
||||||
|
type SerializerStruct struct {
|
||||||
|
// we get submissions through the "input" channel
|
||||||
|
input chan *submission
|
||||||
|
// q = Queue
|
||||||
|
q []*submission
|
||||||
|
// wg is used to wait for the read to complete before unblocking the next
|
||||||
|
wg sync.WaitGroup
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait places the caller into a queue and blocks
|
||||||
|
func Wait(offset int64, size int) {
|
||||||
|
serializer.wait(offset, size)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Done signals that the read operation has finished
|
||||||
|
func Done() {
|
||||||
|
serializer.wg.Done()
|
||||||
|
}
|
||||||
|
|
||||||
|
type submission struct {
|
||||||
|
// "ch" is closed by "eventLoop" once it wants to unblock the caller
|
||||||
|
ch chan struct{}
|
||||||
|
// submissions are prioritized by offset (lowest offset gets unblocked first)
|
||||||
|
offset int64
|
||||||
|
// size will be used in the future to detect consecutive read requests. These
|
||||||
|
// can be unblocked immediately.
|
||||||
|
size int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sr *SerializerStruct) wait(offset int64, size int) {
|
||||||
|
ch := make(chan struct{})
|
||||||
|
sb := &submission{
|
||||||
|
ch: ch,
|
||||||
|
offset: offset,
|
||||||
|
size: size,
|
||||||
|
}
|
||||||
|
// Send our submission
|
||||||
|
sr.input <- sb
|
||||||
|
// Wait till we get unblocked
|
||||||
|
<-ch
|
||||||
|
}
|
||||||
|
|
||||||
|
// push returns true if the queue is full after the element has been stored.
|
||||||
|
// It panics if it did not have space to store the element.
|
||||||
|
func (sr *SerializerStruct) push(sb *submission) (full bool) {
|
||||||
|
free := 0
|
||||||
|
stored := false
|
||||||
|
for i, v := range sr.q {
|
||||||
|
if v != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if !stored {
|
||||||
|
sr.q[i] = sb
|
||||||
|
stored = true
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
free++
|
||||||
|
}
|
||||||
|
if !stored {
|
||||||
|
// This should never happen because eventLoop checks if the queue got full
|
||||||
|
log.Panic("BUG: unhandled queue overflow")
|
||||||
|
}
|
||||||
|
if free == 0 {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// pop the submission with the lowest offset off the queue
|
||||||
|
func (sr *SerializerStruct) pop() *submission {
|
||||||
|
var winner *submission
|
||||||
|
var winnerIndex int
|
||||||
|
for i, v := range sr.q {
|
||||||
|
if v == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if winner == nil {
|
||||||
|
winner = v
|
||||||
|
winnerIndex = i
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if v.offset < winner.offset {
|
||||||
|
winner = v
|
||||||
|
winnerIndex = i
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if winner == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
sr.q[winnerIndex] = nil
|
||||||
|
return winner
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sr *SerializerStruct) eventLoop() {
|
||||||
|
sr.input = make(chan *submission)
|
||||||
|
empty := true
|
||||||
|
for {
|
||||||
|
if empty {
|
||||||
|
// If the queue is empty we block on the channel to conserve CPU
|
||||||
|
sb := <-sr.input
|
||||||
|
sr.push(sb)
|
||||||
|
empty = false
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case sb := <-sr.input:
|
||||||
|
full := sr.push(sb)
|
||||||
|
if full {
|
||||||
|
// Queue is full, unblock the new request immediately
|
||||||
|
tlog.Warn.Printf("serialize_reads: queue full, forcing unblock")
|
||||||
|
sr.unblockOne()
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
case <-time.After(time.Microsecond * 500):
|
||||||
|
// Looks like we have waited out all concurrent requests.
|
||||||
|
empty = sr.unblockOne()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Unblock a submission and wait for completion
|
||||||
|
func (sr *SerializerStruct) unblockOne() (empty bool) {
|
||||||
|
winner := sr.pop()
|
||||||
|
if winner == nil {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
sr.wg.Add(1)
|
||||||
|
close(winner.ch)
|
||||||
|
sr.wg.Wait()
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
var serializer SerializerStruct
|
||||||
|
|
||||||
|
// Called by fusefrontend.NewFS
|
||||||
|
func Init() {
|
||||||
|
serializer.input = make(chan *submission)
|
||||||
|
serializer.q = make([]*submission, 10)
|
||||||
|
go serializer.eventLoop()
|
||||||
|
}
|
1
mount.go
1
mount.go
|
@ -191,6 +191,7 @@ func initFuseFrontend(key []byte, args *argContainer, confFile *configfile.ConfF
|
||||||
Raw64: args.raw64,
|
Raw64: args.raw64,
|
||||||
NoPrealloc: args.noprealloc,
|
NoPrealloc: args.noprealloc,
|
||||||
HKDF: args.hkdf,
|
HKDF: args.hkdf,
|
||||||
|
SerializeReads: args.serialize_reads,
|
||||||
}
|
}
|
||||||
// confFile is nil when "-zerokey" or "-masterkey" was used
|
// confFile is nil when "-zerokey" or "-masterkey" was used
|
||||||
if confFile != nil {
|
if confFile != nil {
|
||||||
|
|
Loading…
Reference in New Issue