fusefronted: disallow writes running concurrently with reads
As uncovered by xfstests generic/465, concurrent reads and writes could lead to this, doRead 3015532: corrupt block #1039: stupidgcm: message authentication failed, as the read could pick up a block that has not yet been completely written - write() is not atomic! Now writes take ContentLock exclusively, while reads take it shared, meaning that multiple reads can run in parallel with each other, but not with a write. This also simplifies the file header locking.
This commit is contained in:
parent
c70df522d2
commit
f316f1b2df
@ -145,39 +145,34 @@ func (f *File) createHeader() (fileID []byte, err error) {
|
|||||||
//
|
//
|
||||||
// Called by Read() for normal reading,
|
// Called by Read() for normal reading,
|
||||||
// by Write() and Truncate() via doWrite() for Read-Modify-Write.
|
// by Write() and Truncate() via doWrite() for Read-Modify-Write.
|
||||||
//
|
func (f *File) doRead(dst []byte, off uint64, length uint64) ([]byte, fuse.Status) {
|
||||||
// doWrite() uses nolock=true because it makes sure the ID is in the cache and
|
// Get the file ID, either from the open file table, or from disk.
|
||||||
// HeaderLock is locked before calling doRead.
|
var fileID []byte
|
||||||
func (f *File) doRead(dst []byte, off uint64, length uint64, nolock bool) ([]byte, fuse.Status) {
|
f.fileTableEntry.IDLock.Lock()
|
||||||
if !nolock {
|
if f.fileTableEntry.ID != nil {
|
||||||
// Make sure we have the file ID.
|
// Use the cached value in the file table
|
||||||
f.fileTableEntry.HeaderLock.RLock()
|
fileID = f.fileTableEntry.ID
|
||||||
if f.fileTableEntry.ID == nil {
|
} else {
|
||||||
f.fileTableEntry.HeaderLock.RUnlock()
|
// Not cached, we have to read it from disk.
|
||||||
// Yes, somebody else may take the lock before we can. This will get
|
var err error
|
||||||
// the header read twice, but causes no harm otherwise.
|
fileID, err = f.readFileID()
|
||||||
f.fileTableEntry.HeaderLock.Lock()
|
if err != nil {
|
||||||
tmpID, err := f.readFileID()
|
f.fileTableEntry.IDLock.Unlock()
|
||||||
if err == io.EOF {
|
if err == io.EOF {
|
||||||
f.fileTableEntry.HeaderLock.Unlock()
|
// Empty file
|
||||||
return nil, fuse.OK
|
return nil, fuse.OK
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
f.fileTableEntry.HeaderLock.Unlock()
|
|
||||||
tlog.Warn.Printf("doRead %d: corrupt header: %v", f.qIno.Ino, err)
|
tlog.Warn.Printf("doRead %d: corrupt header: %v", f.qIno.Ino, err)
|
||||||
return nil, fuse.EIO
|
return nil, fuse.EIO
|
||||||
}
|
}
|
||||||
f.fileTableEntry.ID = tmpID
|
|
||||||
// Downgrade the lock.
|
|
||||||
f.fileTableEntry.HeaderLock.Unlock()
|
|
||||||
// The file ID may change in here. This does no harm because we
|
|
||||||
// re-read it after the RLock().
|
|
||||||
f.fileTableEntry.HeaderLock.RLock()
|
|
||||||
}
|
}
|
||||||
|
// Save into the file table
|
||||||
|
f.fileTableEntry.ID = fileID
|
||||||
}
|
}
|
||||||
fileID := f.fileTableEntry.ID
|
f.fileTableEntry.IDLock.Unlock()
|
||||||
if fileID == nil {
|
if fileID == nil {
|
||||||
log.Panicf("filedID=%v, nolock=%v", fileID, nolock)
|
log.Panicf("fileID=%v", fileID)
|
||||||
}
|
}
|
||||||
// Read the backing ciphertext in one go
|
// Read the backing ciphertext in one go
|
||||||
blocks := f.contentEnc.ExplodePlainRange(off, length)
|
blocks := f.contentEnc.ExplodePlainRange(off, length)
|
||||||
@ -189,10 +184,6 @@ func (f *File) doRead(dst []byte, off uint64, length uint64, nolock bool) ([]byt
|
|||||||
ciphertext := f.fs.contentEnc.CReqPool.Get()
|
ciphertext := f.fs.contentEnc.CReqPool.Get()
|
||||||
ciphertext = ciphertext[:int(alignedLength)]
|
ciphertext = ciphertext[:int(alignedLength)]
|
||||||
n, err := f.fd.ReadAt(ciphertext, int64(alignedOffset))
|
n, err := f.fd.ReadAt(ciphertext, int64(alignedOffset))
|
||||||
// We don't care if the file ID changes after we have read the data. Drop the lock.
|
|
||||||
if !nolock {
|
|
||||||
f.fileTableEntry.HeaderLock.RUnlock()
|
|
||||||
}
|
|
||||||
if err != nil && err != io.EOF {
|
if err != nil && err != io.EOF {
|
||||||
tlog.Warn.Printf("read: ReadAt: %s", err.Error())
|
tlog.Warn.Printf("read: ReadAt: %s", err.Error())
|
||||||
return nil, fuse.ToStatus(err)
|
return nil, fuse.ToStatus(err)
|
||||||
@ -251,11 +242,14 @@ func (f *File) Read(buf []byte, off int64) (resultData fuse.ReadResult, code fus
|
|||||||
f.fdLock.RLock()
|
f.fdLock.RLock()
|
||||||
defer f.fdLock.RUnlock()
|
defer f.fdLock.RUnlock()
|
||||||
|
|
||||||
|
f.fileTableEntry.ContentLock.RLock()
|
||||||
|
defer f.fileTableEntry.ContentLock.RUnlock()
|
||||||
|
|
||||||
tlog.Debug.Printf("ino%d: FUSE Read: offset=%d length=%d", f.qIno.Ino, len(buf), off)
|
tlog.Debug.Printf("ino%d: FUSE Read: offset=%d length=%d", f.qIno.Ino, len(buf), off)
|
||||||
if f.fs.args.SerializeReads {
|
if f.fs.args.SerializeReads {
|
||||||
serialize_reads.Wait(off, len(buf))
|
serialize_reads.Wait(off, len(buf))
|
||||||
}
|
}
|
||||||
out, status := f.doRead(buf[:0], uint64(off), uint64(len(buf)), false)
|
out, status := f.doRead(buf[:0], uint64(off), uint64(len(buf)))
|
||||||
if f.fs.args.SerializeReads {
|
if f.fs.args.SerializeReads {
|
||||||
serialize_reads.Done()
|
serialize_reads.Done()
|
||||||
}
|
}
|
||||||
@ -277,31 +271,25 @@ func (f *File) Read(buf []byte, off int64) (resultData fuse.ReadResult, code fus
|
|||||||
// Empty writes do nothing and are allowed.
|
// Empty writes do nothing and are allowed.
|
||||||
func (f *File) doWrite(data []byte, off int64) (uint32, fuse.Status) {
|
func (f *File) doWrite(data []byte, off int64) (uint32, fuse.Status) {
|
||||||
fileWasEmpty := false
|
fileWasEmpty := false
|
||||||
// If the file ID is not cached, read it from disk
|
// Get the file ID, create a new one if it does not exist yet.
|
||||||
if f.fileTableEntry.ID == nil {
|
var fileID []byte
|
||||||
// Block other readers while we mess with the file header. Other writers
|
// The caller has exclusively locked ContentLock, which blocks all other
|
||||||
// are blocked by ContentLock already.
|
// readers and writers. No need to take IDLock.
|
||||||
f.fileTableEntry.HeaderLock.Lock()
|
if f.fileTableEntry.ID != nil {
|
||||||
tmpID, err := f.readFileID()
|
fileID = f.fileTableEntry.ID
|
||||||
|
} else {
|
||||||
|
// If the file ID is not cached, read it from disk
|
||||||
|
var err error
|
||||||
|
fileID, err = f.readFileID()
|
||||||
// Write a new file header if the file is empty
|
// Write a new file header if the file is empty
|
||||||
if err == io.EOF {
|
if err == io.EOF {
|
||||||
tmpID, err = f.createHeader()
|
fileID, err = f.createHeader()
|
||||||
fileWasEmpty = true
|
fileWasEmpty = true
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
f.fileTableEntry.HeaderLock.Unlock()
|
|
||||||
return 0, fuse.ToStatus(err)
|
return 0, fuse.ToStatus(err)
|
||||||
}
|
}
|
||||||
f.fileTableEntry.ID = tmpID
|
f.fileTableEntry.ID = fileID
|
||||||
if fileWasEmpty {
|
|
||||||
// The file was empty and we wrote a new file header. Keep the lock
|
|
||||||
// as we might have to kill the file header again if the data write
|
|
||||||
// fails.
|
|
||||||
defer f.fileTableEntry.HeaderLock.Unlock()
|
|
||||||
} else {
|
|
||||||
// We won't touch the header again, drop the lock immediately.
|
|
||||||
f.fileTableEntry.HeaderLock.Unlock()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
// Handle payload data
|
// Handle payload data
|
||||||
dataBuf := bytes.NewBuffer(data)
|
dataBuf := bytes.NewBuffer(data)
|
||||||
@ -312,7 +300,7 @@ func (f *File) doWrite(data []byte, off int64) (uint32, fuse.Status) {
|
|||||||
// Incomplete block -> Read-Modify-Write
|
// Incomplete block -> Read-Modify-Write
|
||||||
if b.IsPartial() {
|
if b.IsPartial() {
|
||||||
// Read
|
// Read
|
||||||
oldData, status := f.doRead(nil, b.BlockPlainOff(), f.contentEnc.PlainBS(), true)
|
oldData, status := f.doRead(nil, b.BlockPlainOff(), f.contentEnc.PlainBS())
|
||||||
if status != fuse.OK {
|
if status != fuse.OK {
|
||||||
tlog.Warn.Printf("ino%d fh%d: RMW read failed: %s", f.qIno.Ino, f.intFd(), status.String())
|
tlog.Warn.Printf("ino%d fh%d: RMW read failed: %s", f.qIno.Ino, f.intFd(), status.String())
|
||||||
return 0, status
|
return 0, status
|
||||||
@ -382,9 +370,7 @@ func (f *File) Write(data []byte, off int64) (uint32, fuse.Status) {
|
|||||||
f.fdLock.RLock()
|
f.fdLock.RLock()
|
||||||
defer f.fdLock.RUnlock()
|
defer f.fdLock.RUnlock()
|
||||||
if f.released {
|
if f.released {
|
||||||
// The file descriptor has been closed concurrently, which also means
|
// The file descriptor has been closed concurrently
|
||||||
// the wlock has been freed. Exit here so we don't crash trying to access
|
|
||||||
// it.
|
|
||||||
tlog.Warn.Printf("ino%d fh%d: Write on released file", f.qIno.Ino, f.intFd())
|
tlog.Warn.Printf("ino%d fh%d: Write on released file", f.qIno.Ino, f.intFd())
|
||||||
return 0, fuse.EBADF
|
return 0, fuse.EBADF
|
||||||
}
|
}
|
||||||
|
@ -111,9 +111,7 @@ func (f *File) Truncate(newSize uint64) fuse.Status {
|
|||||||
return fuse.ToStatus(err)
|
return fuse.ToStatus(err)
|
||||||
}
|
}
|
||||||
// Truncate to zero kills the file header
|
// Truncate to zero kills the file header
|
||||||
f.fileTableEntry.HeaderLock.Lock()
|
|
||||||
f.fileTableEntry.ID = nil
|
f.fileTableEntry.ID = nil
|
||||||
f.fileTableEntry.HeaderLock.Unlock()
|
|
||||||
return fuse.OK
|
return fuse.OK
|
||||||
}
|
}
|
||||||
// We need the old file size to determine if we are growing or shrinking
|
// We need the old file size to determine if we are growing or shrinking
|
||||||
@ -144,7 +142,7 @@ func (f *File) Truncate(newSize uint64) fuse.Status {
|
|||||||
var data []byte
|
var data []byte
|
||||||
if lastBlockLen > 0 {
|
if lastBlockLen > 0 {
|
||||||
var status fuse.Status
|
var status fuse.Status
|
||||||
data, status = f.doRead(nil, plainOff, lastBlockLen, false)
|
data, status = f.doRead(nil, plainOff, lastBlockLen)
|
||||||
if status != fuse.OK {
|
if status != fuse.OK {
|
||||||
tlog.Warn.Printf("Truncate: shrink doRead returned error: %v", err)
|
tlog.Warn.Printf("Truncate: shrink doRead returned error: %v", err)
|
||||||
return status
|
return status
|
||||||
@ -206,8 +204,6 @@ func (f *File) truncateGrowFile(oldPlainSz uint64, newPlainSz uint64) fuse.Statu
|
|||||||
if newPlainSz%f.contentEnc.PlainBS() == 0 {
|
if newPlainSz%f.contentEnc.PlainBS() == 0 {
|
||||||
// The file was empty, so it did not have a header. Create one.
|
// The file was empty, so it did not have a header. Create one.
|
||||||
if oldPlainSz == 0 {
|
if oldPlainSz == 0 {
|
||||||
f.fileTableEntry.HeaderLock.Lock()
|
|
||||||
defer f.fileTableEntry.HeaderLock.Unlock()
|
|
||||||
id, err := f.createHeader()
|
id, err := f.createHeader()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fuse.ToStatus(err)
|
return fuse.ToStatus(err)
|
||||||
|
@ -57,18 +57,16 @@ type table struct {
|
|||||||
|
|
||||||
// Entry is an entry in the open file table
|
// Entry is an entry in the open file table
|
||||||
type Entry struct {
|
type Entry struct {
|
||||||
// Reference count
|
// Reference count. Protected by the table lock.
|
||||||
refCount int
|
refCount int
|
||||||
// ContentLock guards the file content from concurrent writes. Every writer
|
// ContentLock protects on-disk content from concurrent writes. Every writer
|
||||||
// must take this lock before modifying the file content.
|
// must take this lock before modifying the file content.
|
||||||
ContentLock countingMutex
|
ContentLock countingMutex
|
||||||
// HeaderLock guards the file ID (in this struct) and the file header (on
|
|
||||||
// disk). Take HeaderLock.RLock() to make sure the file ID does not change
|
|
||||||
// behind your back. If you modify the file ID, you must take
|
|
||||||
// HeaderLock.Lock().
|
|
||||||
HeaderLock sync.RWMutex
|
|
||||||
// ID is the file ID in the file header.
|
// ID is the file ID in the file header.
|
||||||
ID []byte
|
ID []byte
|
||||||
|
// IDLock must be taken before reading or writing the ID field in this struct,
|
||||||
|
// unless you have an exclusive lock on ContentLock.
|
||||||
|
IDLock sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
// Register creates an open file table entry for "qi" (or incrementes the
|
// Register creates an open file table entry for "qi" (or incrementes the
|
||||||
@ -101,15 +99,15 @@ func Unregister(qi QIno) {
|
|||||||
|
|
||||||
// countingMutex incrementes t.writeLockCount on each Lock() call.
|
// countingMutex incrementes t.writeLockCount on each Lock() call.
|
||||||
type countingMutex struct {
|
type countingMutex struct {
|
||||||
sync.Mutex
|
sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *countingMutex) Lock() {
|
func (c *countingMutex) Lock() {
|
||||||
c.Mutex.Lock()
|
c.RWMutex.Lock()
|
||||||
atomic.AddUint64(&t.writeOpCount, 1)
|
atomic.AddUint64(&t.writeOpCount, 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
// WriteOpCount returns the write lock counter value. This value is encremented
|
// WriteOpCount returns the write lock counter value. This value is incremented
|
||||||
// each time writeLock.Lock() on a file table entry is called.
|
// each time writeLock.Lock() on a file table entry is called.
|
||||||
func WriteOpCount() uint64 {
|
func WriteOpCount() uint64 {
|
||||||
return atomic.LoadUint64(&t.writeOpCount)
|
return atomic.LoadUint64(&t.writeOpCount)
|
||||||
|
Loading…
Reference in New Issue
Block a user