Skip to content
Snippets Groups Projects
Commit ead8ac58 authored by Todd Gamblin's avatar Todd Gamblin
Browse files

Working Lock class, now uses POSIX fcntl locks, extensive unit test.

- llnl.util.lock now uses fcntl.lockf instead of flock
  - purported to have more NFS compatibility.

- Added an extensive test case for locks.
  - tests acquiring, releasing, upgrading, timeouts, shared, & exclusive cases.
parent 908a93a4
No related branches found
No related tags found
No related merge requests found
##############################################################################
# Copyright (c) 2013, Lawrence Livermore National Security, LLC.
# Copyright (c) 2013-2015, Lawrence Livermore National Security, LLC.
# Produced at the Lawrence Livermore National Laboratory.
#
# This file is part of Spack.
......@@ -22,134 +22,135 @@
# along with this program; if not, write to the Free Software Foundation,
# Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
##############################################################################
"""Lock implementation for shared filesystems."""
import os
import fcntl
import errno
import time
import socket
# Default timeout for locks.
DEFAULT_TIMEOUT = 60
# Default timeout in seconds, after which locks will raise exceptions.
_default_timeout = 60
class _ReadLockContext(object):
"""Context manager that takes and releases a read lock.
# Sleep time per iteration in spin loop (in seconds)
_sleep_time = 1e-5
Arguments are lock and timeout (default 5 minutes)
"""
def __init__(self, lock, timeout=DEFAULT_TIMEOUT):
self._lock = lock
self._timeout = timeout
def __enter__(self):
self._lock.acquire_read(self._timeout)
class Lock(object):
def __init__(self,file_path):
self._file_path = file_path
self._fd = None
self._reads = 0
self._writes = 0
def __exit__(self,type,value,traceback):
self._lock.release_read()
def _lock(self, op, timeout):
"""This takes a lock using POSIX locks (``fnctl.lockf``).
class _WriteLockContext(object):
"""Context manager that takes and releases a write lock.
The lock is implemented as a spin lock using a nonblocking
call to lockf().
Arguments are lock and timeout (default 5 minutes)
"""
def __init__(self, lock, timeout=DEFAULT_TIMEOUT):
self._lock = lock
self._timeout = timeout
On acquiring an exclusive lock, the lock writes this process's
pid and host to the lock file, in case the holding process
needs to be killed later.
def __enter__(self):
self._lock.acquire_write(self._timeout)
If the lock times out, it raises a ``LockError``.
"""
start_time = time.time()
while (time.time() - start_time) < timeout:
try:
if self._fd is None:
self._fd = os.open(self._file_path, os.O_RDWR)
def __exit__(self,type,value,traceback):
self._lock.release_write()
fcntl.lockf(self._fd, op | fcntl.LOCK_NB)
if op == fcntl.LOCK_EX:
os.write(self._fd, "pid=%s,host=%s" % (os.getpid(), socket.getfqdn()))
return
except IOError as error:
if error.errno == errno.EAGAIN or error.errno == errno.EACCES:
pass
else:
raise
time.sleep(_sleep_time)
raise LockError("Timed out waiting for lock.")
class Lock(object):
    """Distributed file-based lock using ``flock``.

    Read (shared) and write (exclusive) locks are counted separately so
    the lock is recursive; the underlying POSIX lock is held until all
    local read and write locks have been released.

    NOTE(review): ``os.open`` requires that ``file_path`` already exist
    and be writable — confirm callers create the file first.
    """
    def __init__(self, file_path):
        # Path to the shared lock file (also used to record pid/host on write).
        self._file_path = file_path
        # Descriptor held open for the lifetime of this object.
        self._fd = os.open(file_path,os.O_RDWR)
        # Recursion counts for shared and exclusive acquisitions.
        self._reads = 0
        self._writes = 0
def _unlock(self):
"""Releases a lock using POSIX locks (``fcntl.lockf``)
Releases the lock regardless of mode. Note that read locks may
be masquerading as write locks, but this removes either.
def write_lock(self, timeout=DEFAULT_TIMEOUT):
    """Return a context manager holding this lock exclusively.

    The returned context acquires a write lock on entry and releases
    it on exit; ``timeout`` bounds the acquisition wait in seconds.
    """
    context = _WriteLockContext(self, timeout)
    return context
"""
fcntl.lockf(self._fd,fcntl.LOCK_UN)
os.close(self._fd)
self._fd = None
def read_lock(self, timeout=DEFAULT_TIMEOUT):
    """Return a context manager holding this lock in shared mode.

    The returned context acquires a read lock on entry and releases
    it on exit; ``timeout`` bounds the acquisition wait in seconds.
    """
    context = _ReadLockContext(self, timeout)
    return context
def acquire_read(self, timeout=_default_timeout):
"""Acquires a recursive, shared lock for reading.
Read and write locks can be acquired and released in arbitrary
order, but the POSIX lock is held until all local read and
write locks are released.
def acquire_read(self, timeout):
    """Acquire a shared (read) lock, recursively.

    The POSIX lock is taken only on the first acquisition; if this
    process already holds the lock in read or write mode, only the
    recursion count is bumped. A write lock already held is kept —
    it subsumes the read lock until everything is released.
    """
    first_acquire = (self._reads == 0 and self._writes == 0)
    if first_acquire:
        self._lock(fcntl.LOCK_SH, timeout)
    self._reads += 1
def acquire_write(self, timeout):
"""
Implements recursive lock
def acquire_write(self, timeout=_default_timeout):
    """Acquire an exclusive (write) lock, recursively.

    Read and write locks may be acquired and released in any order,
    but the POSIX lock itself stays held until every local read and
    write lock has been released.
    """
    if not self._writes:
        # First write acquisition: take (or upgrade to) the exclusive lock.
        self._lock(fcntl.LOCK_EX, timeout)
    self._writes += 1
def _lock(self, op, timeout):
    """Take a POSIX lock, spinning until acquired or timed out.

    The timeout is implemented by retrying a nonblocking ``flock``
    call (avoiding signal-based timers). On an exclusive lock, this
    process's pid and hostname are written to the lock file so a
    stuck holder can be identified later; shared locks store no data.

    Args:
        op: ``fcntl.LOCK_SH`` or ``fcntl.LOCK_EX``.
        timeout: seconds to keep retrying before giving up.

    Raises:
        LockError: if the lock is not acquired within ``timeout``.
        IOError: for lock failures other than "held by someone else".
    """
    total_time = 0
    while total_time < timeout:
        try:
            fcntl.flock(self._fd, op | fcntl.LOCK_NB)
            if op == fcntl.LOCK_EX:
                with open(self._file_path, 'w') as f:
                    f.write("pid = " + str(os.getpid()) + ", host = " + socket.getfqdn())
            return
        except IOError as error:
            # EAGAIN/EACCES mean the lock is held elsewhere: keep spinning.
            # BUG FIX: was bare ``EACCES`` (a NameError at runtime); the
            # constant lives in the errno module.
            if error.errno == errno.EAGAIN or error.errno == errno.EACCES:
                pass
            else:
                raise
        time.sleep(0.1)
        total_time += 0.1
    # BUG FIX: previously fell off the end silently on timeout, so callers
    # believed they held the lock. Callers and tests expect LockError.
    raise LockError("Timed out waiting for lock.")
def release_read(self):
"""Releases a read lock.
Returns True if the last recursive lock was released, False if
there are still outstanding locks.
Does limited correctness checking: if a read lock is released
when none are held, this will raise an assertion error.
def release_read(self):
    """Release one read lock (recursive).

    The POSIX lock is dropped only when the last local lock of either
    kind is released.

    Returns:
        True if this call released the underlying POSIX lock, False if
        read or write locks are still outstanding.

    Raises:
        AssertionError: if no read lock is currently held.
    """
    assert self._reads > 0
    # BUG FIX: the previous (diff-merged) body unlocked before
    # decrementing and could call _unlock() twice. Decrement first,
    # then unlock exactly once when no local locks remain.
    self._reads -= 1
    if self._reads == 0 and self._writes == 0:
        self._unlock()
        return True
    return False
def release_write(self):
    """Release one write lock (recursive).

    The POSIX lock is dropped only when the last local lock of either
    kind is released.

    Returns:
        True if this call released the underlying POSIX lock, False if
        read or write locks are still outstanding.

    Raises:
        AssertionError: if no write lock is currently held.
    """
    assert self._writes > 0
    # BUG FIX: the previous (diff-merged) body unlocked before
    # decrementing and could call _unlock() twice. Decrement first,
    # then unlock exactly once when no local locks remain.
    self._writes -= 1
    if self._writes == 0 and self._reads == 0:
        self._unlock()
        return True
    return False
def _unlock(self):
    """Release the POSIX lock on the open descriptor.

    Releases the lock regardless of mode: a single LOCK_UN drops either
    a shared or an exclusive lock. Note that read locks may be
    masquerading as write locks at times (after an upgrade), but this
    removes either. The descriptor itself is left open for reuse.
    """
    fcntl.flock(self._fd, fcntl.LOCK_UN)
class LockError(Exception):
    """Signals that an attempt to acquire a lock timed out."""
......@@ -57,6 +57,7 @@
'optional_deps',
'make_executable',
'configure_guess',
'lock',
'database']
......@@ -77,7 +78,7 @@ def run(names, verbose=False):
if test not in test_names:
tty.error("%s is not a valid spack test name." % test,
"Valid names are:")
colify(test_names, indent=4)
colify(sorted(test_names), indent=4)
sys.exit(1)
runner = unittest.TextTestRunner(verbosity=verbosity)
......
##############################################################################
# Copyright (c) 2013-2015, Lawrence Livermore National Security, LLC.
# Produced at the Lawrence Livermore National Laboratory.
#
# This file is part of Spack.
# Written by Todd Gamblin, tgamblin@llnl.gov, All rights reserved.
# LLNL-CODE-647188
#
# For details, see https://scalability-llnl.github.io/spack
# Please also see the LICENSE file for our notice and the LGPL.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License (as published by
# the Free Software Foundation) version 2.1 dated February 1999.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the IMPLIED WARRANTY OF
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the terms and
# conditions of the GNU General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
##############################################################################
"""
These tests ensure that our lock works correctly.
"""
import unittest
import os
import tempfile
import shutil
from multiprocessing import Process
from llnl.util.lock import *
from llnl.util.filesystem import join_path, touch
from spack.util.multiproc import Barrier
# This is the longest a failed test will take, as the barriers will
# time out and raise an exception.
barrier_timeout = 5
def order_processes(*functions):
    """Run each function in its own process, ordered by a shared barrier.

    Every function receives the Barrier as its only argument, letting
    the processes sequence their steps relative to one another. Blocks
    until all processes have finished.
    """
    barrier = Barrier(len(functions), timeout=barrier_timeout)
    processes = [Process(target=fn, args=(barrier,)) for fn in functions]
    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()
class LockTest(unittest.TestCase):
    """Multi-process tests for Lock, synchronized with barriers.

    Each ``test_*`` method composes the process snippets below via
    ``order_processes``; barrier waits enforce the intended ordering
    of acquisitions, timeouts, and releases across processes.
    """

    def setUp(self):
        # Fresh lock file in a private temp dir for every test.
        self.tempdir = tempfile.mkdtemp()
        self.lock_path = join_path(self.tempdir, 'lockfile')
        touch(self.lock_path)

    def tearDown(self):
        shutil.rmtree(self.tempdir, ignore_errors=True)

    #
    # Process snippets below can be composed into tests.
    #
    def acquire_write(self, barrier):
        lock = Lock(self.lock_path)
        lock.acquire_write()  # grab exclusive lock
        barrier.wait()
        barrier.wait()  # hold the lock until exception raises in other procs.

    def acquire_read(self, barrier):
        lock = Lock(self.lock_path)
        lock.acquire_read()  # grab shared lock
        barrier.wait()
        barrier.wait()  # hold the lock until exception raises in other procs.

    def timeout_write(self, barrier):
        lock = Lock(self.lock_path)
        barrier.wait()  # wait for lock acquire in first process
        self.assertRaises(LockError, lock.acquire_write, 0.1)
        barrier.wait()

    def timeout_read(self, barrier):
        lock = Lock(self.lock_path)
        barrier.wait()  # wait for lock acquire in first process
        self.assertRaises(LockError, lock.acquire_read, 0.1)
        barrier.wait()

    #
    # Test that exclusive locks on other processes time out when an
    # exclusive lock is held.
    #
    def test_write_lock_timeout_on_write(self):
        order_processes(self.acquire_write, self.timeout_write)

    def test_write_lock_timeout_on_write_2(self):
        order_processes(self.acquire_write, self.timeout_write, self.timeout_write)

    def test_write_lock_timeout_on_write_3(self):
        order_processes(self.acquire_write, self.timeout_write, self.timeout_write, self.timeout_write)

    #
    # Test that shared locks on other processes time out when an
    # exclusive lock is held.
    #
    def test_read_lock_timeout_on_write(self):
        order_processes(self.acquire_write, self.timeout_read)

    def test_read_lock_timeout_on_write_2(self):
        order_processes(self.acquire_write, self.timeout_read, self.timeout_read)

    def test_read_lock_timeout_on_write_3(self):
        order_processes(self.acquire_write, self.timeout_read, self.timeout_read, self.timeout_read)

    #
    # Test that exclusive locks time out when shared locks are held.
    #
    def test_write_lock_timeout_on_read(self):
        order_processes(self.acquire_read, self.timeout_write)

    def test_write_lock_timeout_on_read_2(self):
        order_processes(self.acquire_read, self.timeout_write, self.timeout_write)

    def test_write_lock_timeout_on_read_3(self):
        order_processes(self.acquire_read, self.timeout_write, self.timeout_write, self.timeout_write)

    #
    # Test that exclusive locks time out while lots of shared locks are held.
    #
    def test_write_lock_timeout_with_multiple_readers_2_1(self):
        order_processes(self.acquire_read, self.acquire_read, self.timeout_write)

    def test_write_lock_timeout_with_multiple_readers_2_2(self):
        order_processes(self.acquire_read, self.acquire_read, self.timeout_write, self.timeout_write)

    def test_write_lock_timeout_with_multiple_readers_3_1(self):
        order_processes(self.acquire_read, self.acquire_read, self.acquire_read, self.timeout_write)

    def test_write_lock_timeout_with_multiple_readers_3_2(self):
        order_processes(self.acquire_read, self.acquire_read, self.acquire_read, self.timeout_write, self.timeout_write)

    #
    # Longer test case that ensures locks are reusable. Ordering is
    # enforced by barriers throughout -- steps are shown with numbers.
    #
    def test_complex_acquire_and_release_chain(self):
        def p1(barrier):
            lock = Lock(self.lock_path)

            lock.acquire_write()
            barrier.wait() # ---------------------------------------- 1
            # others test timeout
            barrier.wait() # ---------------------------------------- 2
            lock.release_write()   # release and others acquire read
            barrier.wait() # ---------------------------------------- 3
            self.assertRaises(LockError, lock.acquire_write, 0.1)
            lock.acquire_read()
            barrier.wait() # ---------------------------------------- 4
            lock.release_read()
            barrier.wait() # ---------------------------------------- 5

            # p2 upgrades read to write
            barrier.wait() # ---------------------------------------- 6
            self.assertRaises(LockError, lock.acquire_write, 0.1)
            self.assertRaises(LockError, lock.acquire_read, 0.1)
            barrier.wait() # ---------------------------------------- 7
            # p2 releases write and read
            barrier.wait() # ---------------------------------------- 8

            # p3 acquires read
            barrier.wait() # ---------------------------------------- 9
            # p3 upgrades read to write
            barrier.wait() # ---------------------------------------- 10
            self.assertRaises(LockError, lock.acquire_write, 0.1)
            self.assertRaises(LockError, lock.acquire_read, 0.1)
            barrier.wait() # ---------------------------------------- 11
            # p3 releases locks
            barrier.wait() # ---------------------------------------- 12
            lock.acquire_read()
            barrier.wait() # ---------------------------------------- 13
            lock.release_read()

        def p2(barrier):
            lock = Lock(self.lock_path)

            # p1 acquires write
            barrier.wait() # ---------------------------------------- 1
            self.assertRaises(LockError, lock.acquire_write, 0.1)
            self.assertRaises(LockError, lock.acquire_read, 0.1)
            barrier.wait() # ---------------------------------------- 2
            lock.acquire_read()
            barrier.wait() # ---------------------------------------- 3
            # p1 tests shared read
            barrier.wait() # ---------------------------------------- 4
            # others release reads
            barrier.wait() # ---------------------------------------- 5

            lock.acquire_write()  # upgrade read to write
            barrier.wait() # ---------------------------------------- 6
            # others test timeout
            barrier.wait() # ---------------------------------------- 7
            lock.release_write()  # release read AND write (need both)
            lock.release_read()
            barrier.wait() # ---------------------------------------- 8

            # p3 acquires read
            barrier.wait() # ---------------------------------------- 9
            # p3 upgrades read to write
            barrier.wait() # ---------------------------------------- 10
            self.assertRaises(LockError, lock.acquire_write, 0.1)
            self.assertRaises(LockError, lock.acquire_read, 0.1)
            barrier.wait() # ---------------------------------------- 11
            # p3 releases locks
            barrier.wait() # ---------------------------------------- 12
            lock.acquire_read()
            barrier.wait() # ---------------------------------------- 13
            lock.release_read()

        def p3(barrier):
            lock = Lock(self.lock_path)

            # p1 acquires write
            barrier.wait() # ---------------------------------------- 1
            self.assertRaises(LockError, lock.acquire_write, 0.1)
            self.assertRaises(LockError, lock.acquire_read, 0.1)
            barrier.wait() # ---------------------------------------- 2
            lock.acquire_read()
            barrier.wait() # ---------------------------------------- 3
            # p1 tests shared read
            barrier.wait() # ---------------------------------------- 4
            lock.release_read()
            barrier.wait() # ---------------------------------------- 5

            # p2 upgrades read to write
            barrier.wait() # ---------------------------------------- 6
            self.assertRaises(LockError, lock.acquire_write, 0.1)
            self.assertRaises(LockError, lock.acquire_read, 0.1)
            barrier.wait() # ---------------------------------------- 7
            # p2 releases write & read
            barrier.wait() # ---------------------------------------- 8

            lock.acquire_read()
            barrier.wait() # ---------------------------------------- 9
            lock.acquire_write()
            barrier.wait() # ---------------------------------------- 10
            # others test timeout
            barrier.wait() # ---------------------------------------- 11
            lock.release_read()   # release read AND write in opposite
            lock.release_write()  # order from before on p2
            barrier.wait() # ---------------------------------------- 12
            lock.acquire_read()
            barrier.wait() # ---------------------------------------- 13
            lock.release_read()

        order_processes(p1, p2, p3)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment