Skip to content
Snippets Groups Projects
Commit da6bbfb2 authored by Todd Gamblin's avatar Todd Gamblin
Browse files

Add byte-range parameters to llnl.util.lock

parent ea10e3ba
No related branches found
No related tags found
No related merge requests found
...@@ -45,32 +45,46 @@ ...@@ -45,32 +45,46 @@
class Lock(object): class Lock(object):
"""This is an implementation of a filesystem lock using Python's lockf. """This is an implementation of a filesystem lock using Python's lockf.
In Python, `lockf` actually calls `fcntl`, so this should work with any In Python, `lockf` actually calls `fcntl`, so this should work with
filesystem implementation that supports locking through the fcntl calls. any filesystem implementation that supports locking through the fcntl
This includes distributed filesystems like Lustre (when flock is enabled) calls. This includes distributed filesystems like Lustre (when flock
and recent NFS versions. is enabled) and recent NFS versions.
""" """
def __init__(self, path): def __init__(self, path, start=0, length=0):
"""Construct a new lock on the file at ``path``.
By default, the lock applies to the whole file. Optionally,
caller can specify a byte range beginning ``start`` bytes from
the start of the file and extending ``length`` bytes from there.
This exposes a subset of fcntl locking functionality. It does
not currently expose the ``whence`` parameter -- ``whence`` is
always os.SEEK_SET and ``start`` is always evaluated from the
beginning of the file.
"""
self.path = path self.path = path
self._file = None self._file = None
self._reads = 0 self._reads = 0
self._writes = 0 self._writes = 0
# byte range parameters
self._start = start
self._length = length
# PID and host of lock holder # PID and host of lock holder
self.pid = self.old_pid = None self.pid = self.old_pid = None
self.host = self.old_host = None self.host = self.old_host = None
def _lock(self, op, timeout): def _lock(self, op, timeout=_default_timeout):
"""This takes a lock using POSIX locks (``fnctl.lockf``). """This takes a lock using POSIX locks (``fnctl.lockf``).
The lock is implemented as a spin lock using a nonblocking The lock is implemented as a spin lock using a nonblocking call
call to lockf(). to lockf().
On acquiring an exclusive lock, the lock writes this process's On acquiring an exclusive lock, the lock writes this process's
pid and host to the lock file, in case the holding process pid and host to the lock file, in case the holding process needs
needs to be killed later. to be killed later.
If the lock times out, it raises a ``LockError``. If the lock times out, it raises a ``LockError``.
""" """
...@@ -97,7 +111,8 @@ def _lock(self, op, timeout): ...@@ -97,7 +111,8 @@ def _lock(self, op, timeout):
self._file = os.fdopen(fd, fd_mode) self._file = os.fdopen(fd, fd_mode)
# Try to get the lock (will raise if not available.) # Try to get the lock (will raise if not available.)
fcntl.lockf(self._file, op | fcntl.LOCK_NB) fcntl.lockf(self._file, op | fcntl.LOCK_NB,
self._length, self._start, os.SEEK_SET)
# All locks read the owner PID and host # All locks read the owner PID and host
self._read_lock_data() self._read_lock_data()
...@@ -158,7 +173,8 @@ def _unlock(self): ...@@ -158,7 +173,8 @@ def _unlock(self):
be masquerading as write locks, but this removes either. be masquerading as write locks, but this removes either.
""" """
fcntl.lockf(self._file, fcntl.LOCK_UN) fcntl.lockf(self._file, fcntl.LOCK_UN,
self._length, self._start, os.SEEK_SET)
self._file.close() self._file.close()
self._file = None self._file = None
...@@ -174,8 +190,9 @@ def acquire_read(self, timeout=_default_timeout): ...@@ -174,8 +190,9 @@ def acquire_read(self, timeout=_default_timeout):
""" """
if self._reads == 0 and self._writes == 0: if self._reads == 0 and self._writes == 0:
tty.debug('READ LOCK : {0.path} [Acquiring]'.format(self)) tty.debug('READ LOCK: {0.path}[{0._start}:{0._length}] [Acquiring]'
self._lock(fcntl.LOCK_SH, timeout) # can raise LockError. .format(self))
self._lock(fcntl.LOCK_SH, timeout=timeout) # can raise LockError.
self._reads += 1 self._reads += 1
return True return True
else: else:
...@@ -194,8 +211,10 @@ def acquire_write(self, timeout=_default_timeout): ...@@ -194,8 +211,10 @@ def acquire_write(self, timeout=_default_timeout):
""" """
if self._writes == 0: if self._writes == 0:
tty.debug('WRITE LOCK : {0.path} [Acquiring]'.format(self)) tty.debug(
self._lock(fcntl.LOCK_EX, timeout) # can raise LockError. 'WRITE LOCK: {0.path}[{0._start}:{0._length}] [Acquiring]'
.format(self))
self._lock(fcntl.LOCK_EX, timeout=timeout) # can raise LockError.
self._writes += 1 self._writes += 1
return True return True
else: else:
...@@ -215,7 +234,8 @@ def release_read(self): ...@@ -215,7 +234,8 @@ def release_read(self):
assert self._reads > 0 assert self._reads > 0
if self._reads == 1 and self._writes == 0: if self._reads == 1 and self._writes == 0:
tty.debug('READ LOCK : {0.path} [Released]'.format(self)) tty.debug('READ LOCK: {0.path}[{0._start}:{0._length}] [Released]'
.format(self))
self._unlock() # can raise LockError. self._unlock() # can raise LockError.
self._reads -= 1 self._reads -= 1
return True return True
...@@ -236,7 +256,8 @@ def release_write(self): ...@@ -236,7 +256,8 @@ def release_write(self):
assert self._writes > 0 assert self._writes > 0
if self._writes == 1 and self._reads == 0: if self._writes == 1 and self._reads == 0:
tty.debug('WRITE LOCK : {0.path} [Released]'.format(self)) tty.debug('WRITE LOCK: {0.path}[{0._start}:{0._length}] [Released]'
.format(self))
self._unlock() # can raise LockError. self._unlock() # can raise LockError.
self._writes -= 1 self._writes -= 1
return True return True
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment