Showing posts with label Distributed System. Show all posts
Showing posts with label Distributed System. Show all posts

Sunday, 14 May 2023

Spark 3.3 Notes

1. Spark Driver is initialized first. The no. of executors and the Memory and CPU cores to be assigned to each Executor is specified via configuration which is supplied to the driver.

2. The Spark Driver (session) requests Cluster Manager to provision a container for each Executor with required amount of CPU cores and Memory. Once the containers are started, the Spark Driver starts the Executor process within each container. The Executor is nothing but a JVM process just like the Spark driver and the process can use the CPU cores and Memory allocated to it.

3. The Spark driver then works with the Storage Manager to gather insights on no. of data partitions to create and the executor to which the partition needs to be assigned. E.g. if the data files are distributed over 10 storage nodes and there are 5 Executors with 5 CPU Cores each, each Executor can load 5 partitions of data and work on them in parallel. This means the data stored across 10 storage nodes can be split up into 25 smaller partitions, 5 of which can be assigned to one Executor. The actual assignment of partition to Executor will also take into account the proximity of the Executor container to the storage node as greater proximity implies lower network latency.

4. Each Action in Spark induces the creation of a new Spark Job. Each job is a series of one or more Stages. A new Stage is created each time Spark encounters a wide transformation. Each Stage has one or more Tasks that perform narrow transformations on a data partition and hence all Tasks within a Stage can be executed in parallel as they operate on their own data partitions. Stages themselves are serial in nature, i.e. Stage (i+1) needs to wait for completion of State (i).

Monday, 13 September 2021

Keeping datacenters in sync with asynchronous replication using durable queue

I had often wondered what eventual consistency actually implied in the context of large-scale distributed systems. Does it mean that data would eventually be replicated achieving consistency at a later point in time or that all best attempts would be made to keep disjoint data storage systems in sync before eventually giving up leading to a quasi-consistency scheme in effect?

Over the years, having worked with AWS, third-party clouds and after conceiving a potential working  scheme for consumers of such distributed cloud vendors, I was beginning to understand that the idea of eventual consistency more closely mirrored to what was described about it in theory and resembled less with the cynical view I had first carried about a best-effort, possibly vain scheme to keep data in sync.

Here is a relatively simplistic, high-level depiction about an architecture on the backend that could keep geographically-distributed, trans-continental (just indulging my artistic freedom now) data centers in sync. You can think of the asynchronous queue forwarder as a process/service that collects the writes to be made to the individual data centers albeit indirectly via queuing system that could support concurrent multiple consumers (such as Kafka or SQS), which in this case would be the data centers that would need to be kept in sync. The durability guarantees of the queue should be sufficient to ensure that the data written to would never be lost.

The queue forwarder service could be a standalone, micro-service or otherwise a daemon that would be running locally inside your application container or pod. I have more to comment on micro-service v/s daemon design so stay tuned for a follow-on post. At this point though, I don't necessarily see why one design should be better than the other.





Thursday, 24 December 2020

Replication for Beginners

Replication helps improve read throughput by boosting data redundancy even if it comes at the expense of having to manage data consistency between different replicas. 


Replication strategies - 


Leader based replication - write to a leader, followers catch up with leader eventually


A more durable approach is for write from client to be waiting till the write has propagated to at least one follower in addition to the leader.


Purely Synchronous replication, Most Durable- Client writes to leader and blocks until all followers have gotten a copy of the write from the leader


Semi-Synchronous replication, Less Durable than Purely synchronous but More Available - Client writes to a leader and blocks until some number of followers, “w” have obtained a copy of the write.



How does follower catch up with leader ?


  1. When writes happen, one of the parties, client or the server (leader/ cluster manager) generates a sequence id for the write that is either auto-incrementing or corresponds to the timestamp when the write was received from the client.

    In 1, we mentioned leader or cluster manager does this sequence id generation operation but there are merits to both parties discharging this responsibility. In typical distributed systems that do not have a dedicated cluster manager, leader could assume this responsibility. In others, cluster manager may be a better bet to introduce greater separation and distribution of concerns. There are challenges with both approaches, when leader generates sequence ids, the reliability of sequence id generation are more tightly dependent on the reliability of leader election and subsequent new leader catching up with the former one’s write logs. When cluster manager is introduced in a set up, it is recommended that a high-performant, costly instance is used to reduce the likelihood of this system not failing or otherwise reliability of the system needs to be improved with a secondary cluster manager that will be catching up with the primary one from time to time to maintain durability of writes at all times. 


  1. Going back to 1, after leader has received a write, it typically writes it to a write log (Similar to a write log in databases) first before writing it to its own index and subsequently to the disk. Note that individual implementations may differ based on the challenges involved in keeping writes durable in its environment or application scenarios. For example, some systems may treat a write to leader as completed immediately after a memory-based index has been written to while others may wait until the memory-based index write has also been written to a storage system or write block on disk.


  1. Once the write has completed on the leader, the writes happen to followers at a very simplistic level, based on a push or pull model.

    In Push-based replication models, leader typically writes to a durable and robust queue that guarantees writes to it are never lost and that reads from it also never fail to deliver to any of the consumers registered to it as subscribers. If the robustness of a separate queue aren’t needed, writes can also propagate from the leader to followers in a peering-based or star-based model or ring-based model. Note that replication from leader to followers themselves may be architected in a multitude of different ways based on the specific application scenarios and conditions.

    In Pull-based replication models, followers could poll the leader for all writes that happened on the leader since a sequence-id passed to it by the follower and the changes received from the leader may be applied to the followers’ own indexes and storage systems. 

Sunday, 23 October 2016

Key insight- Consistent hashing

I had glossed over the overview of Consistent Hashing many times in the past, and as much as I knew it was a highly important distributed system hashing technique, I never really appreciated the myriad of applications that it underlay.

Today, I started reading Facebook's paper on scaling memcached for supporting its highly scalable and distributed system architecture. I read the term Consistent Hashing once again and this time, I took the onus to read it out in depth to understand the specifics as much as the high level overview I had already read many times in the past. To those of you reading this, I will try to keep it as technical as possible with key mathematical insights from related sources I have already read. Additionally, I will provide references to the original posts as always so that you can catch a glimpse of the material from the biblical sources themselves. Let's get started now.

Most of you know that naive hashing techniques hash a key, say 'k' to a bucketing system with 'n' buckets, 0,1,2,3,...,n-1 using the formula:

hash(k) mod (n)

While this is a good, simple technique for single system, in-memory hashtables, it doesn't quite scale well in multiple node, distributed system architectures. Let me illustrate with an example.

Suppose you add a new bucket, 'n' to the system. You now have n+1 buckets in your system and this requires you to re-hash each of your already present K keys using the new formula:

hash(k) mod (n+1)

Mathematically speaking, this will move n/(n+1) keys in your existing system to a new bucket. This actually involves an awful lot of network transfers and the communication overhead involved is significantly high for most real-time production applications. When you think about it, if this kind of addition of buckets or removal of buckets happens frequently enough, as is the case with most distributed systems that require frequent horizontal scaling in the form of addition of new nodes or removal of existing nodes in the event of node failures, such re-hashing and consequent movement of keys to new buckets has the potential to bring down whole back-end content servers by bombarding them with great number of write requests within a short interval of time.

Such was the need to revisit hashing from a new perspective and, in 1997, David Karger published a new paper on Consistent Hashing which still retains wide applicability in most real-time, massive, distributed hashing systems today.

With consistent hashing, the idea is to keep hash value of the key nearly the same and largely independent of the number of buckets/nodes in your distributed system. The simplest and earliest implementations of consistent hashing hashed both keys and buckets using the same hash function that ensured both were normalized to a [0,1) interval.  

Suppose hash of your key is 0.6 and you have you have three buckets with hash values, 0.2, 0.5 and 0.8, respectively. You pick the bucket with hash value that is closest to the hash value of your key in a clockwise direction, i.e., 0.8. Let me quickly illustrate this idea:

In a clockwise direction starting at 0 corresponding to 00:00 hours, the above hash values appear in order, 0.2, 0.5, 0.6 and 0.8 respectively. This would map your key in bucket corresponding to hash value 0.8 as you pick the nearest bucket in the clockwise direction.

If you have understood so much, it can be anticipated with reasonable common sense that you will be concerned by many apparent limitations with this approach, more particularly, concerns about its rather non-uniform mapping from keys to buckets. This is understandable and valid and you will see it being addressed shortly, later in this post but for the time being, I will continue to present some key insights about this approach that will make it seemingly more attractive to the naive hashing technique that was discussed earlier. In this technique, if, say, a new bucket, n is added to the system that has n buckets, 1,2,3,4,....,n-1, only 1/(n+1) keys need to be moved to the new node while the remaining keys continue to stay mapped to their original buckets.

Let me make sure you understand this with an illustration. Any time you add a new bucket to this system, it is trivial to note that only a subset of the keys from the bucket preceding this one in the clockwise direction are going to be mapped to it. As a result, it is rather straight-forward and simple to derive the potential set of keys that require movement as a result of the underlying change in the hashing system itself.

Now, I will proceed further to discuss how a more sophisticated variant of consistent hashing addresses the issue concerning the non-uniform key distribution among the existing buckets. This new design is called consistent hashing with virtual nodes.

The idea behind this new approach is to assign to each bucket, a set of disjoint key ranges or partitions as they are usually called as opposed to a single one as we see in the primordial implementation above. To visualize this partition assignment to buckets, check this link on how consistent hashing is implemented in Cassandra, Cassandra Consistent Hashing.

As you can see it's relatively simple to view this organization as a pre-configured mapping from key partitions to nodes themselves. Such table-like mapping may be conceived at application start up and stored for lookup later in cluster leaders, node managers, master nodes or such other distributed system entities that are entrusted with the responsibility to manage a set of machines. 

I hope this post managed to educate you about consistent hashing or otherwise at least, arouse an interest to learn more about hashing systems in general. 

Related sources: