diff --git a/lib/spack/llnl/util/filesystem.py b/lib/spack/llnl/util/filesystem.py
index c3ecfde4f48b0509185b0ba5fd10aaef8a6ac17d..e522fdda6d85356aa388e6f2479cea8dececf6a2 100644
--- a/lib/spack/llnl/util/filesystem.py
+++ b/lib/spack/llnl/util/filesystem.py
@@ -39,15 +39,34 @@
 import llnl.util.tty as tty
 from llnl.util.lang import dedupe
 
-__all__ = ['set_install_permissions', 'install', 'install_tree',
-           'traverse_tree',
-           'expand_user', 'working_dir', 'touch', 'touchp', 'mkdirp',
-           'force_remove', 'join_path', 'ancestor', 'can_access',
-           'filter_file',
-           'FileFilter', 'change_sed_delimiter', 'is_exe', 'force_symlink',
-           'set_executable', 'copy_mode', 'unset_executable_mode',
-           'remove_dead_links', 'remove_linked_tree',
-           'fix_darwin_install_name', 'find_libraries', 'LibraryList']
+__all__ = [
+    'FileFilter',
+    'LibraryList',
+    'ancestor',
+    'can_access',
+    'change_sed_delimiter',
+    'copy_mode',
+    'expand_user',
+    'filter_file',
+    'find_libraries',
+    'fix_darwin_install_name',
+    'force_remove',
+    'force_symlink',
+    'install',
+    'install_tree',
+    'is_exe',
+    'join_path',
+    'mkdirp',
+    'remove_dead_links',
+    'remove_if_dead_link',
+    'remove_linked_tree',
+    'set_executable',
+    'set_install_permissions',
+    'touch',
+    'touchp',
+    'traverse_tree',
+    'unset_executable_mode',
+    'working_dir']
 
 
 def filter_file(regex, repl, *filenames, **kwargs):
@@ -388,10 +407,20 @@ def remove_dead_links(root):
     """
     for file in os.listdir(root):
         path = join_path(root, file)
-        if os.path.islink(path):
-            real_path = os.path.realpath(path)
-            if not os.path.exists(real_path):
-                os.unlink(path)
+        remove_if_dead_link(path)
+
+
+def remove_if_dead_link(path):
+    """
+    Removes the argument if it is a dead link, does nothing otherwise
+
+    Args:
+        path: the potential dead link
+    """
+    if os.path.islink(path):
+        real_path = os.path.realpath(path)
+        if not os.path.exists(real_path):
+            os.unlink(path)
 
 
 def remove_linked_tree(path):
diff --git a/lib/spack/llnl/util/lock.py b/lib/spack/llnl/util/lock.py
index f5f53101ae4a0db45a6ed286669b477aa0896bca..2e44a94798573809eb3019e299cdaf2c560ee87b 100644
--- a/lib/spack/llnl/util/lock.py
+++ b/lib/spack/llnl/util/lock.py
@@ -28,9 +28,13 @@
 import time
 import socket
 
+import llnl.util.tty as tty
+
+
 __all__ = ['Lock', 'LockTransaction', 'WriteTransaction', 'ReadTransaction',
            'LockError']
 
+
 # Default timeout in seconds, after which locks will raise exceptions.
 _default_timeout = 60
 
@@ -41,51 +45,86 @@
 class Lock(object):
     """This is an implementation of a filesystem lock using Python's lockf.
 
-    In Python, `lockf` actually calls `fcntl`, so this should work with any
-    filesystem implementation that supports locking through the fcntl calls.
-    This includes distributed filesystems like Lustre (when flock is enabled)
-    and recent NFS versions.
-
+    In Python, `lockf` actually calls `fcntl`, so this should work with
+    any filesystem implementation that supports locking through the fcntl
+    calls.  This includes distributed filesystems like Lustre (when flock
+    is enabled) and recent NFS versions.
     """
 
-    def __init__(self, file_path):
-        self._file_path = file_path
-        self._fd = None
+    def __init__(self, path, start=0, length=0):
+        """Construct a new lock on the file at ``path``.
+
+        By default, the lock applies to the whole file.  Optionally,
+        caller can specify a byte range beginning ``start`` bytes from
+        the start of the file and extending ``length`` bytes from there.
+
+        This exposes a subset of fcntl locking functionality.  It does
+        not currently expose the ``whence`` parameter -- ``whence`` is
+        always os.SEEK_SET and ``start`` is always evaluated from the
+        beginning of the file.
+        """
+        self.path = path
+        self._file = None
         self._reads = 0
         self._writes = 0
 
-    def _lock(self, op, timeout):
+        # byte range parameters
+        self._start = start
+        self._length = length
+
+        # PID and host of lock holder
+        self.pid = self.old_pid = None
+        self.host = self.old_host = None
+
+    def _lock(self, op, timeout=_default_timeout):
         """This takes a lock using POSIX locks (``fnctl.lockf``).
 
-        The lock is implemented as a spin lock using a nonblocking
-        call to lockf().
+        The lock is implemented as a spin lock using a nonblocking call
+        to lockf().
 
         On acquiring an exclusive lock, the lock writes this process's
-        pid and host to the lock file, in case the holding process
-        needs to be killed later.
+        pid and host to the lock file, in case the holding process needs
+        to be killed later.
 
         If the lock times out, it raises a ``LockError``.
         """
         start_time = time.time()
         while (time.time() - start_time) < timeout:
             try:
-                # If this is already open read-only and we want to
-                # upgrade to an exclusive write lock, close first.
-                if self._fd is not None:
-                    flags = fcntl.fcntl(self._fd, fcntl.F_GETFL)
-                    if op == fcntl.LOCK_EX and flags | os.O_RDONLY:
-                        os.close(self._fd)
-                        self._fd = None
-
-                if self._fd is None:
-                    mode = os.O_RDWR if op == fcntl.LOCK_EX else os.O_RDONLY
-                    self._fd = os.open(self._file_path, mode)
-
-                fcntl.lockf(self._fd, op | fcntl.LOCK_NB)
+                # If we could write the file, we'd have opened it 'r+'.
+                # Raise an error when we attempt to upgrade to a write lock.
+                if op == fcntl.LOCK_EX:
+                    if self._file and self._file.mode == 'r':
+                        raise LockError(
+                            "Can't take exclusive lock on read-only file: %s"
+                            % self.path)
+
+                # Create file and parent directories if they don't exist.
+                if self._file is None:
+                    self._ensure_parent_directory()
+
+                    # Prefer to open 'r+' to allow upgrading to write
+                    # lock later if possible.  Open read-only if we can't
+                    # write the lock file at all.
+                    os_mode, fd_mode = (os.O_RDWR | os.O_CREAT), 'r+'
+                    if os.path.exists(self.path) and not os.access(
+                            self.path, os.W_OK):
+                        os_mode, fd_mode = os.O_RDONLY, 'r'
+
+                    fd = os.open(self.path, os_mode)
+                    self._file = os.fdopen(fd, fd_mode)
+
+                # Try to get the lock (will raise if not available.)
+                fcntl.lockf(self._file, op | fcntl.LOCK_NB,
+                            self._length, self._start, os.SEEK_SET)
+
+                # All locks read the owner PID and host
+                self._read_lock_data()
+
+                # Exclusive locks write their PID/host
                 if op == fcntl.LOCK_EX:
-                    os.write(
-                        self._fd,
-                        "pid=%s,host=%s" % (os.getpid(), socket.getfqdn()))
+                    self._write_lock_data()
+
                 return
 
             except IOError as error:
@@ -97,6 +136,40 @@ def _lock(self, op, timeout):
 
         raise LockError("Timed out waiting for lock.")
 
+    def _ensure_parent_directory(self):
+        parent = os.path.dirname(self.path)
+        try:
+            os.makedirs(parent)
+            return True
+        except OSError as e:
+            # makedirs can fail when diretory already exists.
+            if not (e.errno == errno.EEXIST and os.path.isdir(parent) or
+                    e.errno == errno.EISDIR):
+                raise
+
+    def _read_lock_data(self):
+        """Read PID and host data out of the file if it is there."""
+        line = self._file.read()
+        if line:
+            pid, host = line.strip().split(',')
+            _, _, self.pid = pid.rpartition('=')
+            _, _, self.host = host.rpartition('=')
+
+    def _write_lock_data(self):
+        """Write PID and host data to the file, recording old values."""
+        self.old_pid = self.pid
+        self.old_host = self.host
+
+        self.pid = os.getpid()
+        self.host = socket.getfqdn()
+
+        # write pid, host to disk to sync over FS
+        self._file.seek(0)
+        self._file.write("pid=%s,host=%s" % (self.pid, self.host))
+        self._file.truncate()
+        self._file.flush()
+        os.fsync(self._file.fileno())
+
     def _unlock(self):
         """Releases a lock using POSIX locks (``fcntl.lockf``)
 
@@ -104,9 +177,10 @@ def _unlock(self):
         be masquerading as write locks, but this removes either.
 
         """
-        fcntl.lockf(self._fd, fcntl.LOCK_UN)
-        os.close(self._fd)
-        self._fd = None
+        fcntl.lockf(self._file, fcntl.LOCK_UN,
+                    self._length, self._start, os.SEEK_SET)
+        self._file.close()
+        self._file = None
 
     def acquire_read(self, timeout=_default_timeout):
         """Acquires a recursive, shared lock for reading.
@@ -120,7 +194,9 @@ def acquire_read(self, timeout=_default_timeout):
 
         """
         if self._reads == 0 and self._writes == 0:
-            self._lock(fcntl.LOCK_SH, timeout)   # can raise LockError.
+            tty.debug('READ LOCK: {0.path}[{0._start}:{0._length}] [Acquiring]'
+                      .format(self))
+            self._lock(fcntl.LOCK_SH, timeout=timeout)   # can raise LockError.
             self._reads += 1
             return True
         else:
@@ -139,7 +215,10 @@ def acquire_write(self, timeout=_default_timeout):
 
         """
         if self._writes == 0:
-            self._lock(fcntl.LOCK_EX, timeout)   # can raise LockError.
+            tty.debug(
+                'WRITE LOCK: {0.path}[{0._start}:{0._length}] [Acquiring]'
+                .format(self))
+            self._lock(fcntl.LOCK_EX, timeout=timeout)   # can raise LockError.
             self._writes += 1
             return True
         else:
@@ -159,6 +238,8 @@ def release_read(self):
         assert self._reads > 0
 
         if self._reads == 1 and self._writes == 0:
+            tty.debug('READ LOCK: {0.path}[{0._start}:{0._length}] [Released]'
+                      .format(self))
             self._unlock()      # can raise LockError.
             self._reads -= 1
             return True
@@ -179,6 +260,8 @@ def release_write(self):
         assert self._writes > 0
 
         if self._writes == 1 and self._reads == 0:
+            tty.debug('WRITE LOCK: {0.path}[{0._start}:{0._length}] [Released]'
+                      .format(self))
             self._unlock()      # can raise LockError.
             self._writes -= 1
             return True
diff --git a/lib/spack/spack/cmd/debug.py b/lib/spack/spack/cmd/debug.py
index 958eb829b4aa56819be762de408bbb885759116b..757c5bca808f35f72606cd1111569dfd45046a7c 100644
--- a/lib/spack/spack/cmd/debug.py
+++ b/lib/spack/spack/cmd/debug.py
@@ -23,6 +23,7 @@
 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 ##############################################################################
 import os
+import re
 from datetime import datetime
 from glob import glob
 
@@ -53,8 +54,12 @@ def _debug_tarball_suffix():
         if not os.path.isdir('.git'):
             return 'nobranch.nogit.%s' % suffix
 
+        # Get symbolic branch name and strip any special chars (mainly '/')
         symbolic = git(
             'rev-parse', '--abbrev-ref', '--short', 'HEAD', output=str).strip()
+        symbolic = re.sub(r'[^\w.-]', '-', symbolic)
+
+        # Get the commit hash too.
         commit = git(
             'rev-parse', '--short', 'HEAD', output=str).strip()
 
@@ -69,12 +74,23 @@ def create_db_tarball(args):
     tarball_name = "spack-db.%s.tar.gz" % _debug_tarball_suffix()
     tarball_path = os.path.abspath(tarball_name)
 
-    with working_dir(spack.spack_root):
+    base = os.path.basename(spack.install_path)
+    transform_args = []
+    if 'GNU' in tar('--version', output=str):
+        transform_args = ['--transform', 's/^%s/%s/' % (base, tarball_name)]
+    else:
+        transform_args = ['-s', '/^%s/%s/' % (base, tarball_name)]
+
+    wd = os.path.dirname(spack.install_path)
+    with working_dir(wd):
         files = [spack.installed_db._index_path]
-        files += glob('%s/*/*/*/.spack/spec.yaml' % spack.install_path)
+        files += glob('%s/*/*/*/.spack/spec.yaml' % base)
         files = [os.path.relpath(f) for f in files]
 
-        tar('-czf', tarball_path, *files)
+        args = ['-czf', tarball_path]
+        args += transform_args
+        args += files
+        tar(*args)
 
     tty.msg('Created %s' % tarball_name)
 
diff --git a/lib/spack/spack/cmd/diy.py b/lib/spack/spack/cmd/diy.py
index d60fd6bc7afbd3399c2856463e77ad7094c5eb88..08386cac076e8507b8ecead5e71a64dcea5ec4b3 100644
--- a/lib/spack/spack/cmd/diy.py
+++ b/lib/spack/spack/cmd/diy.py
@@ -65,43 +65,40 @@ def diy(self, args):
     if len(specs) > 1:
         tty.die("spack diy only takes one spec.")
 
-    # Take a write lock before checking for existence.
-    with spack.installed_db.write_transaction():
-        spec = specs[0]
-        if not spack.repo.exists(spec.name):
-            tty.warn("No such package: %s" % spec.name)
-            create = tty.get_yes_or_no("Create this package?", default=False)
-            if not create:
-                tty.msg("Exiting without creating.")
-                sys.exit(1)
-            else:
-                tty.msg("Running 'spack edit -f %s'" % spec.name)
-                edit_package(spec.name, spack.repo.first_repo(), None, True)
-                return
+    spec = specs[0]
+    if not spack.repo.exists(spec.name):
+        tty.warn("No such package: %s" % spec.name)
+        create = tty.get_yes_or_no("Create this package?", default=False)
+        if not create:
+            tty.msg("Exiting without creating.")
+            sys.exit(1)
+        else:
+            tty.msg("Running 'spack edit -f %s'" % spec.name)
+            edit_package(spec.name, spack.repo.first_repo(), None, True)
+            return
 
-        if not spec.versions.concrete:
-            tty.die(
-                "spack diy spec must have a single, concrete version. "
-                "Did you forget a package version number?")
+    if not spec.versions.concrete:
+        tty.die(
+            "spack diy spec must have a single, concrete version. "
+            "Did you forget a package version number?")
 
-        spec.concretize()
-        package = spack.repo.get(spec)
+    spec.concretize()
+    package = spack.repo.get(spec)
 
-        if package.installed:
-            tty.error("Already installed in %s" % package.prefix)
-            tty.msg("Uninstall or try adding a version suffix for this "
-                    "DIY build.")
-            sys.exit(1)
+    if package.installed:
+        tty.error("Already installed in %s" % package.prefix)
+        tty.msg("Uninstall or try adding a version suffix for this DIY build.")
+        sys.exit(1)
 
-        # Forces the build to run out of the current directory.
-        package.stage = DIYStage(os.getcwd())
+    # Forces the build to run out of the current directory.
+    package.stage = DIYStage(os.getcwd())
 
-        # TODO: make this an argument, not a global.
-        spack.do_checksum = False
+    # TODO: make this an argument, not a global.
+    spack.do_checksum = False
 
-        package.do_install(
-            keep_prefix=args.keep_prefix,
-            install_deps=not args.ignore_deps,
-            verbose=not args.quiet,
-            keep_stage=True,   # don't remove source dir for DIY.
-            dirty=args.dirty)
+    package.do_install(
+        keep_prefix=args.keep_prefix,
+        install_deps=not args.ignore_deps,
+        verbose=not args.quiet,
+        keep_stage=True,   # don't remove source dir for DIY.
+        dirty=args.dirty)
diff --git a/lib/spack/spack/cmd/install.py b/lib/spack/spack/cmd/install.py
index e51024b05f67768ce1fd1d3a9b551037afa1b244..70abe1dd00ec65e04884dd03b9f2177eb879fa7a 100644
--- a/lib/spack/spack/cmd/install.py
+++ b/lib/spack/spack/cmd/install.py
@@ -84,15 +84,14 @@ def install(parser, args):
     specs = spack.cmd.parse_specs(args.packages, concretize=True)
     for spec in specs:
         package = spack.repo.get(spec)
-        with spack.installed_db.write_transaction():
-            package.do_install(
-                keep_prefix=args.keep_prefix,
-                keep_stage=args.keep_stage,
-                install_deps=not args.ignore_deps,
-                install_self=not args.deps_only,
-                make_jobs=args.jobs,
-                run_tests=args.run_tests,
-                verbose=args.verbose,
-                fake=args.fake,
-                dirty=args.dirty,
-                explicit=True)
+        package.do_install(
+            keep_prefix=args.keep_prefix,
+            keep_stage=args.keep_stage,
+            install_deps=not args.ignore_deps,
+            install_self=not args.deps_only,
+            make_jobs=args.jobs,
+            run_tests=args.run_tests,
+            verbose=args.verbose,
+            fake=args.fake,
+            dirty=args.dirty,
+            explicit=True)
diff --git a/lib/spack/spack/cmd/uninstall.py b/lib/spack/spack/cmd/uninstall.py
index 2d09b88c8e87b79687274b583b6de70ff5fb0104..bbcd2e787c182d6eccaf9421e9034d5963d0e864 100644
--- a/lib/spack/spack/cmd/uninstall.py
+++ b/lib/spack/spack/cmd/uninstall.py
@@ -193,16 +193,14 @@ def uninstall(parser, args):
     if not args.packages and not args.all:
         tty.die("uninstall requires at least one package argument.")
 
-    with spack.installed_db.write_transaction():
+    uninstall_list = get_uninstall_list(args)
 
-        uninstall_list = get_uninstall_list(args)
+    if not args.yes_to_all:
+        tty.msg("The following packages will be uninstalled : ")
+        print('')
+        spack.cmd.display_specs(uninstall_list, **display_args)
+        print('')
+        spack.cmd.ask_for_confirmation('Do you want to proceed ? ')
 
-        if not args.yes_to_all:
-            tty.msg("The following packages will be uninstalled : ")
-            print('')
-            spack.cmd.display_specs(uninstall_list, **display_args)
-            print('')
-            spack.cmd.ask_for_confirmation('Do you want to proceed ? ')
-
-        # Uninstall everything on the list
-        do_uninstall(uninstall_list, args.force)
+    # Uninstall everything on the list
+    do_uninstall(uninstall_list, args.force)
diff --git a/lib/spack/spack/database.py b/lib/spack/spack/database.py
index f73d3765c8f0b67c261a663c39ad22692bc2c764..e9bd07d92cab1bcf5549dec1c4c1dd0a1505df20 100644
--- a/lib/spack/spack/database.py
+++ b/lib/spack/spack/database.py
@@ -33,7 +33,7 @@
   2. It will allow us to track external installations as well as lost
      packages and their dependencies.
 
-Prior ot the implementation of this store, a direcotry layout served
+Prior to the implementation of this store, a directory layout served
 as the authoritative database of packages in Spack.  This module
 provides a cache and a sanity checking mechanism for what is in the
 filesystem.
@@ -156,13 +156,13 @@ def __init__(self, root, db_dir=None):
         self._index_path = join_path(self._db_dir, 'index.yaml')
         self._lock_path = join_path(self._db_dir, 'lock')
 
+        # This is for other classes to use to lock prefix directories.
+        self.prefix_lock_path = join_path(self._db_dir, 'prefix_lock')
+
         # Create needed directories and files
         if not os.path.exists(self._db_dir):
             mkdirp(self._db_dir)
 
-        if not os.path.exists(self._lock_path):
-            touch(self._lock_path)
-
         # initialize rest of state.
         self.lock = Lock(self._lock_path)
         self._data = {}
diff --git a/lib/spack/spack/file_cache.py b/lib/spack/spack/file_cache.py
index 0a66166fd8f6ca7612a0a78305379e45a3b72a73..31ae009836191268c1192fa103f0d52f11e473bc 100644
--- a/lib/spack/spack/file_cache.py
+++ b/lib/spack/spack/file_cache.py
@@ -77,10 +77,7 @@ def _lock_path(self, key):
     def _get_lock(self, key):
         """Create a lock for a key, if necessary, and return a lock object."""
         if key not in self._locks:
-            lock_file = self._lock_path(key)
-            if not os.path.exists(lock_file):
-                touch(lock_file)
-            self._locks[key] = Lock(lock_file)
+            self._locks[key] = Lock(self._lock_path(key))
         return self._locks[key]
 
     def init_entry(self, key):
diff --git a/lib/spack/spack/package.py b/lib/spack/spack/package.py
index aa874bf5085d3c8a9d3ce7556d4d6eb68a67ca8f..768605294f8e7cb3362e34cbe584994713ad67fb 100644
--- a/lib/spack/spack/package.py
+++ b/lib/spack/spack/package.py
@@ -39,8 +39,16 @@
 import textwrap
 import time
 import string
+import contextlib
+from StringIO import StringIO
 
+import llnl.util.lock
 import llnl.util.tty as tty
+from llnl.util.filesystem import *
+from llnl.util.lang import *
+from llnl.util.link_tree import LinkTree
+from llnl.util.tty.log import log_output
+
 import spack
 import spack.build_environment
 import spack.compilers
@@ -53,12 +61,8 @@
 import spack.url
 import spack.util.web
 
-from StringIO import StringIO
-from llnl.util.filesystem import *
-from llnl.util.lang import *
-from llnl.util.link_tree import LinkTree
-from llnl.util.tty.log import log_output
 from spack.stage import Stage, ResourceStage, StageComposite
+from spack.util.crypto import bit_length
 from spack.util.environment import dump_environment
 from spack.util.executable import ProcessError, which
 from spack.version import *
@@ -305,6 +309,7 @@ class SomePackage(Package):
     Package creators override functions like install() (all of them do this),
     clean() (some of them do this), and others to provide custom behavior.
     """
+
     #
     # These are default values for instance variables.
     #
@@ -336,6 +341,9 @@ class SomePackage(Package):
     """
     sanity_check_is_dir = []
 
+    """Per-process lock objects for each install prefix."""
+    prefix_locks = {}
+
     class __metaclass__(type):
         """Ensure  attributes required by Spack directives are present."""
         def __init__(cls, name, bases, dict):
@@ -346,6 +354,9 @@ def __init__(self, spec):
         # this determines how the package should be built.
         self.spec = spec
 
+        # Lock on the prefix shared resource. Will be set in prefix property
+        self._prefix_lock = None
+
         # Name of package is the name of its module, without the
         # containing module names.
         self.name = self.module.__name__
@@ -691,6 +702,29 @@ def installed_dependents(self):
                     dependents.append(spec)
         return dependents
 
+    @property
+    def prefix_lock(self):
+        """Prefix lock is a byte range lock on the nth byte of a file.
+
+        The lock file is ``spack.installed_db.prefix_lock`` -- the DB
+        tells us what to call it and it lives alongside the install DB.
+
+        n is the sys.maxsize-bit prefix of the DAG hash.  This makes
+        likelihood of collision is very low AND it gives us
+        readers-writer lock semantics with just a single lockfile, so no
+        cleanup required.
+        """
+        if self._prefix_lock is None:
+            prefix = self.spec.prefix
+            if prefix not in Package.prefix_locks:
+                Package.prefix_locks[prefix] = llnl.util.lock.Lock(
+                    spack.installed_db.prefix_lock_path,
+                    self.spec.dag_hash_bit_prefix(bit_length(sys.maxsize)), 1)
+
+            self._prefix_lock = Package.prefix_locks[prefix]
+
+        return self._prefix_lock
+
     @property
     def prefix(self):
         """Get the prefix into which this package should be installed."""
@@ -875,6 +909,22 @@ def _resource_stage(self, resource):
         resource_stage_folder = '-'.join(pieces)
         return resource_stage_folder
 
+    @contextlib.contextmanager
+    def _prefix_read_lock(self):
+        try:
+            self.prefix_lock.acquire_read(60)
+            yield self
+        finally:
+            self.prefix_lock.release_read()
+
+    @contextlib.contextmanager
+    def _prefix_write_lock(self):
+        try:
+            self.prefix_lock.acquire_write(60)
+            yield self
+        finally:
+            self.prefix_lock.release_write()
+
     install_phases = set(['configure', 'build', 'install', 'provenance'])
 
     def do_install(self,
@@ -926,14 +976,18 @@ def do_install(self,
 
         # Ensure package is not already installed
         layout = spack.install_layout
-        if 'install' in install_phases and layout.check_installed(self.spec):
-            tty.msg("%s is already installed in %s" % (self.name, self.prefix))
-            rec = spack.installed_db.get_record(self.spec)
-            if (not rec.explicit) and explicit:
-                with spack.installed_db.write_transaction():
-                    rec = spack.installed_db.get_record(self.spec)
-                    rec.explicit = True
-            return
+        with self._prefix_read_lock():
+            if ('install' in install_phases and
+                layout.check_installed(self.spec)):
+
+                tty.msg(
+                    "%s is already installed in %s" % (self.name, self.prefix))
+                rec = spack.installed_db.get_record(self.spec)
+                if (not rec.explicit) and explicit:
+                    with spack.installed_db.write_transaction():
+                        rec = spack.installed_db.get_record(self.spec)
+                        rec.explicit = True
+                return
 
         tty.msg("Installing %s" % self.name)
 
@@ -983,7 +1037,7 @@ def build_process():
             self.build_directory = join_path(self.stage.path, 'spack-build')
             self.source_directory = self.stage.source_path
 
-            with self.stage:
+            with contextlib.nested(self.stage, self._prefix_write_lock()):
                 # Run the pre-install hook in the child process after
                 # the directory is created.
                 spack.hooks.pre_install(self)
@@ -1077,8 +1131,9 @@ def build_process():
                          wrap=False)
             raise
 
-        # note: PARENT of the build process adds the new package to
+        # Parent of the build process adds the new package to
         # the database, so that we don't need to re-read from file.
+        # NOTE: add() implicitly acquires a write-lock
         spack.installed_db.add(
             self.spec, spack.install_layout, explicit=explicit)
 
@@ -1259,11 +1314,12 @@ def do_uninstall(self, force=False):
                 raise PackageStillNeededError(self.spec, dependents)
 
         # Pre-uninstall hook runs first.
-        spack.hooks.pre_uninstall(self)
-
-        # Uninstalling in Spack only requires removing the prefix.
-        self.remove_prefix()
-        spack.installed_db.remove(self.spec)
+        with self._prefix_write_lock():
+            spack.hooks.pre_uninstall(self)
+            # Uninstalling in Spack only requires removing the prefix.
+            self.remove_prefix()
+            #
+            spack.installed_db.remove(self.spec)
         tty.msg("Successfully uninstalled %s" % self.spec.short_spec)
 
         # Once everything else is done, run post install hooks
diff --git a/lib/spack/spack/spec.py b/lib/spack/spack/spec.py
index ba9cea876d080f1ed945dbb340b034a545da6a3b..fc4bf41e349462a73ad29606f219dd206d0dd78d 100644
--- a/lib/spack/spack/spec.py
+++ b/lib/spack/spack/spec.py
@@ -120,6 +120,7 @@
 from spack.util.string import *
 import spack.util.spack_yaml as syaml
 from spack.util.spack_yaml import syaml_dict
+from spack.util.crypto import prefix_bits
 from spack.version import *
 from spack.provider_index import ProviderIndex
 
@@ -963,13 +964,10 @@ def prefix(self):
         return Prefix(spack.install_layout.path_for_spec(self))
 
     def dag_hash(self, length=None):
-        """
-        Return a hash of the entire spec DAG, including connectivity.
-        """
+        """Return a hash of the entire spec DAG, including connectivity."""
         if self._hash:
             return self._hash[:length]
         else:
-            # XXX(deptype): ignore 'build' dependencies here
             yaml_text = syaml.dump(
                 self.to_node_dict(), default_flow_style=True, width=sys.maxint)
             sha = hashlib.sha1(yaml_text)
@@ -978,6 +976,10 @@ def dag_hash(self, length=None):
                 self._hash = b32_hash
             return b32_hash
 
+    def dag_hash_bit_prefix(self, bits):
+        """Get the first <bits> bits of the DAG hash as an integer type."""
+        return base32_prefix_bits(self.dag_hash(), bits)
+
     def to_node_dict(self):
         d = syaml_dict()
 
@@ -999,6 +1001,8 @@ def to_node_dict(self):
         if self.architecture:
             d['arch'] = self.architecture.to_dict()
 
+        # TODO: restore build dependencies here once we have less picky
+        # TODO: concretization.
         deps = self.dependencies_dict(deptype=('link', 'run'))
         if deps:
             d['dependencies'] = syaml_dict([
@@ -2723,6 +2727,16 @@ def parse_anonymous_spec(spec_like, pkg_name):
     return anon_spec
 
 
+def base32_prefix_bits(hash_string, bits):
+    """Return the first <bits> bits of a base32 string as an integer."""
+    if bits > len(hash_string) * 5:
+        raise ValueError("Too many bits! Requested %d bit prefix of '%s'."
+                         % (bits, hash_string))
+
+    hash_bytes = base64.b32decode(hash_string, casefold=True)
+    return prefix_bits(hash_bytes, bits)
+
+
 class SpecError(spack.error.SpackError):
     """Superclass for all errors that occur while constructing specs."""
 
diff --git a/lib/spack/spack/stage.py b/lib/spack/spack/stage.py
index 1b12966bc1e3121effde0a1cfc3ee203c95997a8..c0dfbba98716fa3a5f9bea9aece529a33be86a08 100644
--- a/lib/spack/spack/stage.py
+++ b/lib/spack/spack/stage.py
@@ -23,12 +23,15 @@
 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 ##############################################################################
 import os
+import sys
 import errno
+import hashlib
 import shutil
 import tempfile
 from urlparse import urljoin
 
 import llnl.util.tty as tty
+import llnl.util.lock
 from llnl.util.filesystem import *
 
 import spack.util.pattern as pattern
@@ -38,6 +41,7 @@
 import spack.fetch_strategy as fs
 import spack.error
 from spack.version import *
+from spack.util.crypto import prefix_bits, bit_length
 
 STAGE_PREFIX = 'spack-stage-'
 
@@ -88,8 +92,12 @@ class Stage(object):
     similar, and are intended to persist for only one run of spack.
     """
 
-    def __init__(self, url_or_fetch_strategy,
-                 name=None, mirror_path=None, keep=False, path=None):
+    """Shared dict of all stage locks."""
+    stage_locks = {}
+
+    def __init__(
+            self, url_or_fetch_strategy,
+            name=None, mirror_path=None, keep=False, path=None, lock=True):
         """Create a stage object.
            Parameters:
              url_or_fetch_strategy
@@ -147,6 +155,20 @@ def __init__(self, url_or_fetch_strategy,
         # Flag to decide whether to delete the stage folder on exit or not
         self.keep = keep
 
+        # File lock for the stage directory.  We use one file for all
+        # stage locks. See Spec.prefix_lock for details on this approach.
+        self._lock = None
+        if lock:
+            if self.name not in Stage.stage_locks:
+                sha1 = hashlib.sha1(self.name).digest()
+                lock_id = prefix_bits(sha1, bit_length(sys.maxsize))
+                stage_lock_path = join_path(spack.stage_path, '.lock')
+
+                Stage.stage_locks[self.name] = llnl.util.lock.Lock(
+                    stage_lock_path, lock_id, 1)
+
+            self._lock = Stage.stage_locks[self.name]
+
     def __enter__(self):
         """
         Entering a stage context will create the stage directory
@@ -154,6 +176,8 @@ def __enter__(self):
         Returns:
             self
         """
+        if self._lock is not None:
+            self._lock.acquire_write(timeout=60)
         self.create()
         return self
 
@@ -175,6 +199,9 @@ def __exit__(self, exc_type, exc_val, exc_tb):
         if exc_type is None and not self.keep:
             self.destroy()
 
+        if self._lock is not None:
+            self._lock.release_write()
+
     def _need_to_create_path(self):
         """Makes sure nothing weird has happened since the last time we
            looked at path.  Returns True if path already exists and is ok.
@@ -416,7 +443,8 @@ def create(self):
         """
         # Create the top-level stage directory
         mkdirp(spack.stage_path)
-        remove_dead_links(spack.stage_path)
+        remove_if_dead_link(self.path)
+
         # If a tmp_root exists then create a directory there and then link it
         # in the stage area, otherwise create the stage directory in self.path
         if self._need_to_create_path():
diff --git a/lib/spack/spack/test/lock.py b/lib/spack/spack/test/lock.py
index 32cbe13ce18f91c2c524e38278163cd44d88006e..4f62cd85e9dc656a5c9a40cff2dcf15bdf6139e3 100644
--- a/lib/spack/spack/test/lock.py
+++ b/lib/spack/spack/test/lock.py
@@ -25,6 +25,7 @@
 """
 These tests ensure that our lock works correctly.
 """
+import os
 import shutil
 import tempfile
 import unittest
@@ -44,7 +45,6 @@ class LockTest(unittest.TestCase):
     def setUp(self):
         self.tempdir = tempfile.mkdtemp()
         self.lock_path = join_path(self.tempdir, 'lockfile')
-        touch(self.lock_path)
 
     def tearDown(self):
         shutil.rmtree(self.tempdir, ignore_errors=True)
@@ -64,98 +64,185 @@ def multiproc_test(self, *functions):
     #
     # Process snippets below can be composed into tests.
     #
-    def acquire_write(self, barrier):
-        lock = Lock(self.lock_path)
-        lock.acquire_write()  # grab exclusive lock
-        barrier.wait()
-        barrier.wait()  # hold the lock until exception raises in other procs.
-
-    def acquire_read(self, barrier):
-        lock = Lock(self.lock_path)
-        lock.acquire_read()  # grab shared lock
-        barrier.wait()
-        barrier.wait()  # hold the lock until exception raises in other procs.
-
-    def timeout_write(self, barrier):
-        lock = Lock(self.lock_path)
-        barrier.wait()  # wait for lock acquire in first process
-        self.assertRaises(LockError, lock.acquire_write, 0.1)
-        barrier.wait()
+    def acquire_write(self, start=0, length=0):
+        def fn(barrier):
+            lock = Lock(self.lock_path, start, length)
+            lock.acquire_write()  # grab exclusive lock
+            barrier.wait()
+            barrier.wait()  # hold the lock until timeout in other procs.
+        return fn
+
+    def acquire_read(self, start=0, length=0):
+        def fn(barrier):
+            lock = Lock(self.lock_path, start, length)
+            lock.acquire_read()  # grab shared lock
+            barrier.wait()
+            barrier.wait()  # hold the lock until timeout in other procs.
+        return fn
+
+    def timeout_write(self, start=0, length=0):
+        def fn(barrier):
+            lock = Lock(self.lock_path, start, length)
+            barrier.wait()  # wait for lock acquire in first process
+            self.assertRaises(LockError, lock.acquire_write, 0.1)
+            barrier.wait()
+        return fn
 
-    def timeout_read(self, barrier):
-        lock = Lock(self.lock_path)
-        barrier.wait()  # wait for lock acquire in first process
-        self.assertRaises(LockError, lock.acquire_read, 0.1)
-        barrier.wait()
+    def timeout_read(self, start=0, length=0):
+        def fn(barrier):
+            lock = Lock(self.lock_path, start, length)
+            barrier.wait()  # wait for lock acquire in first process
+            self.assertRaises(LockError, lock.acquire_read, 0.1)
+            barrier.wait()
+        return fn
 
     #
     # Test that exclusive locks on other processes time out when an
     # exclusive lock is held.
     #
     def test_write_lock_timeout_on_write(self):
-        self.multiproc_test(self.acquire_write, self.timeout_write)
+        self.multiproc_test(self.acquire_write(), self.timeout_write())
 
     def test_write_lock_timeout_on_write_2(self):
         self.multiproc_test(
-            self.acquire_write, self.timeout_write, self.timeout_write)
+            self.acquire_write(), self.timeout_write(), self.timeout_write())
 
     def test_write_lock_timeout_on_write_3(self):
         self.multiproc_test(
-            self.acquire_write, self.timeout_write, self.timeout_write,
-            self.timeout_write)
+            self.acquire_write(), self.timeout_write(), self.timeout_write(),
+            self.timeout_write())
+
+    def test_write_lock_timeout_on_write_ranges(self):
+        self.multiproc_test(
+            self.acquire_write(0, 1), self.timeout_write(0, 1))
+
+    def test_write_lock_timeout_on_write_ranges_2(self):
+        self.multiproc_test(
+            self.acquire_write(0, 64), self.acquire_write(65, 1),
+            self.timeout_write(0, 1), self.timeout_write(63, 1))
+
+    def test_write_lock_timeout_on_write_ranges_3(self):
+        self.multiproc_test(
+            self.acquire_write(0, 1), self.acquire_write(1, 1),
+            self.timeout_write(), self.timeout_write(), self.timeout_write())
+
+    def test_write_lock_timeout_on_write_ranges_4(self):
+        self.multiproc_test(
+            self.acquire_write(0, 1), self.acquire_write(1, 1),
+            self.acquire_write(2, 456), self.acquire_write(500, 64),
+            self.timeout_write(), self.timeout_write(), self.timeout_write())
 
     #
     # Test that shared locks on other processes time out when an
     # exclusive lock is held.
     #
     def test_read_lock_timeout_on_write(self):
-        self.multiproc_test(self.acquire_write, self.timeout_read)
+        self.multiproc_test(self.acquire_write(), self.timeout_read())
 
     def test_read_lock_timeout_on_write_2(self):
         self.multiproc_test(
-            self.acquire_write, self.timeout_read, self.timeout_read)
+            self.acquire_write(), self.timeout_read(), self.timeout_read())
 
     def test_read_lock_timeout_on_write_3(self):
         self.multiproc_test(
-            self.acquire_write, self.timeout_read, self.timeout_read,
-            self.timeout_read)
+            self.acquire_write(), self.timeout_read(), self.timeout_read(),
+            self.timeout_read())
+
+    def test_read_lock_timeout_on_write_ranges(self):
+        """small write lock, read whole file."""
+        self.multiproc_test(self.acquire_write(0, 1), self.timeout_read())
+
+    def test_read_lock_timeout_on_write_ranges_2(self):
+        """small write lock, small read lock"""
+        self.multiproc_test(self.acquire_write(0, 1), self.timeout_read(0, 1))
+
+    def test_read_lock_timeout_on_write_ranges_3(self):
+        """two write locks, overlapping read locks"""
+        self.multiproc_test(
+            self.acquire_write(0, 1), self.acquire_write(64, 128),
+            self.timeout_read(0, 1), self.timeout_read(128, 256))
 
     #
     # Test that exclusive locks time out when shared locks are held.
     #
     def test_write_lock_timeout_on_read(self):
-        self.multiproc_test(self.acquire_read, self.timeout_write)
+        self.multiproc_test(self.acquire_read(), self.timeout_write())
 
     def test_write_lock_timeout_on_read_2(self):
         self.multiproc_test(
-            self.acquire_read, self.timeout_write, self.timeout_write)
+            self.acquire_read(), self.timeout_write(), self.timeout_write())
 
     def test_write_lock_timeout_on_read_3(self):
         self.multiproc_test(
-            self.acquire_read, self.timeout_write, self.timeout_write,
-            self.timeout_write)
+            self.acquire_read(), self.timeout_write(), self.timeout_write(),
+            self.timeout_write())
+
+    def test_write_lock_timeout_on_read_ranges(self):
+        self.multiproc_test(self.acquire_read(0, 1), self.timeout_write())
+
+    def test_write_lock_timeout_on_read_ranges_2(self):
+        self.multiproc_test(self.acquire_read(0, 1), self.timeout_write(0, 1))
+
+    def test_write_lock_timeout_on_read_ranges_3(self):
+        self.multiproc_test(
+            self.acquire_read(0, 1), self.acquire_read(10, 1),
+            self.timeout_write(0, 1), self.timeout_write(10, 1))
+
+    def test_write_lock_timeout_on_read_ranges_4(self):
+        self.multiproc_test(
+            self.acquire_read(0, 64),
+            self.timeout_write(10, 1), self.timeout_write(32, 1))
+
+    def test_write_lock_timeout_on_read_ranges_5(self):
+        self.multiproc_test(
+            self.acquire_read(64, 128),
+            self.timeout_write(65, 1), self.timeout_write(127, 1),
+            self.timeout_write(90, 10))
 
     #
     # Test that exclusive locks time while lots of shared locks are held.
     #
     def test_write_lock_timeout_with_multiple_readers_2_1(self):
         self.multiproc_test(
-            self.acquire_read, self.acquire_read, self.timeout_write)
+            self.acquire_read(), self.acquire_read(), self.timeout_write())
 
     def test_write_lock_timeout_with_multiple_readers_2_2(self):
         self.multiproc_test(
-            self.acquire_read, self.acquire_read, self.timeout_write,
-            self.timeout_write)
+            self.acquire_read(), self.acquire_read(), self.timeout_write(),
+            self.timeout_write())
 
     def test_write_lock_timeout_with_multiple_readers_3_1(self):
         self.multiproc_test(
-            self.acquire_read, self.acquire_read, self.acquire_read,
-            self.timeout_write)
+            self.acquire_read(), self.acquire_read(), self.acquire_read(),
+            self.timeout_write())
 
     def test_write_lock_timeout_with_multiple_readers_3_2(self):
         self.multiproc_test(
-            self.acquire_read, self.acquire_read, self.acquire_read,
-            self.timeout_write, self.timeout_write)
+            self.acquire_read(), self.acquire_read(), self.acquire_read(),
+            self.timeout_write(), self.timeout_write())
+
+    def test_write_lock_timeout_with_multiple_readers_2_1_ranges(self):
+        self.multiproc_test(
+            self.acquire_read(0, 10), self.acquire_read(5, 10),
+            self.timeout_write(5, 5))
+
+    def test_write_lock_timeout_with_multiple_readers_2_3_ranges(self):
+        self.multiproc_test(
+            self.acquire_read(0, 10), self.acquire_read(5, 15),
+            self.timeout_write(0, 1), self.timeout_write(11, 3),
+            self.timeout_write(7, 1))
+
+    def test_write_lock_timeout_with_multiple_readers_3_1_ranges(self):
+        self.multiproc_test(
+            self.acquire_read(0, 5), self.acquire_read(5, 5),
+            self.acquire_read(10, 5),
+            self.timeout_write(0, 15))
+
+    def test_write_lock_timeout_with_multiple_readers_3_2_ranges(self):
+        self.multiproc_test(
+            self.acquire_read(0, 5), self.acquire_read(5, 5),
+            self.acquire_read(10, 5),
+            self.timeout_write(3, 10), self.timeout_write(5, 1))
 
     #
     # Test that read can be upgraded to write.
@@ -172,19 +259,42 @@ def test_upgrade_read_to_write(self):
         lock.acquire_read()
         self.assertTrue(lock._reads == 1)
         self.assertTrue(lock._writes == 0)
+        self.assertTrue(lock._file.mode == 'r+')
 
         lock.acquire_write()
         self.assertTrue(lock._reads == 1)
         self.assertTrue(lock._writes == 1)
+        self.assertTrue(lock._file.mode == 'r+')
 
         lock.release_write()
         self.assertTrue(lock._reads == 1)
         self.assertTrue(lock._writes == 0)
+        self.assertTrue(lock._file.mode == 'r+')
 
         lock.release_read()
         self.assertTrue(lock._reads == 0)
         self.assertTrue(lock._writes == 0)
-        self.assertTrue(lock._fd is None)
+        self.assertTrue(lock._file is None)
+
+    #
+    # Test that read-only file can be read-locked but not write-locked.
+    #
+    def test_upgrade_read_to_write_fails_with_readonly_file(self):
+        # ensure lock file exists the first time, so we open it read-only
+        # to begin wtih.
+        touch(self.lock_path)
+        os.chmod(self.lock_path, 0444)
+
+        lock = Lock(self.lock_path)
+        self.assertTrue(lock._reads == 0)
+        self.assertTrue(lock._writes == 0)
+
+        lock.acquire_read()
+        self.assertTrue(lock._reads == 1)
+        self.assertTrue(lock._writes == 0)
+        self.assertTrue(lock._file.mode == 'r')
+
+        self.assertRaises(LockError, lock.acquire_write)
 
     #
     # Longer test case that ensures locks are reusable. Ordering is
diff --git a/lib/spack/spack/test/spec_dag.py b/lib/spack/spack/test/spec_dag.py
index 40cdb029663ddfdda80bffaa61ad144cf64b0c0a..0bc63bcf0fb0e340cf8465cd3de07e92b6ee62ab 100644
--- a/lib/spack/spack/test/spec_dag.py
+++ b/lib/spack/spack/test/spec_dag.py
@@ -523,3 +523,37 @@ def descend_and_check(iterable, level=0):
             level = descend_and_check(dag.to_node_dict())
             # level just makes sure we are doing something here
             self.assertTrue(level >= 5)
+
+    def test_hash_bits(self):
+        """Ensure getting first n bits of a base32-encoded DAG hash works."""
+
+        # RFC 4648 base32 decode table
+        b32 = dict((j, i) for i, j in enumerate('abcdefghijklmnopqrstuvwxyz'))
+        b32.update(dict((j, i) for i, j in enumerate('234567', 26)))
+
+        # some package hashes
+        tests = [
+            '35orsd4cenv743hg4i5vxha2lzayycby',
+            '6kfqtj7dap3773rxog6kkmoweix5gpwo',
+            'e6h6ff3uvmjbq3azik2ckr6ckwm3depv',
+            'snz2juf4ij7sv77cq3vs467q6acftmur',
+            '4eg47oedi5bbkhpoxw26v3oe6vamkfd7',
+            'vrwabwj6umeb5vjw6flx2rnft3j457rw']
+
+        for test_hash in tests:
+            # string containing raw bits of hash ('1' and '0')
+            expected = ''.join([format(b32[c], '#07b').replace('0b', '')
+                                for c in test_hash])
+
+            for bits in (1, 2, 3, 4, 7, 8, 9, 16, 64, 117, 128, 160):
+                actual_int = spack.spec.base32_prefix_bits(test_hash, bits)
+                fmt = "#0%sb" % (bits + 2)
+                actual = format(actual_int, fmt).replace('0b', '')
+
+                self.assertEqual(expected[:bits], actual)
+
+            self.assertRaises(
+                ValueError, spack.spec.base32_prefix_bits, test_hash, 161)
+
+            self.assertRaises(
+                ValueError, spack.spec.base32_prefix_bits, test_hash, 256)
diff --git a/lib/spack/spack/util/crypto.py b/lib/spack/spack/util/crypto.py
index 22777fdb6896821f9cdb8347801702e2d84bfa0b..d0747160223387a282e3988a2f916b41eb64a890 100644
--- a/lib/spack/spack/util/crypto.py
+++ b/lib/spack/spack/util/crypto.py
@@ -100,3 +100,24 @@ def check(self, filename):
         self.sum = checksum(
             self.hash_fun, filename, block_size=self.block_size)
         return self.sum == self.hexdigest
+
+
+def prefix_bits(byte_array, bits):
+    """Return the first <bits> bits of a byte array as an integer."""
+    result = 0
+    n = 0
+    for i, b in enumerate(byte_array):
+        n += 8
+        result = (result << 8) | ord(b)
+        if n >= bits:
+            break
+
+    result >>= (n - bits)
+    return result
+
+
+def bit_length(num):
+    """Number of bits required to represent an integer in binary."""
+    s = bin(num)
+    s = s.lstrip('-0b')
+    return len(s)