Sunday, 14 May 2023

Spark 3.3 Notes

1. Spark Driver is initialized first. The no. of executors and the Memory and CPU cores to be assigned to each Executor is specified via configuration which is supplied to the driver.

2. The Spark Driver (session) requests Cluster Manager to provision a container for each Executor with required amount of CPU cores and Memory. Once the containers are started, the Spark Driver starts the Executor process within each container. The Executor is nothing but a JVM process just like the Spark driver and the process can use the CPU cores and Memory allocated to it.

3. The Spark driver then works with the Storage Manager to gather insights on no. of data partitions to create and the executor to which the partition needs to be assigned. E.g. if the data files are distributed over 10 storage nodes and there are 5 Executors with 5 CPU Cores each, each Executor can load 5 partitions of data and work on them in parallel. This means the data stored across 10 storage nodes can be split up into 25 smaller partitions, 5 of which can be assigned to one Executor. The actual assignment of partition to Executor will also take into account the proximity of the Executor container to the storage node as greater proximity implies lower network latency.

4. Each Action in Spark induces the creation of a new Spark Job. Each job is a series of one or more Stages. A new Stage is created each time Spark encounters a wide transformation. Each Stage has one or more Tasks that perform narrow transformations on a data partition and hence all Tasks within a Stage can be executed in parallel as they operate on their own data partitions. Stages themselves are serial in nature, i.e. Stage (i+1) needs to wait for completion of State (i).

Sunday, 19 September 2021

Difference between VMs and Docker images


Having wondered about how Docker strived to differ from VMs, I embarked on a thought experiment for some time. I finally got some leads today and here is my best-effort explanation in discerning the difference between the two for posterity.

Let's begin by attributing due credit to the team at dotcloud, which was a start up in 2008 era that built Docker and has since come to be known by the same name. This team realized that managing application dependencies and binaries across a number of customers is cumbersome and unreliable.

Think of a product such as Adobe Dreamweaver that is deployed to a number of clusters across different organizations with custom environment setups. If Adobe were built as a standalone application that would be deployed on bare metal instances, that could cause OS level differences such as differing memory management and I/O access strategies from inducing a different behavior on different bare metal clusters.
 
Now think of an improvement that would enable developers at Adobe to have control over the environment of individual deployment centers themselves. That can be understood as one of the possible motivations for creating VMs apart from the obvious stated benefits such as reducing operational costs with better resource management. But a company that builds software products like Adobe has little control over exercising decisions concerning the environment of deployment in their customer application clusters. Hence what is preferable and in fact feasible is a mechanism that allows application builders to directly package their application in an environment that mandates what OS and specific dependency versions are needed to reliably run the application. This is Docker. The environment is contained in the Docker image that also includes the application binary and dependencies. This is without doubt one of the pioneering moments in software build, packaging and deployment history.

Monday, 13 September 2021

Keeping datacenters in sync with asynchronous replication using durable queue

I had often wondered what eventual consistency actually implied in the context of large-scale distributed systems. Does it mean that data would eventually be replicated achieving consistency at a later point in time or that all best attempts would be made to keep disjoint data storage systems in sync before eventually giving up leading to a quasi-consistency scheme in effect?

Over the years, having worked with AWS, third-party clouds and after conceiving a potential working  scheme for consumers of such distributed cloud vendors, I was beginning to understand that the idea of eventual consistency more closely mirrored to what was described about it in theory and resembled less with the cynical view I had first carried about a best-effort, possibly vain scheme to keep data in sync.

Here is a relatively simplistic, high-level depiction about an architecture on the backend that could keep geographically-distributed, trans-continental (just indulging my artistic freedom now) data centers in sync. You can think of the asynchronous queue forwarder as a process/service that collects the writes to be made to the individual data centers albeit indirectly via queuing system that could support concurrent multiple consumers (such as Kafka or SQS), which in this case would be the data centers that would need to be kept in sync. The durability guarantees of the queue should be sufficient to ensure that the data written to would never be lost.

The queue forwarder service could be a standalone, micro-service or otherwise a daemon that would be running locally inside your application container or pod. I have more to comment on micro-service v/s daemon design so stay tuned for a follow-on post. At this point though, I don't necessarily see why one design should be better than the other.





Thursday, 24 December 2020

Replication for Beginners

Replication helps improve read throughput by boosting data redundancy even if it comes at the expense of having to manage data consistency between different replicas. 


Replication strategies - 


Leader based replication - write to a leader, followers catch up with leader eventually


A more durable approach is for write from client to be waiting till the write has propagated to at least one follower in addition to the leader.


Purely Synchronous replication, Most Durable- Client writes to leader and blocks until all followers have gotten a copy of the write from the leader


Semi-Synchronous replication, Less Durable than Purely synchronous but More Available - Client writes to a leader and blocks until some number of followers, “w” have obtained a copy of the write.



How does follower catch up with leader ?


  1. When writes happen, one of the parties, client or the server (leader/ cluster manager) generates a sequence id for the write that is either auto-incrementing or corresponds to the timestamp when the write was received from the client.

    In 1, we mentioned leader or cluster manager does this sequence id generation operation but there are merits to both parties discharging this responsibility. In typical distributed systems that do not have a dedicated cluster manager, leader could assume this responsibility. In others, cluster manager may be a better bet to introduce greater separation and distribution of concerns. There are challenges with both approaches, when leader generates sequence ids, the reliability of sequence id generation are more tightly dependent on the reliability of leader election and subsequent new leader catching up with the former one’s write logs. When cluster manager is introduced in a set up, it is recommended that a high-performant, costly instance is used to reduce the likelihood of this system not failing or otherwise reliability of the system needs to be improved with a secondary cluster manager that will be catching up with the primary one from time to time to maintain durability of writes at all times. 


  1. Going back to 1, after leader has received a write, it typically writes it to a write log (Similar to a write log in databases) first before writing it to its own index and subsequently to the disk. Note that individual implementations may differ based on the challenges involved in keeping writes durable in its environment or application scenarios. For example, some systems may treat a write to leader as completed immediately after a memory-based index has been written to while others may wait until the memory-based index write has also been written to a storage system or write block on disk.


  1. Once the write has completed on the leader, the writes happen to followers at a very simplistic level, based on a push or pull model.

    In Push-based replication models, leader typically writes to a durable and robust queue that guarantees writes to it are never lost and that reads from it also never fail to deliver to any of the consumers registered to it as subscribers. If the robustness of a separate queue aren’t needed, writes can also propagate from the leader to followers in a peering-based or star-based model or ring-based model. Note that replication from leader to followers themselves may be architected in a multitude of different ways based on the specific application scenarios and conditions.

    In Pull-based replication models, followers could poll the leader for all writes that happened on the leader since a sequence-id passed to it by the follower and the changes received from the leader may be applied to the followers’ own indexes and storage systems. 

Sunday, 13 December 2020

NoSQL vs SQL For Beginners

Often times, we compare and contrast two popular DBMS paradigms, NoSQL and SQL. While we are all conversant with the basics of these two kinds of systems, significant developments in the recent versions of two technologies themselves has rendered a lot of our understanding moot.

5 years back, the following would have been the most common distinction between NoSQL and SQL as far as their applicable use cases concern-

NoSQL -

  1. You have a read/write heavy system with little to no updates.
  2. You want horizontal scaling flexibility
  3. You have a simple lookup use case with no/minimal joins between related collections/tables.
  4. You need the flexibility of a schema-less design and have arbitrary schema for each document in your collection.
  5. You believe that denormalized data storage eliminates need for joins at runtime.
SQL - 
  1. Your system is transactional in nature and as such needs to leverage ACID capabilities offered by a typical SQL solution
  2. You conceive little to no changes in data model/schema.
  3. You have planned your design for at least 3 years ahead into the future and any further performance improvements needed then shall be addressed simply with vertical scaling of your database servers.
  4. Oracle is the leading enterprise SQL solution and you don't need all the bells and whistles of a paid solution for a backend system that supports clients within your own enterprise.

Many of the above are simply true even today and need no further explanation. But here are a few that I believe would benefit from explicit reiteration - 

The premise that SQL databases cannot be horizontally scaled is simply not true. You can horizontally scale even your SQL databases by sharding your indexes if you think it will grow too huge for an index that can be completely held in memory. 

I guess about 15 years back, we were living in a time where SQL and NoSQL had a strict distinction over their respective capabilities but today the gap between them has bridged greatly. Many characteristics of SQL have been borrowed into NoSQL solutions (think of joins, various consistency models other than eventually consistent, transactional capabilities etc.) and vice-versa (think of schema-less design with json/clobs).

So next time you get asked this question in an interview, feel free to confidently clarify any misconceptions the interviewers may themselves have.

Appendix- 
ACID - Atomicity, Consistency, Isolation, Durability
BASE - Basically Available, Soft state, Eventually consistent

Friday, 19 June 2020

How naive implementation of find in Union-Find is quadratic in number of nodes

For naive implementation of find in union-find, how is the cost O(N^2)? Are all nodes in the graph at the same depth? Is that even possible if they are connected?

Let's take an example of graph of "n" nodes with maximum height, which looks somewhat like a left-skewed or right-skewed tree.

In a skewed tree, the leaf itself is at a depth N-1. For every node above it, depth is decreasing by one, i.e. N-2, N-3, N-4, ...,3,2,1, 0.
If we add this together, 0 + 1+2,+3+...N-1 => (ignoring 0, we get) (N-1) (N)/2 = O(N^2)
One of those instances when math clarifies logic. I will add code snippets to the the above shortly.




Sunday, 23 October 2016

Key insight- Consistent hashing

I had glossed over the overview of Consistent Hashing many times in the past, and as much as I knew it was a highly important distributed system hashing technique, I never really appreciated the myriad of applications that it underlay.

Today, I started reading Facebook's paper on scaling memcached for supporting its highly scalable and distributed system architecture. I read the term Consistent Hashing once again and this time, I took the onus to read it out in depth to understand the specifics as much as the high level overview I had already read many times in the past. To those of you reading this, I will try to keep it as technical as possible with key mathematical insights from related sources I have already read. Additionally, I will provide references to the original posts as always so that you can catch a glimpse of the material from the biblical sources themselves. Let's get started now.

Most of you know that naive hashing techniques hash a key, say 'k' to a bucketing system with 'n' buckets, 0,1,2,3,...,n-1 using the formula:

hash(k) mod (n)

While this is a good, simple technique for single system, in-memory hashtables, it doesn't quite scale well in multiple node, distributed system architectures. Let me illustrate with an example.

Suppose you add a new bucket, 'n' to the system. You now have n+1 buckets in your system and this requires you to re-hash each of your already present K keys using the new formula:

hash(k) mod (n+1)

Mathematically speaking, this will move n/(n+1) keys in your existing system to a new bucket. This actually involves an awful lot of network transfers and the communication overhead involved is significantly high for most real-time production applications. When you think about it, if this kind of addition of buckets or removal of buckets happens frequently enough, as is the case with most distributed systems that require frequent horizontal scaling in the form of addition of new nodes or removal of existing nodes in the event of node failures, such re-hashing and consequent movement of keys to new buckets has the potential to bring down whole back-end content servers by bombarding them with great number of write requests within a short interval of time.

Such was the need to revisit hashing from a new perspective and, in 1997, David Karger published a new paper on Consistent Hashing which still retains wide applicability in most real-time, massive, distributed hashing systems today.

With consistent hashing, the idea is to keep hash value of the key nearly the same and largely independent of the number of buckets/nodes in your distributed system. The simplest and earliest implementations of consistent hashing hashed both keys and buckets using the same hash function that ensured both were normalized to a [0,1) interval.  

Suppose hash of your key is 0.6 and you have you have three buckets with hash values, 0.2, 0.5 and 0.8, respectively. You pick the bucket with hash value that is closest to the hash value of your key in a clockwise direction, i.e., 0.8. Let me quickly illustrate this idea:

In a clockwise direction starting at 0 corresponding to 00:00 hours, the above hash values appear in order, 0.2, 0.5, 0.6 and 0.8 respectively. This would map your key in bucket corresponding to hash value 0.8 as you pick the nearest bucket in the clockwise direction.

If you have understood so much, it can be anticipated with reasonable common sense that you will be concerned by many apparent limitations with this approach, more particularly, concerns about its rather non-uniform mapping from keys to buckets. This is understandable and valid and you will see it being addressed shortly, later in this post but for the time being, I will continue to present some key insights about this approach that will make it seemingly more attractive to the naive hashing technique that was discussed earlier. In this technique, if, say, a new bucket, n is added to the system that has n buckets, 1,2,3,4,....,n-1, only 1/(n+1) keys need to be moved to the new node while the remaining keys continue to stay mapped to their original buckets.

Let me make sure you understand this with an illustration. Any time you add a new bucket to this system, it is trivial to note that only a subset of the keys from the bucket preceding this one in the clockwise direction are going to be mapped to it. As a result, it is rather straight-forward and simple to derive the potential set of keys that require movement as a result of the underlying change in the hashing system itself.

Now, I will proceed further to discuss how a more sophisticated variant of consistent hashing addresses the issue concerning the non-uniform key distribution among the existing buckets. This new design is called consistent hashing with virtual nodes.

The idea behind this new approach is to assign to each bucket, a set of disjoint key ranges or partitions as they are usually called as opposed to a single one as we see in the primordial implementation above. To visualize this partition assignment to buckets, check this link on how consistent hashing is implemented in Cassandra, Cassandra Consistent Hashing.

As you can see it's relatively simple to view this organization as a pre-configured mapping from key partitions to nodes themselves. Such table-like mapping may be conceived at application start up and stored for lookup later in cluster leaders, node managers, master nodes or such other distributed system entities that are entrusted with the responsibility to manage a set of machines. 

I hope this post managed to educate you about consistent hashing or otherwise at least, arouse an interest to learn more about hashing systems in general. 

Related sources:






Sunday, 9 October 2016

Enumerate Primes from Elements of programming interviews

I read the solution to this problem about a year back for the first time and while one time was enough to come to terms with the reality of its ingenuity, one time was never enough to appreciate how it really works.

I would have solved it roughly 5 times over a span of a year and today, I note how a subtle idea can come between you thinking you got it right and actually getting it right. When it comes to problem solving, there are questions you cannot solve, you can solve and you think you can solve.

This is likely to be one of those problems you believe you can solve and you also get the output on your console which if not noted carefully and not rigorously tested would pass of as right and which only when seen through the lenses of authors of the great book like EPI would show as invalid for the first time.

Aziz, Lee and Prakash state that once a number has been identified as prime, it suffices to mark as non-prime, all of its multiples starting its square. The number, say i itself is in index, (i - 3)/2 of a lookup list, isPrime. In other words, if index is i, the value at index i would be 2 * i +3. Now, the square of (2 * i + 3) would be  (4 * i * i + 12 * i + 9) and this square would be present in (4 * i * i + 12 * i + 9 - 3) /2= 2 * i * i + 6 * i + 3

Trust me the genius of these 3 authors would preclude them from providing these step-by-step explanations but it took me good time to decode all these identities for myself.

Now, just tell yourself time and again that the lookup list isPrime only indicates if an odd number starting with 3 is prime because even numbers after 2 are never prime.

if(isPrime.get(i) == Boolean.TRUE) {
/* add number at index i to prime numbers result */
int p  = 2 * i +  3;
primes.add(p);

for (long j = (2 * i * i + 6 * i + 3) ; j  < size ; j += p)  //How obvious is this?{
isPrime.set((int) j , Boolean.FALSE);
}
}

If you read my comment on the code snippet, that's exactly the little subtlety which comes between the way of your getting this one and thinking you got this one.

Interestingly enough, this works. What he is implying is that starting with the index of the prime number square, it suffices to mark all indices at steps of the prime number itself and this works.

E.g. p = 3
p^2 = 9

p_i = 0
p^2_i = 3
p^2_i  + 3 = 6 corresponds to 2 * 6 + 3 = 15
p^2_i + 3 + 3 = 9 corresponds to 2 * 9 + 3 = 21

If you see carefully, he is wisely avoiding all even multiples between 9 and 15 and 15 and 21 and so on. If you test this identity for any other prime, you will see the brilliance of this idea.


  

Thursday, 6 October 2016

Python safeguards


  • Never create a resource with open() or with open() inside run method of a worker thread. Create a resource in the main thread and pass the reference to the resource to the worker instead.
  • with open() is better than open() for writers because it closes the writer automatically when the control exits its scope. This flushes all write buffer to the disk.
  • If open() is used for write(), explicitly call f.close() or f.flush() as you would normally do in Java.

Friday, 9 September 2016

Long.parseLong(String val, 16) gives incorrect Hex value

I recently noted a bug in the Java Long.parseLong function that causes the returned hexadecimal value to be incorrect sometimes when run on different machines. I presume this is likely due to inconsistency in implementations between different java versions, however this needs to be verified.

Interestingly enough, I also verified that the alternate Long.toHexString function gives the correct hexadecimal value.

Just a heads up!

Friday, 6 May 2016

Brian Kernighan's technique to count set bits

Brian Kernighan's iterative technique to count set bits in an integer is described in the link below. Just a little help here to see how it works.

Let's start with an example.
Say for x = 9, you want to count number of set bits (9 = (1001) base 2)
Initialize a count variable to 0;

while (x > 0) {
Compute x - 1 = 8. 
Compute x = x & ( x - 1) 
increment count
}

Number of set bits = final value of "count" variable. Now, to see how it works, it is a good idea to rely on a method of back propagation. If you observe carefully, the stopping condition for the algorithm is when x = 0. It is good to understand when x becomes zero. x becomes zero during the iteration in which the value of x is a power of 2 which would also be when only one bit would be set in x.



Wednesday, 4 May 2016

WORKING!!! A decent hack for an unresolvable Gmail issue!

It has been long and I have something interesting to share here.

About 6 months back, one of my Gmail accounts became victimized by a series of spoofing attacks that continues till this day. For starters, spoofing attacks are not absolutely worrisome. Your account does not get compromised here. It happens when an unknown attacker starts spamming people with your email address in the from or sender section due to which you receive these great many email bounces everyday and also get your account blocked from further email compositions on that day. This is because Gmail has an outbound mail threshold of 500 per day after which any further outbound activity from your mailbox is temporarily blocked.

What is interesting about spoofing itself is that there is no known technique to remediate the attack once it is initiated on your account. As for me, I tried everything possible from filing a bug to reporting to Gmail and I finally understood that I would simply just have to let go. But, that was until my account itself got suspended by Gmail at which point I realized I had to either shut down my account or try to workaround it separately.

If you every find yourself in a similar predicament, here is a simple thing you can do to override account suspension. Gmail lets you set a default email address for all your outbound mails and if you can create a new sender account and have it configured to be the default mail address for all your outgoing mails, what you are in essence doing is let this account take over all unwanted outgoing spams that would have otherwise gone from your mailbox. Of course, this would not be a one time job and you would need to repeat this exercise when inevitably your designated sender account would get suspended too at some point. But, at least, you have something to begin with here.

I have been following the changes for about a week now and I am excited to announce that it has been working till this time :) :)

Saturday, 8 August 2015

Compute Binomial Coefficient


Crux  of the solution is recalling an important mathematical identity:
nCr = (n-1)Cr + (n-1)C(r-1)

//algo
To find nCk; define array a of size n+1;

a[0]=1;
for (int i = 1; i <= n ; i++) {
 for (int j = min(i,k); j>= 1; j--) {
    a[j] = a[j] + a[j-1];
  }
 }


return a[k];