Distributed, multi-reader, multi-writer lock, with Redis & Ruby
I am in the middle of a work project that requires a distributed (multi-host) locking mechanism supporting multiple readers and multiple writers. I am not literally reading and writing one specific piece of data. Instead, a front-end reads and writes data to an authoritative back-end system that generally “stays the same” but semi-regularly undergoes changes (think CMS-level changes). Those changes can fundamentally alter the content the front-end accesses and manipulates, or invalidate work the front-end has already done.
Using a “read/write” lock seems like a good fit. The front-end can obtain a read lock whenever it wants to read from or manipulate data on the back-end, and the back-end can obtain a write lock whenever it wants to make one of these fundamental changes that directly impact the front-end. Once the write lock is released, the front-end is free to check in with the back-end and reconcile any problems the change introduced. In other words, while this is called a read/write lock, it is used more broadly and abstractly: “read” means “depends on the back-end staying as it is”, and “write” means “changes the back-end in a way the front-end must react to”.
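To make that usage concrete, here is a single-process sketch with the same `read { ... }` / `write { ... }` shape as the class below. `InProcessReadWriteLock` and the two-field `backend` hash are hypothetical stand-ins. The lock uses a `Mutex` and a `ConditionVariable` instead of Redis. It only illustrates the contract: readers may overlap, and a front-end read never sees a back-end change half-applied. It does not show the distributed implementation.

```ruby
# Hypothetical in-process stand-in for DistributedReadWriteLock: same
# `read { ... }` / `write { ... }` API, but backed by a Mutex and a
# ConditionVariable instead of Redis. Like the Redis version, it is
# read-preferring: a waiting writer does not stop new readers from joining.
class InProcessReadWriteLock
  def initialize
    @mutex = Mutex.new
    @changed = ConditionVariable.new
    @readers = 0
    @writing = false
  end

  def read
    @mutex.synchronize do
      @changed.wait(@mutex) while @writing
      @readers += 1
    end
    begin
      yield
    ensure
      @mutex.synchronize do
        @readers -= 1
        @changed.broadcast if @readers.zero? # wake a waiting writer
      end
    end
  end

  def write
    @mutex.synchronize do
      @changed.wait(@mutex) while @writing || @readers.positive?
      @writing = true
    end
    begin
      yield
    ensure
      @mutex.synchronize do
        @writing = false
        @changed.broadcast # wake waiting readers and writers
      end
    end
  end
end

lock = InProcessReadWriteLock.new
# Hypothetical back-end content whose two fields must always agree.
backend = { title: "v0", body: "v0" }
torn_reads = 0
stats = Mutex.new

# Back-end: a "fundamental change" applied in two steps under the write lock.
writer = Thread.new do
  1.upto(5) do |version|
    lock.write do
      backend[:title] = "v#{version}"
      sleep 0.001 # widen the window in which a half-applied change exists
      backend[:body] = "v#{version}"
    end
  end
end

# Front-end: many short reads, none of which should see a half-applied change.
readers = Array.new(4) do
  Thread.new do
    50.times do
      lock.read do
        title, body = backend.values_at(:title, :body)
        stats.synchronize { torn_reads += 1 } unless title == body
      end
    end
  end
end

([writer] + readers).each(&:join)
puts "torn reads: #{torn_reads}" # prints "torn reads: 0"
```

Like the Redis-backed class, this sketch is read-preferring. A steady stream of readers can therefore hold off the writer for a while.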
Here’s my (not yet tested in production) implementation of this lock. Assume that it doesn’t work yet, although local tests suggest it’s doing what we want it to do. The code is well documented, so if you want to understand how it works, give it a read.
```ruby
# WARNING: This locking algorithm is not yet production-tested and should not
# be used in production without further testing. Testing on a local machine
# seems to suggest that things are working as expected, but I am anticipating
# that there will be edge cases that I have missed. If you find one, feel free
# to contact me.
#
# The goal of this class is to model and provide a distributed (Redis-backed)
# multiple-reader, multiple-writer lock: any number of readers may hold it at
# the same time, while a writer holds it exclusively.
#
# The lock is read-preferring: while at least one reader holds the lock, new
# readers can join immediately, and a writer is blocked until the last reader
# has finished. Under steady read traffic a writer can therefore be starved
# (here, it gives up and raises once its acquisition attempts run out). This is
# a definite shortcoming, but it is currently not clear to me how to implement
# a write-preferring lock which is also distributed. There are plenty of
# reference implementations when concurrency is only needed between threads
# (write-preferring locks require the use of condition variables, and the
# ability to send "wakeup" signals to waiting "threads", which Redis does not
# appear to be able to provide).
#
# Despite being called a "read/write" lock, this class does not actually read
# or write any specific data. It is simply a lock which can be used to ensure a
# resource is not modified while it is being read, or read while it is being
# modified, and that multiple readers can read concurrently.
#
# The reference algorithm for this class is located on Wikipedia, linked below.
# The page also contains the write-preferring algorithm.
# https://en.wikipedia.org/wiki/Readers%E2%80%93writer_lock#Using_two_mutexes
require "digest"

# redlock - 2.0.6 at time of writing
# https://github.com/leandromoreira/redlock-rb
# Provides a Redis-backed locking mechanism, and allows any process holding a
# lock's info hash to release that lock, not only the process that acquired it
# (which is necessary for this algorithm to work).
require "redlock"

# redis-objects - 2.0.0.beta at time of writing
# https://github.com/nateware/redis-objects
# Provides Redis-backed data structures, such as counters and generic values.
require "redis-objects"
require "redis"

class DistributedReadWriteLock
  REDIS_URL = "redis://localhost:6379/0"
  REDIS_PASSWORD = nil

  # @param key [String, Symbol]
  # @param lock_acquisition_attempts [Integer] number of times to attempt to
  #   acquire a lock before giving up and raising an exception
  # @param lock_acquisition_delay [Integer] number of milliseconds to wait
  #   between lock acquisition attempts
  # @param read_lock_max_duration [Integer] number of milliseconds to hold a
  #   read lock before it expires and is automatically released
  # @param global_lock_max_duration [Integer] number of milliseconds to hold a
  #   global lock before it expires and is automatically released
  # @param debug [Boolean] whether to print debug messages
  def initialize(
    key,
    lock_acquisition_attempts: 10,
    lock_acquisition_delay: 500,
    read_lock_max_duration: 10000,
    global_lock_max_duration: 30000,
    debug: false
  )
    @key = key
    configure_redis_objects

    # A counter which tracks the number of active readers. It should only be
    # accessed while the reader lock is held. This allows us to determine when
    # the reader is the "first" reader, meaning that before the reader lock was
    # acquired, there were no other readers. This is important because the
    # "first" reader is responsible for acquiring the global lock on behalf of
    # all readers. The same is true for the "last" reader, which is responsible
    # for releasing the global lock. This mechanism is why this algorithm is
    # read-preferring - we ensure the global lock is always held by a reader,
    # if there are any readers.
    #
    # The counter key expires 60 seconds after it was last written, so every
    # reader resets it (and its expiration) after changing it. Separately, the
    # global lock expires after global_lock_max_duration, so each new reader
    # extends it.
    #
    # The counter is deliberately not reset to 0 here: readers on other hosts
    # may be active right now, and Redis treats a missing counter key as 0.
    @num_readers_active = Redis::Counter.new("#{key}_num_readers_active", expireat: -> { Time.now + 60 })

    # A marshaled hash object which is a "lock info" hash - essentially the
    # full object representation of an acquired lock as far as Redlock is
    # concerned. Readers need to be able to acquire the global lock on behalf
    # of all readers (the "first" reader is responsible for this), and any
    # reader must be able to release the global lock (the "last" reader is
    # responsible for this). In order to accomplish this, we store the lock
    # info hash in Redis, and recall it when we need to release the global lock.
    @global_lock_info = Redis::Value.new("#{key}_global_lock_info", marshal: true)

    @lock_acquisition_attempts = lock_acquisition_attempts
    @lock_acquisition_delay = lock_acquisition_delay
    @read_lock_max_duration = read_lock_max_duration
    @global_lock_max_duration = global_lock_max_duration
    @debug = debug
  end

  # Obtain a read lock. Must be called with a block. The lock will be released
  # when the block returns or raises.
  #
  # @return [Object] the block's return value
  def read
    raise "No block provided" unless block_given?

    begin_read
    debug "Performing read"
    begin
      yield
    ensure
      end_read
    end
  end

  # Obtain a write lock. Must be called with a block. The lock will be released
  # when the block returns or raises.
  #
  # @return [Object] the block's return value
  def write
    raise "No block provided" unless block_given?

    global_lock_info = global_lock
    raise "Failed to acquire global lock (writer)" unless global_lock_info

    debug "Acquired global lock (writer)"
    debug "Performing write"
    begin
      yield
    ensure
      redlock_client.unlock(global_lock_info)
      debug "Released global lock (writer)"
    end
  end

  private

  # @return [void]
  def begin_read
    reader_lock_info = reader_lock
    raise "Failed to acquire reader lock (begin_read)" unless reader_lock_info

    debug "Acquired reader lock (begin_read) - #{reader_lock_info}"
    begin
      # NOTE: The increment itself is atomic (Redis INCRBY). If this block
      # raises an exception, or returns a falsey value, Redis::Counter rewinds
      # the change. The reader lock is only released in the ensure clause
      # below, so that rewind also happens while the reader lock is held.
      @num_readers_active.increment do |new_value|
        debug "Incremented @num_readers_active to #{new_value}"
        if new_value == 1
          # We are the first reader, so we need to acquire the global lock on
          # behalf of all readers, and store it for future recall by either
          # this reader, or other readers.
          global_lock_info = global_lock
          raise "Failed to acquire global lock" unless global_lock_info

          debug "Acquired global lock due to being first reader"
          @global_lock_info.value = global_lock_info
        elsif (global_lock_info = @global_lock_info.value)
          # We are not the first reader, but there is a global lock stored, so
          # we should refresh its expiration.
          debug "Attempting to refresh global lock expiration"
          if (refreshed_global_lock_info = redlock_client.lock(global_lock_info[:resource], @global_lock_max_duration, extend: global_lock_info))
            debug "Refreshed global lock expiration"
            @global_lock_info.value = refreshed_global_lock_info
          else
            debug "Unable to refresh global lock expiration"
          end
        end
        # Reset the expiration time of the counter, otherwise it might
        # disappear. TODO: Ensure that the expiration length is at least as
        # long as the longest possible read operation, otherwise a long-running
        # read could cause the counter to expire and be reset to 0, then get
        # decremented to -1, which would be bad.
        @num_readers_active.reset(new_value)
        # Truthy return value is required by Redis::Counter to avoid the change
        # being discarded
        true
      end
    ensure
      # Release the reader lock even if something above raised.
      redlock_client.unlock(reader_lock_info)
      debug "Released reader lock (begin_read)"
    end
  end

  # @return [void]
  def end_read
    reader_lock_info = reader_lock
    raise "Failed to acquire reader lock (end_read)" unless reader_lock_info

    debug "Acquired reader lock (end_read) - #{reader_lock_info}"
    begin
      @num_readers_active.decrement do |new_value|
        debug "Decremented @num_readers_active to #{new_value}"
        if new_value == 0
          global_lock_info = @global_lock_info.value
          # Raising here rewinds the decrement while the reader lock is still
          # held; the ensure clause below then releases the reader lock.
          raise "Expected to find global lock info to perform unlock, but none found" unless global_lock_info

          redlock_client.unlock(global_lock_info)
          @global_lock_info.value = nil
          debug "Released and cleared global lock due to being final reader"
        end
        # Reset the expiration time of the counter, otherwise it might
        # disappear. TODO: Ensure that the expiration length is at least as
        # long as the longest possible read operation, otherwise a long-running
        # read could cause the counter to expire and be reset to 0, then get
        # decremented to -1, which would be bad.
        @num_readers_active.reset(new_value)
        # Truthy return value is required by Redis::Counter to avoid the change
        # being discarded
        true
      end
    ensure
      redlock_client.unlock(reader_lock_info)
      debug "Released reader lock (end_read)"
    end
  end

  # @return [Hash, FalseClass] lock info hash if the lock was available, false
  #   otherwise
  def reader_lock
    redlock_client.lock("#{@key}_reader_lock", @read_lock_max_duration)
  end

  # @return [Hash, FalseClass] lock info hash if the lock was available, false
  #   otherwise
  def global_lock
    redlock_client.lock("#{@key}_global_lock", @global_lock_max_duration)
  end

  # @return [Redlock::Client]
  def redlock_client
    Redlock::Client.new(
      [REDIS_URL],
      retry_count: @lock_acquisition_attempts,
      retry_delay: @lock_acquisition_delay
    )
  end

  # @return [void]
  def configure_redis_objects
    redis_config = {url: REDIS_URL, password: REDIS_PASSWORD}
    # Plain-Ruby equivalent of ActiveSupport's `blank?`, so this class does
    # not depend on Rails being loaded.
    redis_config.delete(:password) if redis_config[:password].to_s.strip.empty?
    Redis::Objects.redis = Redis.new(redis_config)
  end

  # @param message [String]
  #
  # @return [void]
  def debug(message)
    return unless @debug

    tid = Thread.current.object_id.to_s
    label = Digest::MD5.hexdigest(tid)[0..2]
    puts "(#{label}) #{message}"
    nil
  end
end
```
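For comparison, here is the same idea without Redis: a single-process Ruby sketch of the "Using two mutexes" algorithm from the Wikipedia page linked in the class comments. `TwoMutexReadWriteLock` is a hypothetical name. Using a one-token `Thread::Queue` for the global lock `g` is an assumption of this sketch, not part of the class above. A queue token can be handed back by any thread, whereas a Ruby `Mutex` raises `ThreadError` when another thread unlocks it. That is the in-process counterpart of why the class relies on Redlock letting the last reader release a lock the first reader acquired.

```ruby
# Single-process sketch of the "two mutexes" read-preferring algorithm.
# `r` guards the reader count `b`; `g` is the global lock that the first
# reader acquires and the last reader releases.
class TwoMutexReadWriteLock
  def initialize
    @r = Mutex.new         # always locked and unlocked by the same thread
    @b = 0                 # number of active readers
    @g = Thread::Queue.new # one-token binary semaphore: holding the token == holding g
    @g.push(:token)
  end

  def read
    begin_read
    begin
      yield
    ensure
      end_read
    end
  end

  def write
    @g.pop # lock g (blocks until the token is available)
    begin
      yield
    ensure
      @g.push(:token) # unlock g
    end
  end

  private

  def begin_read
    @r.synchronize do
      @b += 1
      @g.pop if @b == 1 # first reader locks g on behalf of all readers
    end
  end

  def end_read
    @r.synchronize do
      @b -= 1
      # The last reader unlocks g, even if a different thread locked it.
      @g.push(:token) if @b.zero?
    end
  end
end

lock = TwoMutexReadWriteLock.new
inside = Thread::Queue.new # readers report once they hold the read lock
go = Thread::Queue.new     # lets the readers finish
events = Thread::Queue.new # order in which readers and the writer finished

readers = Array.new(3) do |i|
  Thread.new do
    lock.read do
      inside.push(i)
      go.pop
      events.push(:reader_done)
    end
  end
end

3.times { inside.pop } # returns only once all three readers are inside together
writer = Thread.new { lock.write { events.push(:writer) } }
sleep 0.05             # give the writer a chance to (wrongly) get in early
3.times { go.push(:go) }
(readers + [writer]).each(&:join)

order = []
order << events.pop until events.empty?
p order # prints [:reader_done, :reader_done, :reader_done, :writer]

# Why not a Mutex for g? Ruby's Mutex must be unlocked by the thread that locked it.
m = Mutex.new
m.lock
unlock_error = Thread.new { m.unlock rescue $! }.value
p unlock_error.class # prints ThreadError
```

Roughly, `r` plays the role of the class's `_reader_lock` key, `b` of `_num_readers_active`, and `g` of `_global_lock`. The sketch has no counterpart for the expiry, extension, and retry handling that the distributed version needs.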
Happy hacking!