Remove serialize_reads package

Will be replaced by go-fuse's new SyncRead flag.

More info: https://github.com/hanwen/go-fuse/issues/395
SyncRead commit: 15a8bb029a
master
Jakob Unterwurzacher 1 year ago
parent e69a85769f
commit b83ca9c921
  1. 8
      internal/fusefrontend/file.go
  2. 4
      internal/fusefrontend/root_node.go
  3. 150
      internal/serialize_reads/sr.go

@ -20,7 +20,6 @@ import (
"github.com/rfjakob/gocryptfs/v2/internal/contentenc"
"github.com/rfjakob/gocryptfs/v2/internal/inomap"
"github.com/rfjakob/gocryptfs/v2/internal/openfiletable"
"github.com/rfjakob/gocryptfs/v2/internal/serialize_reads"
"github.com/rfjakob/gocryptfs/v2/internal/stupidgcm"
"github.com/rfjakob/gocryptfs/v2/internal/syscallcompat"
"github.com/rfjakob/gocryptfs/v2/internal/tlog"
@ -252,13 +251,7 @@ func (f *File) Read(ctx context.Context, buf []byte, off int64) (resultData fuse
defer f.fileTableEntry.ContentLock.RUnlock()
tlog.Debug.Printf("ino%d: FUSE Read: offset=%d length=%d", f.qIno.Ino, off, len(buf))
if f.rootNode.args.SerializeReads {
serialize_reads.Wait(off, len(buf))
}
out, errno := f.doRead(buf[:0], uint64(off), uint64(len(buf)))
if f.rootNode.args.SerializeReads {
serialize_reads.Done()
}
if errno != 0 {
return nil, errno
}
@ -389,6 +382,7 @@ func (f *File) Write(ctx context.Context, data []byte, off int64) (uint32, sysca
// But if the write directly follows an earlier write, it cannot create a
// hole, and we can save one Stat() call.
if !f.isConsecutiveWrite(off) {
fmt.Printf("isConsecutiveWrite=false, off=%d\n", off)
errno := f.writePadHole(off)
if errno != 0 {
return 0, errno

@ -11,7 +11,6 @@ import (
"github.com/rfjakob/gocryptfs/v2/internal/contentenc"
"github.com/rfjakob/gocryptfs/v2/internal/inomap"
"github.com/rfjakob/gocryptfs/v2/internal/nametransform"
"github.com/rfjakob/gocryptfs/v2/internal/serialize_reads"
"github.com/rfjakob/gocryptfs/v2/internal/syscallcompat"
"github.com/rfjakob/gocryptfs/v2/internal/tlog"
)
@ -63,9 +62,6 @@ type RootNode struct {
}
func NewRootNode(args Args, c *contentenc.ContentEnc, n *nametransform.NameTransform) *RootNode {
if args.SerializeReads {
serialize_reads.InitSerializer()
}
if len(args.Exclude) > 0 {
tlog.Warn.Printf("Forward mode does not support -exclude")
}

@ -1,150 +0,0 @@
package serialize_reads
import (
"log"
"sync"
"time"
"github.com/rfjakob/gocryptfs/v2/internal/tlog"
)
// serializerState is used by the Wait and Done functions
type serializerState struct {
// we get submissions through the "input" channel
input chan *submission
// q = Queue
q []*submission
// wg is used to wait for the read to complete before unblocking the next
wg sync.WaitGroup
}
// Wait places the caller into a queue and blocks
func Wait(offset int64, size int) {
serializer.wait(offset, size)
}
// Done signals that the read operation has finished
func Done() {
serializer.wg.Done()
}
type submission struct {
// "ch" is closed by "eventLoop" once it wants to unblock the caller
ch chan struct{}
// submissions are prioritized by offset (lowest offset gets unblocked first)
offset int64
// size will be used in the future to detect consecutive read requests. These
// can be unblocked immediately.
size int
}
func (sr *serializerState) wait(offset int64, size int) {
ch := make(chan struct{})
sb := &submission{
ch: ch,
offset: offset,
size: size,
}
// Send our submission
sr.input <- sb
// Wait till we get unblocked
<-ch
}
// push returns true if the queue is full after the element has been stored.
// It panics if it did not have space to store the element.
func (sr *serializerState) push(sb *submission) (full bool) {
free := 0
stored := false
for i, v := range sr.q {
if v != nil {
continue
}
if !stored {
sr.q[i] = sb
stored = true
continue
}
free++
}
if !stored {
// This should never happen because eventLoop checks if the queue got full
log.Panic("BUG: unhandled queue overflow")
}
if free == 0 {
return true
}
return false
}
// pop the submission with the lowest offset off the queue
func (sr *serializerState) pop() *submission {
var winner *submission
var winnerIndex int
for i, v := range sr.q {
if v == nil {
continue
}
if winner == nil {
winner = v
winnerIndex = i
continue
}
if v.offset < winner.offset {
winner = v
winnerIndex = i
}
}
if winner == nil {
return nil
}
sr.q[winnerIndex] = nil
return winner
}
func (sr *serializerState) eventLoop() {
sr.input = make(chan *submission)
empty := true
for {
if empty {
// If the queue is empty we block on the channel to conserve CPU
sb := <-sr.input
sr.push(sb)
empty = false
}
select {
case sb := <-sr.input:
full := sr.push(sb)
if full {
// Queue is full, unblock the new request immediately
tlog.Warn.Printf("serialize_reads: queue full, forcing unblock")
sr.unblockOne()
}
case <-time.After(time.Microsecond * 500):
// Looks like we have waited out all concurrent requests.
empty = sr.unblockOne()
}
}
}
// Unblock a submission and wait for completion
func (sr *serializerState) unblockOne() (empty bool) {
winner := sr.pop()
if winner == nil {
return true
}
sr.wg.Add(1)
close(winner.ch)
sr.wg.Wait()
return false
}
var serializer serializerState
// InitSerializer sets up the internal serializer state and starts the event loop.
// Called by fusefrontend.NewFS.
func InitSerializer() {
serializer.input = make(chan *submission)
serializer.q = make([]*submission, 10)
go serializer.eventLoop()
}
Loading…
Cancel
Save