Distributed, multi-reader, multi-writer lock, with Redis & Ruby
I am in the middle of a work project which requires a distributed (multi-host) locking mechanism that supports multiple readers and multiple writers. In this case I am not literally reading and writing a specific piece of data. Rather, I have a front-end that reads and writes data against an authoritative back-end system which generally “stays the same”, but semi-regularly undergoes changes (think CMS-level changes) that can fundamentally alter the content the front-end accesses and manipulates, or invalidate work the front-end has already done.
Using a “read/write” lock seems like a good fit - the front-end can obtain a read lock whenever it wants to read from or manipulate data on the back-end, and the back-end can obtain a write lock whenever it wants to make one of these fundamental changes that directly impact the front-end. Once the write lock is released, the front-end is free to check in with the back-end and reconcile any problems that may exist now that the back-end has changed. All this is to say that while this might be called a read/write lock in practice, the way this lock can be used is a little broader and more abstract.
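To make that concrete, here is a rough sketch of how the two sides might use the lock class defined later in this post - the lock key ("cms_content") and the work inside the blocks are illustrative placeholders, not part of any real system:

# On the front-end host(s): take a read lock around anything that reads from
# or manipulates back-end data. Multiple readers can hold read locks at once.
lock = DistributedReadWriteLock.new("cms_content")
lock.read do
  # ... fetch content from the back-end, apply user changes, etc. ...
end

# On the back-end host: take a write lock around the disruptive, CMS-level
# change. The write lock can only be acquired once no readers are active, and
# an exception is raised if it cannot be acquired within the configured number
# of attempts.
lock = DistributedReadWriteLock.new("cms_content")
lock.write do
  # ... apply the fundamental content changes ...
end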
Here’s my (not yet tested in production) implementation of this lock. Assume that it doesn’t work yet - but local tests seem to suggest it’s doing what we want it to do. The code is well-documented, so if you want to understand how it works, give it a read.
# WARNING: This locking algorithm is not yet production-tested and should not
# be used in production without further testing. Testing on a local machine
# seems to suggest that things are working as expected, but I am anticipating
# that there will be edge cases that I have missed. If you find one then feel
# free to contact me.
#
# The goal of this class is to model and provide a distributed (Redis-backed)
# multiple-reader, multiple-writer lock.
#
# The lock is read-preferring, meaning that readers will be able to read
# concurrently, but writers will be blocked until all readers have finished
# reading. This is a definite shortcoming, but it is currently not clear to me
# how to implement a write-preferring lock which is also distributed. There are
# plenty of reference implementations when concurrency is only needed between
# threads (write-preferring locks require the use of condition variables, and
# the ability to send "wakeup" signals to waiting "threads", which Redis does
# not appear to be able to provide).
#
# Despite being called a "read/write" lock, this class does not actually read
# or write any specific data. It is simply a lock which can be used to ensure a
# resource is not modified while it is being read, or read while it is being
# modified, and that multiple readers can read concurrently.
#
# The reference algorithm for this class is located on Wikipedia, linked below.
# The page also contains the write-preferring algorithm.
# https://en.wikipedia.org/wiki/Readers%E2%80%93writer_lock#Using_two_mutexes

require "digest"

# redlock - 2.0.6 at time of writing
# https://github.com/leandromoreira/redlock-rb
# Provides a Redis-backed locking mechanism, and allows any reader to release
# the lock (which is necessary for this algorithm to work).
require "redlock"

# redis-objects - 2.0.0.beta at time of writing
# https://github.com/nateware/redis-objects
# Provides Redis-backed data structures, such as counters and generic values.
require "redis-objects"

require "redis"

class DistributedReadWriteLock
  REDIS_URL = "redis://localhost:6379/0"
  REDIS_PASSWORD = nil

  # @param key [String, Symbol]
  # @param lock_acquisition_attempts [Integer] number of times to attempt to
  #   acquire a lock before giving up and raising an exception
  # @param lock_acquisition_delay [Integer] number of milliseconds to wait
  #   between lock acquisition attempts
  # @param read_lock_max_duration [Integer] number of milliseconds to hold a
  #   read lock before it expires and is automatically released
  # @param global_lock_max_duration [Integer] number of milliseconds to hold a
  #   global lock before it expires and is automatically released
  # @param debug [Boolean] whether to print debug messages
  def initialize(
    key,
    lock_acquisition_attempts: 10,
    lock_acquisition_delay: 500,
    read_lock_max_duration: 10000,
    global_lock_max_duration: 30000,
    debug: false
  )
    @key = key
    configure_redis_objects
    # A counter which tracks the number of active readers. It should only be
    # accessed while the reader lock is held. This allows us to determine when
    # the reader is the "first" reader, meaning that before the reader lock was
    # acquired, there were no other readers. This is important because the
    # "first" reader is responsible for acquiring the global lock on behalf of
    # all readers. The same is true for the "last" reader, which is responsible
    # for releasing the global lock. This mechanism is why this algorithm is
    # read-preferring - we ensure the global lock is always held by a reader,
    # if there are any readers. The lock expires after 60 seconds, so whenever
    # a new reader comes along, the global lock will need to be extended by
    # each new reader.
    @num_readers_active = Redis::Counter.new("#{key}_num_readers_active", expireat: -> { Time.now + 60 })
    @num_readers_active.value = 0
    # A marshaled hash object which is a "lock info" hash - essentially the
    # full object representation of an acquired lock as far as Redlock is
    # concerned. Readers need to be able to acquire the global lock on behalf
    # of all readers (the "first" reader is responsible for this), and any
    # reader must be able to release the global lock (the "last" reader is
    # responsible for this). In order to accomplish this, we store the lock
    # info hash in Redis, and recall it when we need to release the global lock.
    @global_lock_info = Redis::Value.new("#{key}_global_lock_info", marshal: true)
    @lock_acquisition_attempts = lock_acquisition_attempts
    @lock_acquisition_delay = lock_acquisition_delay
    @read_lock_max_duration = read_lock_max_duration
    @global_lock_max_duration = global_lock_max_duration
    @debug = debug
  end

  # Obtain a read lock. Must be called with a block. The lock will be released
  # when the block returns.
  #
  # @return [void]
  def read
    raise "No block provided" unless block_given?
    begin_read
    debug "Performing read"
    begin
      yield
    ensure
      end_read
    end
  end

  # Obtain a write lock. Must be called with a block. The lock will be released
  # when the block returns.
  #
  # @return [void]
  def write
    raise "No block provided" unless block_given?
    if (global_lock_info = global_lock)
      debug "Acquired global lock (writer)"
      debug "Performing write"
      result = begin
        yield
      ensure
        redlock_client.unlock(global_lock_info)
        debug "Released global lock (writer)"
      end
    else
      raise "Failed to acquire global lock (writer)"
    end
    result || nil
  end

  private

  # @return [void]
  def begin_read
    if (reader_lock_info = reader_lock)
      debug "Acquired reader lock (begin_read) - #{reader_lock_info}"
      global_lock_info = nil
      global_lock_attempt = false
      # NOTE: Atomic increment operation. If this block raises an exception, or
      # returns a falsey value, the change to the value will be discarded.
      @num_readers_active.increment do |new_value|
        debug "Incremented @num_readers_active to #{new_value}"
        if new_value == 1
          # We are the first reader, so we need to acquire the global lock on
          # behalf of all readers, and store it for future recall by either
          # this reader, or other readers.
          global_lock_attempt = true
          if (global_lock_info = global_lock)
            debug "Acquired global lock due to being first reader"
            @global_lock_info.value = global_lock_info
          end
        elsif (global_lock_info = @global_lock_info.value)
          # We are not the first reader, but there is a global lock stored, so
          # we should refresh its expiration.
          debug "Attempting to refresh global lock expiration"
          global_lock_attempt = true
          if (refreshed_global_lock_info = redlock_client.lock(global_lock_info[:resource], @global_lock_max_duration, extend: global_lock_info))
            debug "Refreshed global lock expiration"
            @global_lock_info.value = refreshed_global_lock_info
          else
            debug "Unable to refresh global lock expiration"
          end
        end
        failed_global_lock_acquisition = !global_lock_info && global_lock_attempt
        # Ensure that we release the reader lock, even if something went wrong
        redlock_client.unlock(reader_lock_info)
        debug "Released reader lock (begin_read)"
        if failed_global_lock_acquisition
          raise "Failed to acquire global lock"
        end
        # Reset the expiration time of the counter, otherwise it might
        # disappear. TODO: Ensure that the expiration length is at least as
        # long as the longest possible read operation, otherwise a long-running
        # read could cause the counter to expire and be reset to 0, then get
        # decremented to -1, which would be bad.
        @num_readers_active.reset(new_value)
        # Truthy return value is required by Redis::Counter to avoid the change
        # being discarded
        true
      end
    else
      raise "Failed to acquire reader lock (begin_read)"
    end
  end

  # @return [void]
  def end_read
    if (reader_lock_info = reader_lock)
      debug "Acquired reader lock (end_read) - #{reader_lock_info}"
      @num_readers_active.decrement do |new_value|
        debug "Decremented @num_readers_active to #{new_value}"
        failed_global_lock_unlock = false
        if new_value == 0
          if (global_lock_info = @global_lock_info.value)
            redlock_client.unlock(global_lock_info)
            @global_lock_info.value = nil
            debug "Released and cleared global lock due to being final reader"
          else
            failed_global_lock_unlock = true
            debug "Expected to find global lock info to perform unlock, but none found"
          end
        end
        redlock_client.unlock(reader_lock_info)
        debug "Released reader lock (end_read)"
        # Defer raising an exception until after we have released the reader
        # lock, otherwise we might end up in a situation where we have a reader
        # lock but no global lock, which would be bad.
        if failed_global_lock_unlock
          raise "Expected to find global lock info to perform unlock, but none found"
        end
        # Reset the expiration time of the counter, otherwise it might
        # disappear. TODO: Ensure that the expiration length is at least as
        # long as the longest possible read operation, otherwise a long-running
        # read could cause the counter to expire and be reset to 0, then get
        # decremented to -1, which would be bad.
        @num_readers_active.reset(new_value)
        # Truthy return value is required by Redis::Counter to avoid the change
        # being discarded
        true
      end
    else
      raise "Failed to acquire reader lock (end_read)"
    end
  end

  # @return [Hash, FalseClass] lock info hash if the lock was available, false
  #   otherwise
  def reader_lock
    redlock_client.lock("#{@key}_reader_lock", @read_lock_max_duration)
  end

  # @return [Hash, FalseClass] lock info hash if the lock was available, false
  #   otherwise
  def global_lock
    redlock_client.lock("#{@key}_global_lock", @global_lock_max_duration)
  end

  # @return [Redlock::Client]
  def redlock_client
    Redlock::Client.new(
      [REDIS_URL],
      retry_count: @lock_acquisition_attempts,
      retry_delay: @lock_acquisition_delay
    )
  end

  # @return [void]
  def configure_redis_objects
    redis_config = {url: REDIS_URL, password: REDIS_PASSWORD}
    redis_config.delete(:password) if redis_config[:password].to_s.empty?
    Redis::Objects.redis = Redis.new(redis_config)
  end

  # @param message [String]
  #
  # @return [void]
  def debug(message)
    return unless @debug
    tid = Thread.current.object_id.to_s
    label = Digest::MD5.hexdigest(tid)[0..2]
    puts "(#{label}) #{message}"
    nil
  end
end
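
If you want to poke at it yourself, a quick smoke test along these lines is a reasonable starting point. It assumes a Redis instance is running at redis://localhost:6379/0 (the REDIS_URL above), and the lock key, thread count, and sleep durations are arbitrary:

# Smoke test: a few concurrent readers plus one writer, all in one process.
lock = DistributedReadWriteLock.new("demo_resource", debug: true)

readers = 3.times.map do |i|
  Thread.new do
    lock.read do
      puts "reader #{i} reading"
      sleep 1
    end
  end
end

writer = Thread.new do
  sleep 0.5 # let the readers grab the global lock first
  lock.write do
    # The writer only gets here once the last reader has released the global
    # lock (or it raises after exhausting its acquisition attempts).
    puts "writer writing"
  end
end

(readers + [writer]).each(&:join)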
Happy hacking!