I am in the middle of a work project that requires a distributed (multi-host) locking mechanism supporting multiple readers and multiple writers. I am not literally reading and writing a specific piece of data. Instead, a front-end reads and writes data to an authoritative back-end system that generally “stays the same”, but semi-regularly undergoes changes (think CMS-level changes). Those changes can fundamentally alter the content the front-end accesses and manipulates, or invalidate work the front-end has already done.

Using a “read/write” lock seems like a good fit - the front-end can obtain a read lock whenever it wants to read from or manipulate data on the back-end, and the back-end can obtain a write lock whenever it wants to make one of these fundamental changes that directly impact the front-end. Once the write lock is released, the front-end is free to check in with the back-end and reconcile any problems caused by the change. In other words, while this is a read/write lock in name, the way it can be used is a little broader and more abstract.
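To make the read/write split concrete, here is a minimal in-process sketch of the same read-preferring "two mutexes" algorithm (from the Wikipedia page cited in the code), with threads standing in for hosts. `InProcessReadWriteLock` is a hypothetical name used only for illustration; it touches neither Redis nor Redlock. One detail carries over to the distributed version: the last reader may release a global lock that a different (first) reader acquired. Ruby's `Mutex` forbids that, so this sketch assumes a one-token `Queue` as a binary semaphore instead.

```ruby
# Hypothetical in-process model of the read-preferring two-lock algorithm.
class InProcessReadWriteLock
  def initialize
    @reader_lock = Mutex.new # guards @num_readers (Wikipedia's "r")
    @num_readers = 0         # number of active readers (Wikipedia's "b")
    # Global lock (Wikipedia's "g"). It may be released by a different thread
    # than the one that acquired it, so use a one-token Queue, not a Mutex.
    @global_lock = Queue.new
    @global_lock << :token
  end

  # Front-end side: any number of readers may be inside at once.
  def read
    @reader_lock.synchronize do
      @num_readers += 1
      @global_lock.pop if @num_readers == 1 # first reader locks out writers
    end
    begin
      yield
    ensure
      @reader_lock.synchronize do
        @num_readers -= 1
        @global_lock << :token if @num_readers.zero? # last reader lets writers in
      end
    end
  end

  # Back-end side: a writer excludes all readers and all other writers.
  def write
    @global_lock.pop
    begin
      yield
    ensure
      @global_lock << :token
    end
  end
end

# Usage: three front-end readers and one back-end "CMS change".
lock = InProcessReadWriteLock.new
content_version = 1
log = Queue.new # thread-safe record of what each critical section saw

readers = 3.times.map do |i|
  Thread.new do
    lock.read do
      seen = content_version
      sleep 0.01 # simulate work done against the back-end
      log << [:read, i, seen]
    end
  end
end
writer = Thread.new do
  lock.write do
    content_version += 1
    log << [:write, content_version]
  end
end
(readers + [writer]).each(&:join)

entries = Array.new(log.size) { log.pop }
```

Because every log entry is written inside its critical section, a read logged before the write always saw version 1 and a read logged after it always saw version 2; no read ever straddles the change.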

Here’s my implementation of this lock, which has not yet been tested in production. Assume that it doesn’t work yet, although local tests suggest it does what we want. The code is well-documented, so if you want to understand how it works, give it a read.

```ruby
# WARNING: This locking algorithm is not yet production-tested and should not
# be used in production without further testing. Testing on a local machine
# suggests that things work as expected, but I anticipate edge cases that I
# have missed. If you find one, feel free to contact me.
#
# The goal of this class is to model and provide a distributed (Redis-backed)
# readers-writer lock: any number of readers may hold the lock concurrently,
# while a writer holds it exclusively.
#
# The lock is read-preferring: readers can read concurrently, and writers are
# blocked until all readers have finished reading, so a steady stream of
# readers can starve writers. This is a definite shortcoming, but it is
# currently not clear to me how to implement a write-preferring lock that is
# also distributed. There are plenty of reference implementations when
# concurrency is only needed between threads (write-preferring locks require
# condition variables and the ability to send "wakeup" signals to waiting
# threads, which Redis does not appear to provide).
#
# Despite being called a "read/write" lock, this class does not actually read
# or write any specific data. It is simply a lock that ensures a resource is
# not modified while it is being read, or read while it is being modified,
# while still allowing multiple readers to read concurrently.
#
# The reference algorithm for this class is on Wikipedia (linked below). The
# same page also describes the write-preferring algorithm.
# https://en.wikipedia.org/wiki/Readers%E2%80%93writer_lock#Using_two_mutexes
require "digest"
# redlock - 2.0.6 at time of writing
# https://github.com/leandromoreira/redlock-rb
# Provides a Redis-backed locking mechanism. Any client holding the lock info
# hash can release the lock, which this algorithm relies on (the last reader
# releases a lock that the first reader acquired).
require "redlock"
# redis-objects - 2.0.0.beta at time of writing
# https://github.com/nateware/redis-objects
# Provides Redis-backed data structures, such as counters and generic values.
require "redis-objects"
require "redis"

class DistributedReadWriteLock
  REDIS_URL = "redis://localhost:6379/0"
  REDIS_PASSWORD = nil

  # @param key [String, Symbol] prefix for every Redis key used by this lock
  # @param lock_acquisition_attempts [Integer] number of times to attempt to
  #   acquire a lock before giving up and raising an exception
  # @param lock_acquisition_delay [Integer] number of milliseconds to wait
  #   between lock acquisition attempts
  # @param read_lock_max_duration [Integer] number of milliseconds to hold the
  #   reader lock before it expires and is automatically released
  # @param global_lock_max_duration [Integer] number of milliseconds to hold
  #   the global lock before it expires and is automatically released
  # @param debug [Boolean] whether to print debug messages
  def initialize(
    key,
    lock_acquisition_attempts: 10,
    lock_acquisition_delay: 500,
    read_lock_max_duration: 10000,
    global_lock_max_duration: 30000,
    debug: false
  )
    @key = key
    configure_redis_objects
    # A counter that tracks the number of active readers. It should only be
    # accessed while the reader lock is held. It tells a reader whether it is
    # the "first" reader (there were no other readers before it acquired the
    # reader lock), which is responsible for acquiring the global lock on
    # behalf of all readers, or the "last" reader, which is responsible for
    # releasing it. This mechanism is why the algorithm is read-preferring:
    # while there are any readers, the global lock is held on their behalf.
    # The counter key expires 60 seconds after its last write, and the global
    # lock expires after global_lock_max_duration, so each new reader extends
    # the global lock.
    @num_readers_active = Redis::Counter.new("#{key}_num_readers_active", expireat: -> { Time.now + 60 })
    # NOTE: The counter is deliberately not reset to 0 here. Another process
    # may already hold read locks, and a missing key already reads as 0.
    #
    # A marshaled "lock info" hash: the full representation of an acquired
    # lock as far as Redlock is concerned. The first reader acquires the
    # global lock on behalf of all readers, and whichever reader is last must
    # be able to release it. To make that possible, the lock info hash is
    # stored in Redis and recalled when the global lock is released.
    @global_lock_info = Redis::Value.new("#{key}_global_lock_info", marshal: true)
    @lock_acquisition_attempts = lock_acquisition_attempts
    @lock_acquisition_delay = lock_acquisition_delay
    @read_lock_max_duration = read_lock_max_duration
    @global_lock_max_duration = global_lock_max_duration
    @debug = debug
  end

  # Obtain a read lock for the duration of the given block. The lock is
  # released when the block returns or raises.
  #
  # @return [Object] the block's return value
  def read
    raise ArgumentError, "No block provided" unless block_given?
    begin_read
    debug "Performing read"
    begin
      yield
    ensure
      end_read
    end
  end

  # Obtain a write lock for the duration of the given block. The lock is
  # released when the block returns or raises.
  #
  # @return [Object] the block's return value
  def write
    raise ArgumentError, "No block provided" unless block_given?
    global_lock_info = global_lock
    raise "Failed to acquire global lock (writer)" unless global_lock_info
    debug "Acquired global lock (writer)"
    debug "Performing write"
    begin
      yield
    ensure
      redlock_client.unlock(global_lock_info)
      debug "Released global lock (writer)"
    end
  end

  private

  # @return [void]
  def begin_read
    reader_lock_info = reader_lock
    raise "Failed to acquire reader lock (begin_read)" unless reader_lock_info
    debug "Acquired reader lock (begin_read) - #{reader_lock_info}"
    begin
      # NOTE: INCRBY itself is atomic. If this block raises or returns nil,
      # Redis::Counter decrements the counter back, and because we raise
      # inside the block, that happens while we still hold the reader lock.
      @num_readers_active.increment do |new_value|
        debug "Incremented @num_readers_active to #{new_value}"
        if new_value == 1
          # We are the first reader, so we acquire the global lock on behalf
          # of all readers and store it for later recall by any reader.
          global_lock_info = global_lock
          raise "Failed to acquire global lock (begin_read)" unless global_lock_info
          debug "Acquired global lock due to being first reader"
          @global_lock_info.value = global_lock_info
        elsif (global_lock_info = @global_lock_info.value)
          # We are not the first reader, but a global lock is stored, so we
          # extend its expiration.
          debug "Attempting to refresh global lock expiration"
          refreshed_global_lock_info = redlock_client.lock(global_lock_info[:resource], @global_lock_max_duration, extend: global_lock_info)
          if refreshed_global_lock_info
            debug "Refreshed global lock expiration"
            @global_lock_info.value = refreshed_global_lock_info
          else
            debug "Unable to refresh global lock expiration"
          end
        end
        # Reset the expiration time of the counter, otherwise it might
        # disappear. TODO: Ensure that the expiration length is at least as
        # long as the longest possible read operation, otherwise a
        # long-running read could cause the counter to expire and be reset
        # to 0, then get decremented to -1, which would be bad.
        @num_readers_active.reset(new_value)
        # A non-nil return value is required by Redis::Counter to keep the
        # change
        true
      end
    ensure
      # Release the reader lock even if something went wrong above
      redlock_client.unlock(reader_lock_info)
      debug "Released reader lock (begin_read)"
    end
  end

  # @return [void]
  def end_read
    reader_lock_info = reader_lock
    raise "Failed to acquire reader lock (end_read)" unless reader_lock_info
    debug "Acquired reader lock (end_read) - #{reader_lock_info}"
    missing_global_lock_info = false
    begin
      @num_readers_active.decrement do |new_value|
        debug "Decremented @num_readers_active to #{new_value}"
        if new_value == 0
          if (global_lock_info = @global_lock_info.value)
            redlock_client.unlock(global_lock_info)
            @global_lock_info.value = nil
            debug "Released and cleared global lock due to being final reader"
          else
            missing_global_lock_info = true
            debug "Expected to find global lock info to perform unlock, but none found"
          end
        end
        # Reset the expiration time of the counter, otherwise it might
        # disappear. TODO: Ensure that the expiration length is at least as
        # long as the longest possible read operation, otherwise a
        # long-running read could cause the counter to expire and be reset
        # to 0, then get decremented to -1, which would be bad.
        @num_readers_active.reset(new_value)
        # A non-nil return value is required by Redis::Counter to keep the
        # change
        true
      end
    ensure
      redlock_client.unlock(reader_lock_info)
      debug "Released reader lock (end_read)"
    end
    # Raise only after the decrement has been kept and the reader lock has
    # been released. Raising inside the block would undo the decrement and
    # leave the counter stuck above 0 with no global lock behind it.
    if missing_global_lock_info
      raise "Expected to find global lock info to perform unlock, but none found"
    end
  end

  # @return [Hash, FalseClass] lock info hash if the lock was available, false
  #   otherwise
  def reader_lock
    redlock_client.lock("#{@key}_reader_lock", @read_lock_max_duration)
  end

  # @return [Hash, FalseClass] lock info hash if the lock was available, false
  #   otherwise
  def global_lock
    redlock_client.lock("#{@key}_global_lock", @global_lock_max_duration)
  end

  # @return [Redlock::Client]
  def redlock_client
    # Memoized so that each lock operation does not open new Redis connections
    @redlock_client ||= Redlock::Client.new(
      [REDIS_URL],
      retry_count: @lock_acquisition_attempts,
      retry_delay: @lock_acquisition_delay
    )
  end

  # @return [void]
  def configure_redis_objects
    redis_config = {url: REDIS_URL, password: REDIS_PASSWORD}
    # Plain-Ruby check, so ActiveSupport's `blank?` is not required
    redis_config.delete(:password) if redis_config[:password].to_s.empty?
    Redis::Objects.redis = Redis.new(redis_config)
  end

  # @param message [String]
  #
  # @return [void]
  def debug(message)
    return unless @debug
    tid = Thread.current.object_id.to_s
    label = Digest::MD5.hexdigest(tid)[0..2]
    puts "(#{label}) #{message}"
    nil
  end
end
```
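The header comment points out that a write-preferring lock needs condition variables and wakeup signals. For comparison, here is a minimal in-process sketch of the condition-variable algorithm described on the same Wikipedia page, using Ruby's `Mutex` and `ConditionVariable`. `WritePreferringReadWriteLock` and its `writers_waiting` helper are hypothetical names for illustration only; every `wait` in it depends on a `broadcast`, which is exactly the piece the distributed class above does not have.

```ruby
# Hypothetical in-process write-preferring readers-writer lock.
class WritePreferringReadWriteLock
  def initialize
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    @num_readers_active = 0
    @num_writers_waiting = 0
    @writer_active = false
  end

  def read
    @mutex.synchronize do
      # New readers queue behind any waiting writer: this is the write preference.
      @cond.wait(@mutex) while @num_writers_waiting > 0 || @writer_active
      @num_readers_active += 1
    end
    begin
      yield
    ensure
      @mutex.synchronize do
        @num_readers_active -= 1
        @cond.broadcast if @num_readers_active.zero? # wake a waiting writer
      end
    end
  end

  def write
    @mutex.synchronize do
      @num_writers_waiting += 1
      @cond.wait(@mutex) while @num_readers_active > 0 || @writer_active
      @num_writers_waiting -= 1
      @writer_active = true
    end
    begin
      yield
    ensure
      @mutex.synchronize do
        @writer_active = false
        @cond.broadcast # wake waiting readers and writers
      end
    end
  end

  # Exposed only so the demo below can wait deterministically.
  def writers_waiting
    @mutex.synchronize { @num_writers_waiting }
  end
end

# Demo: a reader holds the lock, a writer starts waiting, then a late reader arrives.
lock = WritePreferringReadWriteLock.new
events = Queue.new
release_first_reader = Queue.new

first_reader = Thread.new do
  lock.read do
    events << :first_reader_in
    release_first_reader.pop # hold the read lock until told to finish
  end
end
Thread.pass until events.size == 1 # first reader now holds the lock

writer = Thread.new { lock.write { events << :writer } }
Thread.pass until lock.writers_waiting == 1

late_reader = Thread.new { lock.read { events << :late_reader } }
sleep 0.05 # the late reader blocks here because a writer is waiting
release_first_reader << :go
[first_reader, writer, late_reader].each(&:join)

order = Array.new(events.size) { events.pop }
```

The late reader runs only after the writer, even though a reader already held the lock when it arrived. With the read-preferring two-lock scheme, the late reader would instead join the first reader immediately and the writer would wait for both.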

Happy hacking!