Tuesday, September 28, 2010

Dynamic mixins in Clojure: an experiment.

My web-friend Debasish Ghosh recently came up with an excellent post about modeling the same domain in Scala and Clojure, focusing in particular on how to implement dynamic behavior through Scala mixins and Clojure combinators.

Now, while the post is excellent as always, and Clojure combinators are pretty cool, I think a domain model is better represented with Clojure records and protocols, so let me show you a little experiment about how to implement dynamic mixins in Clojure.
This is my first Clojure-related blog post, and I'm in no way a Clojure expert, so take it with care and feel free to hand me any kind of feedback ;)

First, for those unfamiliar with records and protocols, there are a few good articles around; to sum them up in a single sentence: protocols describe a contract made up of functions you have to implement in order to adhere to the protocol itself, while records (and types) define data and implement protocols.
So, let's define a Person record and a simple Outputter protocol for outputting things:

(defrecord Person [name])

(defprotocol Outputter
  (output [self logic]))

The record defines only the person's name, and the protocol defines an output function whose actual execution logic is plugged in from the outside through the logic argument.

The most common way to implement a protocol is to extend a record type and implement the proper behavior:

(extend-type Person
  Outputter
  (output [self logic] (println (:name self))))

Glancing through the code, we're extending the Person record and implementing the Outputter by printing out the person name.
But there's a problem with that: the behavior is static. More specifically:

  • You can't dynamically remove protocol implementations.

  • Added protocol implementations are shared by all record instances, so you can't compose protocol implementations with specific record instances.


Let's try to come up with a solution.

First, let's define the function that is at the core of our experiment: the mix function.

(defn mix [source mixins]
  (let [m (merge source mixins)]
    #(get m %1)))

Very short and concise, isn't it? We'll come back to it in a moment.

Then, let's define a simple record instance and two protocol implementations (without extending any record this time): we'll call them, guess what, traits.

(def joe (Person. "joe"))

(deftype OutputterTrait [target]
  Outputter
  (output [self logic] (logic target)))

(deftype DummyTrait []
  Outputter
  (output [self logic] (println "ignored external logic")))

We defined a simple record with no protocols implemented, and two separate protocol implementations.

Now, let's mix them up!

(def mixed (mix joe {:outputter (OutputterTrait. joe) :dummy (DummyTrait.)}))

What happened here?
We just used our mix function to dynamically associate joe with two different protocol implementations, each one identified by a particular keyword.
Now we can output our mixed Joe by calling both protocol implementations:

(output (mixed :outputter) #(println (:name %1)))
(output (mixed :dummy) #(println "this will be ignored"))

The trick is in the mix function, which dynamically associates the record instance with protocol implementations identified by keywords, later used to look the desired implementation back up.

So we've got dynamic mixins, but at the cost of depending on "external" keywords we have to be careful about: passing the wrong keyword may in fact lead to the wrong protocol implementation.

So, always prefer the classical, built-in way of using protocols and records ... but feel free to experiment with dynamic mixins when you need dynamic behavior!

Any feedback will be highly appreciated ... enjoy and see you next time!

Saturday, December 19, 2009

Terrastore and the CAP Theorem

This is an edited version of the original "Terrastore and the CAP Theorem" article, updated to reflect the latest Terrastore developments.

Terrastore is a newborn document store based on the wonderful Terracotta technology, focused on providing a feature-rich, scalable, yet consistent data store.

Data stores today are often classified depending on how they deal with availability, partition tolerance and consistency, and how they resolve the conflict between them.
Such a conflict is described by the CAP theorem, which states that you can only pick two of them and must relax the third: the matter isn't trivial, but it's not the focus of this post, so I won't go any further with it and will instead point you at some well-written posts such as this one from Julian Browne, and this one from Jeff Darcy.

With the CAP theorem in mind, a few people wisely asked: how does Terrastore relate to it?

Let's discuss!

Terrastore architecture in a nutshell

Terrastore is a distributed/partitioned store with a master-based topology powered by Terracotta.
It can be configured to work with a single cluster, made up of one active master, one or more optional passive masters, and one or more servers; or it can be configured to work with a cluster ensemble: a set of clusters working together as a single whole.
All Terrastore documents are automatically partitioned between clusters (if more than one is provided as an ensemble) and intra-cluster server nodes, so that every node only holds in its own main memory a subset of all documents (see the sketch below).
All Terrastore server nodes are equal peers, talking to each other only to request documents actually owned by some other node.
Data replication is managed by the central (Terracotta) master: each active master manages data replication inside its own cluster, so when working as an ensemble, data never crosses a cluster boundary and each active master scales independently of the others.
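
To get a rough feel for what this partitioning means in practice, here is a tiny, purely hypothetical Java sketch (the naming and the modulo-hashing scheme are mine, not Terrastore's actual partitioning code): each document key is hashed to pick the single node that owns it, so every node ends up owning only a subset of all documents.

import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of hash-based partitioning: NOT Terrastore's
// actual algorithm, just a way to see how each node ends up owning
// only a subset of all documents.
public class NaivePartitioner {

    private final List<String> nodes;

    public NaivePartitioner(List<String> nodes) {
        this.nodes = nodes;
    }

    // The owner of a document is derived from the hash of its key.
    public String ownerOf(String documentKey) {
        int bucket = (documentKey.hashCode() & Integer.MAX_VALUE) % nodes.size();
        return nodes.get(bucket);
    }

    public static void main(String[] args) {
        NaivePartitioner partitioner =
                new NaivePartitioner(Arrays.asList("node-1", "node-2", "node-3"));
        System.out.println("document-42 is owned by: " + partitioner.ownerOf("document-42"));
    }
}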

Consistency (C)

Terrastore is a consistent store.
What does it mean? Does it support serializable transactions? Is it fully ACID?

Well, Terrastore is somewhat in the middle between ACID and BASE.
ACID means Atomic, Consistent, Isolated and Durable: that is, the flagship characteristics of your old relational database.
BASE means Basically Available, Soft state, Eventually consistent: that is, the flagship characteristics of many distributed non-relational databases currently on the market, which do not (normally) give any time guarantee with regard to data consistency among all distributed nodes (see my previous post about the topic).

Terrastore provides per-document consistency, meaning that every single update to a single document is atomic, isolated and durable: you will always see the latest version of a document, but no support is provided for transactions spanning different documents or consecutive operations on the same document.

Avoiding cross-operation and cross-document transactions makes Terrastore more scalable than relational databases, while keeping it just as safe to reason about update ordering.
The complexity of a consistent write operation is very low, because no synchronous replication is involved between server nodes: it has a best-case complexity of O(1), requiring only a semi-asynchronous replication toward the master, and a worst case of O(2) if the written document is owned by another node (and hence internally routed there).

Availability (A) and Partition Tolerance (P)

I'll talk about availability and partition tolerance together because you can't really consider the former without taking into account the latter.

Terrastore availability depends on whether we're talking about clusters, masters or servers.

In the case of a Terrastore ensemble made up of several clusters, if one or more clusters are completely unreachable, all data hosted inside them will be unavailable, but the other clusters, with their data, will remain available.
This means Terrastore can tolerate partitions between clusters: a cluster will continue serving its own data and that of other reachable clusters, while considering data belonging to unreachable clusters as unavailable.

Moving our focus inside a single cluster, Terrastore will be fully available as long as there is one reachable master and one reachable server.
You can dynamically start and shut down as many server nodes as you want, and they can fail at any rate.
But if the connection between server(s) and master(s) is lost, the cluster will suddenly become unavailable: this means that Terrastore can't tolerate partitions between servers and masters inside a single cluster.

Final words: is Terrastore superior to other solutions?

The answer is obviously: no, it isn't.
Whatever other product owners say, the truth is just that: there's no silver bullet, nor a universal solution.
Just go with a non-distributed, non-relational solution if you need super-fast access to your data.
Take a look at a BASE-oriented solution if you have a very large amount of data, probably distributed over several data centers, and need both partition tolerance and full data availability.
Or go with Terrastore if you need a distributed store with consistency features.

Evaluate.
Pick your choice.
And be happy.

Wednesday, November 04, 2009

Eventual Consistency by Example

Recently, there has been a lot of chitchat about the eventual consistency model as illustrated in the famous Amazon Dynamo paper, and today employed by several non-relational databases such as Voldemort or Cassandra.
Everything started with a blog post by the Facebook Infrastructure Lead titled "Dynamo: A flawed architecture", where he makes a few points against the eventual consistency model and the related "sloppy" quorum approach.

However, his points seem to be based on a few misconceptions which the Amazon paper doesn't help to clarify, so let's try to shed some light by first giving a few definitions, and then a simple example.

Definitions.

We may sum up the eventual consistency model in the following statement:

Given a total number of T nodes, we choose a subset of N nodes for holding key/value replicas, arrange them in a preference list whose top node is called the "coordinator", and pick the minimum number of writes (W) and reads (R) that must be executed by the coordinator on nodes belonging to the preference list (including itself) in order to consider the write or read "successful".

Here are the key concepts, extracted from the statement above:
  • N is the number of replica nodes for a given key/value.
  • Those nodes are arranged in a preference list.
  • The node at the top of the preference list is called coordinator.
  • W is the minimum number of nodes where the write must be successfully replicated.
  • R is the minimum number of nodes where the read must be successfully executed.
Writes are executed by the coordinator on the first W available nodes on the preference list.
Reads are executed by the coordinator on the first R available nodes on the preference list.
This is eventually consistent because consistency itself depends on W and R: you may successfully write to and read from a number of nodes smaller than the total number of replicas (N), with conflicts resolved at read time and values asynchronously propagated by hinted handoff or the like (both of which could be a topic for another post).
In the same way, this is a sloppy quorum because writes and reads may succeed even if not all replica nodes (N) are up and running, depending again on W and R.

More specifically, values for N, W and R can be tuned in order to:
  • Achieve high write availability by setting: W < N
  • Achieve high read availability by setting: R < N
  • Achieve full consistency by setting: W + R > N
This seems to be the most difficult part to grasp, so let's make it concrete: first with a small code sketch, and then with our example!
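
Here is a tiny, hypothetical Java sketch (the class and method names are mine, not part of any real client library) that simply encodes the three rules above, so you can check any (N, W, R) configuration mechanically:

public class QuorumConfiguration {

    private final int replicas;      // N
    private final int writeQuorum;   // W
    private final int readQuorum;    // R

    public QuorumConfiguration(int replicas, int writeQuorum, int readQuorum) {
        this.replicas = replicas;
        this.writeQuorum = writeQuorum;
        this.readQuorum = readQuorum;
    }

    // W < N : a write can succeed even if some replicas are down.
    public boolean isHighlyAvailableForWrites() {
        return writeQuorum < replicas;
    }

    // R < N : a read can succeed even if some replicas are down.
    public boolean isHighlyAvailableForReads() {
        return readQuorum < replicas;
    }

    // W + R > N : the read and write quorums always overlap on at least one
    // node, so a successful read always sees the latest successful write.
    public boolean isFullyConsistent() {
        return writeQuorum + readQuorum > replicas;
    }
}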

An example.

Let me show you the simplest possible example: a cluster of two nodes.

We have:
  • Node1 : the coordinator.
  • Node2 : guess what, the second cluster node.
  • N = 2 : our replication factor.
Let's examine all possible eventual consistency scenarios.

W = 1, R = 1 -> W < N, R < N

You get write/read high availability: you can write to and read from any available node, but you may get inconsistent reads. For example, if you write to node1 and then read from node2 (maybe because node1 has suddenly failed), you'll get an inconsistent read.
With two nodes, this is the only eventually consistent scenario.
Let's see why.

W = 1, R = 2 -> W + R > N

This is a fully consistent scenario.
Here, you can write to either node, but you can only read when both nodes are online. More specifically:
  • If you write to node1 and then read from node2 without being able to read from node1 because it suddenly died, the read will fail.
  • The same holds when writing and reading from the same node: if you write to node1 and then read from node1 without being able to read from node2 because it suddenly died, the read will fail.
  • Finally, the two most misunderstood scenarios:
    • If you write to node2 because node1 is offline, then node1 comes back online and you read the same value from it, your read will succeed and you will not get any inconsistent value! That's because node2 holds the most up-to-date version, which will be "acquired" by node1 and returned to you.
    • If you write to node1, then it goes offline and you write again to node2, you will never lose the old value! That's because you will be able to read only after node1 comes back online: at that point, a read involving node1 and node2 will return both values (because they diverged), leaving the reconciliation process to the client.
W = 2, R = 1 -> W + R > N
W = 2, R = 2 -> W + R > N

Even easier: consistent reads are guaranteed because the W value equals the replication factor.
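
As a quick usage example, feeding the four two-node configurations above into the hypothetical QuorumConfiguration sketch from earlier reproduces exactly the classification we just walked through:

public class TwoNodeScenarios {

    public static void main(String[] args) {
        // N = 2, W = 1, R = 1: highly available for writes and reads, not fully consistent.
        print(new QuorumConfiguration(2, 1, 1));
        // N = 2, W = 1, R = 2: write-available, fully consistent.
        print(new QuorumConfiguration(2, 1, 2));
        // N = 2, W = 2, R = 1: read-available, fully consistent.
        print(new QuorumConfiguration(2, 2, 1));
        // N = 2, W = 2, R = 2: fully consistent, with no availability margin on either side.
        print(new QuorumConfiguration(2, 2, 2));
    }

    private static void print(QuorumConfiguration c) {
        System.out.println("write-available=" + c.isHighlyAvailableForWrites()
                + ", read-available=" + c.isHighlyAvailableForReads()
                + ", fully consistent=" + c.isFullyConsistent());
    }
}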

Finally ...

Hope this write-up helped clarify a few misconceptions about eventual consistency.
Feedback is obviously highly appreciated.

See you!

Wednesday, April 08, 2009

Terracotta, Domain Driven Design and Anti-Patterns

Yesterday I was going to participate in a technical meeting, when the following statement suddenly appeared on my Twitter timeline:

One interesting reflection is that the "super static" property of Terracotta roots helps us with Domain Driven Design.
Finally we have (what I feel a nice) way of using Repositories in our Entities without Aspects or Register style coding.


My immediate reaction was to reply by saying:

Strongly disagree. That means your business/domain code depends on Terracotta semantics.


I thought my comment was pretty intuitive: if Terracotta is a transparent clustering solution (and it actually is), how can Terracotta help you write your domain code without polluting the code itself?

Too bad, I was wrong: it generated a lot of discussion with a few of my favorite tech-friends, and what's worse, I wasn't able to further clarify my statement!

Let me blame Twitter's 140-character constraint, please ... I don't want to hurt my own self-respect ;)

So here is a simple-stupid sample, which I hope will clarify the following simple concept: if your code relies on Terracotta super-static variables to access repositories (as apparently stated by the first quote above), then your code is IMO wrong.

Let's go.

We have an Amazon-style application dealing with a lot of users and books, so we choose to use Terracotta to transparently cluster our application and make it scale.
Taking a deeper look at our domain, and using Domain Driven Design terminology, we have two aggregate roots: User and Book.


public class User {

    private final String id;

    // ...
}

public class Book {

    private final String isbn;

    // ...
}


They're two separate roots because they're completely independent and have different lifecycles.
So we have two different repositories, too:


public class UserRepository {

    public User findById(String id) {
        // ...
    }

    // ...
}

public class BookRepository {

    public Book findByIsbn(String isbn) { /* ... */ }

    // ...
}


Now, we want to add the following responsibility to our User: tell us all the books he has viewed.


public class User {

    private final String id;

    // ...

    public Collection<Book> getViewedBooks() { /* ... */ }
}


We don't want to establish a permanent relation between User and Book, i.e., make the collection of viewed books an instance variable of User: it would clutter the User entity and associate with it a lot of additional data that is related to the user, but in no way part of the user itself.
So we choose to access Book entities through the BookRepository, meaning that User entities must have access to the repository instance.

The repository is implemented as a collection of objects persisted and distributed by Terracotta, so the following idea comes to mind: we could configure the repository as a Terracotta root, and access its instance from every User object in our cluster thanks to its super-static nature!

For those unfamiliar with Terracotta, a Terracotta root, also known as a super-static variable, is an object instance shared among all cluster nodes: it is actually created only the first time the new operator is called. After that, the instance will always be the same on all nodes, and every subsequent new invocation will have no effect.

This means we have a number of super-easy ways to access our super-static repository from inside our User entity, and here is one:


import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;

public class User {

    // Terracotta root: created only once, then shared across all cluster nodes.
    private static final BookRepository bookRepository =
            new BookRepository();

    private final String id;

    // ISBNs of the books viewed by this user.
    private final Collection<String> viewed = new HashSet<String>();

    // ...

    public Collection<Book> getViewedBooks() {
        Collection<Book> books = new ArrayList<Book>();
        for (String isbn : viewed) {
            books.add(bookRepository.findByIsbn(isbn));
        }
        return books;
    }
}


Thanks to Terracotta, the BookRepository will actually be created only the first time, and then always reused, even if we shut down and restart our application: the bookRepository's persisted/distributed state will always be there.

Well, this is IMHO an anti-pattern.

While it is true that Terracotta makes it easy to access repositories from entities, it is IMO wrong because the code is coupled to Terracotta itself: if you swap Terracotta out, that static variable will be reset at every shutdown/restart, as you would expect in any Java application, and your data will be gone!

I hope it's clearer now.
And if someone disagrees, or just thinks I misread the original quote, I'd be more than happy to hear their thoughts.

Finally, a disclaimer.
I'm a happy Terracotta user, and there are many ways to correctly use Terracotta in your application: put simply, just hide the way it works behind the repository implementation, and access the repository itself as if it were a normal Java object, not a super-static one (a rough sketch follows below).
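
As a rough, hypothetical sketch of what I mean (the constructor-injection wiring is just one possible choice, not a prescribed pattern): the entity receives its repository as a plain collaborator, and whether that repository happens to be backed by a Terracotta root or by a plain in-memory map stays an implementation detail of the repository itself.

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;

// Hypothetical sketch: the entity depends on its repository as a plain
// collaborator, without knowing (or caring) whether the repository
// implementation is backed by a Terracotta-clustered collection or not.
public class User {

    private final String id;

    // ISBNs of the books viewed by this user.
    private final Collection<String> viewed = new HashSet<String>();

    private final BookRepository bookRepository;

    public User(String id, BookRepository bookRepository) {
        this.id = id;
        this.bookRepository = bookRepository;
    }

    public Collection<Book> getViewedBooks() {
        Collection<Book> books = new ArrayList<Book>();
        for (String isbn : viewed) {
            books.add(bookRepository.findByIsbn(isbn));
        }
        return books;
    }
}

Swap Terracotta out and the entity doesn't change: only the repository implementation, and the wiring that creates it, does.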

And that's all.

Tuesday, February 03, 2009

Actor concurrency model in a nutshell

While the necessity of writing software applications capable of exploiting the multi-processor architecture of today's computers is more and more common, concurrent programming is often perceived as a hard task.
No wonder, then, that many languages come to our rescue by supporting concurrent programming through first-class syntax, or through higher-level user libraries.
Two well-known languages providing explicit concurrent programming support are Erlang and Scala, and both have in common the same concurrency model: actors.

The actor model is a concurrency abstraction based on the concepts of message-passing concurrency: very different from the shared-state concurrency model we're used to in general-purpose languages such as Java, but more efficient and easier to program with.

Let's see the difference between the two.

Shared-state concurrency is based on two fundamental concepts: resource sharing and resource synchronization.
As already said, it's the most common scenario with general-purpose OO languages such as Java: it's composed of computational units (often implemented as threads) concurrently executing code sections containing resources that must be shared and, hence, synchronized in order to guarantee correct ordering, visibility and data consistency.

Message-passing concurrency, also known as share-nothing concurrency, is the exact opposite: here, computational units are just endpoints exchanging immutable messages with one another, and reacting to received messages by executing a given computation.
In such a concurrency model, there isn't any shared resource, nor is there any need for resource synchronization: messages are immutable, and each computational unit is only able to change its own state in response to a received message.
This has several interesting consequences, which make message-passing concurrency preferable to the shared-state one:

  • Message-passing concurrency is safer: there are no shared resources, so there is no possibility to corrupt data due to concurrent access.

  • Message-passing concurrency is faster: there is no resource synchronization, so there are no bottlenecks, deadlocks, livelocks, or similar locking issues.

  • Message-passing concurrency is easier: once you get used to the new paradigm, not having to think about how to share and synchronize resources is a big relief.


The actor concurrency model is a form of message-passing concurrency, based on:

  • Actors: the computational units capable of sending messages and reacting to received ones by executing a given function.

  • Messages: the form of communication used by actors in order to exchange data and carry on some kind of computation based on that data.

  • Mailboxes (or channels): a kind of buffer every actor has for storing received messages which haven't been processed yet.


Every actor can communicate with other actors by obtaining their mailbox (or channel) "address" and then sending a message to it: this is the only way an actor has of changing the state of the system.
Every actor can receive messages and process them by executing a behavior function: such a function can only change the state of the actor itself, or send new messages to other actors (already existing or created on the fly).
Communication between actors is completely asynchronous and decoupled: that is, actors do not block waiting for responses to their messages; they just send messages and forget about them, reacting to incoming messages without any correlation to previously sent ones.
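
To make these concepts a bit more tangible before that next post, here is a deliberately minimal, hypothetical sketch in plain Java: just a blocking queue playing the role of the mailbox and a thread reacting to each message, not a production-grade actor implementation.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// A deliberately minimal actor: a mailbox (blocking queue) plus a thread
// that reacts to each received message by running a behavior function.
public class MinimalActor {

    private final BlockingQueue<String> mailbox = new LinkedBlockingQueue<String>();
    private int processed = 0; // private state, changed only by the actor's own thread

    public MinimalActor() {
        Thread loop = new Thread(new Runnable() {
            public void run() {
                try {
                    while (true) {
                        String message = mailbox.take(); // wait for the next message
                        processed++;
                        System.out.println("Processed '" + message + "' (total: " + processed + ")");
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // shut the actor down
                }
            }
        });
        loop.setDaemon(true);
        loop.start();
    }

    // Sending is asynchronous: the caller never blocks waiting for a response.
    public void send(String message) {
        mailbox.offer(message);
    }

    public static void main(String[] args) throws InterruptedException {
        MinimalActor actor = new MinimalActor();
        actor.send("hello");
        actor.send("world");
        Thread.sleep(100); // give the actor a chance to process before the JVM exits
    }
}

Real actor libraries add much more (typed messages, supervision, distribution), but the core loop is the same: take a message from the mailbox, react to it, possibly send new messages to other actors.
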
A concurrent system implemented through the actor concurrency model is considered to be:

  • Parallel: several actors can process several messages in parallel, each one independently from the other.

  • Scalable: actors can be implemented in several ways, as local computational units or distributed ones, so
    they can easily scale out to the number of available processors or computers.

  • Reconfigurable: actors can be dynamically added to and removed from the system, with the new topology then communicated through special-purpose messages.


Obtaining such properties with a shared-state implementation is harder and poses a lot of challenges.
The actor model instead provides a well-defined, clearly stated way to easily build highly concurrent systems, at the cost of a paradigm shift.

Next time we will see how to implement an actor-based concurrent application through our favorite OO language: Java!