Thursday, 28 August 2014

Distributed Crawling

Around 3 months ago, I have posted one article explaining our approach and consideration to build Cloud Application. From this article, I will gradually share our practical design to solve this challenge.

As mentioned before, our final goal is to build a Saas big data analysis application, which will deployed in AWS servers. In order to fulfill this goal, we need to build distributed crawling, indexing and distributed training systems.

The focus of this article is how to build the distributed crawling system. The fancy name for this system will be Black Widow.


As usual, let start with the business requirement for the system. Our goal is to build a scalable crawling system that can be deployed on the cloud. The system should be able to function in an unreliable, high-latency network and can recover automatically from a partial hardware or network failure.

For the first release, the system can crawl from 3 kind of sources, Datasift, Twitter API and Rss feeds. The data crawled back are called Comment. The Rss crawlers suppose to read public sources like website or blog. It is free of charge. DataSift and Twitter both provide proprietary APIs to access their streaming service. Datasift charges its users by comment count and the complexity of CSLD (Curated Stream Definition Language, their own query language). Twitter, in the other hand, offers free Twitter Sampler streaming.

In order to do cost control, we need to implement mechanism to limit the amount of comments crawled from commercial source like Datasift. As Datasift provided Twitter comment, it is possible to have single comment coming from different sources. At the moment, we did not try to eliminate and accept it as data duplication. However, this problem can be eliminated manually by user configuration (avoid choosing both Twitter and Datasift Twitter together).

For future extension, the system should be able to link up related comments to from a conversation.

Food for Thought

Centralized Architecture

Our first thought when getting requirement is to build the crawling on the nodes, which we called Spawn and let the hub, which we called Black Widow to manage the collaboration of effort among nodes. This idea was quickly accepted by team members as it allows the system to scale well with the hub doing limited work.

As any other centralized system, Black Widow suffers from single point of failure problem. To help easing this problem, we allow the node to function independently for a short period after losing connection to Black Widow. This will give the support team a breathing room to bring up backup server.

Another bottle neck in the system is data storage. For the volume of data being crawled (easily reach few thousands records per seconds), NoSQL is clearly the choice for storing the crawled comments. We have experiences working with Lucene and MongoDB. However, after research and some minor experiments, we choose Cassandra as the NoSQL database.

With that few thoughts, we visualize the distributed crawling system to be build following this prototype:

In the diagram above, Black Widow, or the hub is the only server that has access to the SQL database system. This is where we store the configuration for crawling. Therefore, all the Spawns, or crawling nodes are fully stateless. It simply wakes up, registers itself to Black Widow and does the assigned jobs. After getting the comments, the Spawn stores it to Cassandra cluster and also push it to some queues for further processing.

Brainstorming of possible issues

To explain the design to non-technical people, we like to relate the business requirement to a similar problem in real life so that it can be easier to understand. The similar problem we choose would be collaborating of efforts among volunteers.

Imagine if we need to do a lot of preparation work for the upcoming Olympic and decide to recruit volunteers all around the world to help. We do not know volunteers but the volunteers know our email, so they can contact us to register. Only then, we know their emails and may send tasks to them through email. We would not want to send one task to two volunteers or left some tasks unattended. We want to distribute the tasks evenly so that no volunteers are suffering too much.

Due to cost issue, we would not contact them through mobile phone. However, because email is less reliable, when sending out tasks to volunteers, we would request a confirmation. The task is consider assigned only when the volunteer replied with confirmation.

With above example, the volunteers represent Spawn nodes while email communication represent unreliable and high latency network. Here are some problems that we need to solve:

1/ Node failure

For this problems, the best way is to check regularly. If a volunteer stop responding to the regular progress check email, the task should be re-assign to someone else.

2/ Optimization of tasks assigning

Some tasks are related. Therefore assigning related tasks to the same person can help to reduce total effort. This happen with our crawling as well because some crawling configurations have similar search terms, grouping  them together to share the streaming channel will help to reduce final bill.

Another concern is the fairness or ability to distribute the amount of works evenly among volunteers. The simplest strategy we can think of is Round Robin but with a minor tweak by remembering earlier assignments. Therefore, if a task is pretty similar to the tasks we assigned before, the task can be skipped from Round Robin selection and directly assign to the same volunteer.

3/ The hub is not working

If due to some reasons, our email server is down and we cannot contact volunteer any more, it is better to let the volunteers stop working on the assigning tasks. The main concern here is over-running of cost or wasted efforts. However, stopping working immediately is too hasty as temporary infrastructure issue may cause the communication problem.

Hence, we need to find a reasonable amount of time for the node to continue functioning after being detached from the hub.

4/ Cost control

Due to business requirement, there are two kinds of cost control that we need to implement. First is the total of comments being crawled per crawler and second is the total of comments crawled by all crawlers belong to the same user.

This is where we have a debate about the best approach to implement cost control. It is very straight forward to implement the limit for each crawler. We can simply pass this limit to the Spawn node and it will automatically stop the crawler when the limit is reached.

However, for the limit per user, it is not so straight forward and we have two possible approaches. For the simpler choice, we can send all the crawlers of one user to the same node. Then, similar to the earlier problem, the Spawn node knows  the amount of comments collected and stops all crawlers when limit reached. This approach is simple but it limits the ability to distribute jobs evenly among nodes. The alternative approach is to let all the nodes retrieve and update a global counter. This approach creates huge network traffic internally and add considerable delay to comment processing time.

At this point, we temporarily choose the global counter approach. This can be considered again if the performance become a huge concern.

5/ Deploy on the cloud

As any other Cloud application, we can not put too much trust in the network or infrastructure. Here is how we make our application conform to the check-list mentioned in last article:
  • Stateless: Our spawn node is stateless but the hub is not. Therefore, in our design, the nodes do actual work and the hub only collaborates efforts.
  • Idempotence: We implement hashCode and equal methods for every crawler configuration. We store the crawler configurations in the Map or Set. Therefore, the crawler configuration can be sent multiple times without any other side effect. Moreover, our node selection approach ensure that the job will be sent to the same node.
  • Data Access Object: We apply the JsonIgnore filter on every model objects to make sure no confidential data flying around in the network.
  • Play Safe: We implement health-check API for each node and the hub itself. The first level of support will get notified immediately when anything wrong happened.
6/ Recovery

We try our best to make the system heal itself from partial failure. There are some type of failure that we can recover from:
  • Hub failure: Node register itself to the hub when it start up. From then, it is the one way communication when only the hub send jobs to node and also poll for status update. The node is consider detached if it failed to get any contact from Hub for a pre-defined period. If a node is detached, it will clear all the job configurations and start registering itself to the hub again. If the incident is caused by hub failure, a new hub will fetch crawling configurations from database and start distributing jobs again. All the existing jobs on Spawn nodes will be cleared when the Spawn node go to detached mode.
  • Node failure: When hub fail to poll a node, it will do a hard reset by removing all working jobs and re-distribute from beginning again to the working nodes. This re-distribution process help to ensure optimized distribution.
  • Job failure: There are two kind of failures happened when the hub do sending and polling jobs. If a job is failed in the polling process but the Spawn node is still working well, Black Widow can re-assign the job to the same node again. The same thing can be done if the job sending failed. 


Data Source and Subscriber

In the initial thought, each crawler can open it own channel to retrieve data but this does not make sense any more when inspecting further. For Rss, we can scan all URLs once and find out the keywords that may belong to multiple crawlers. For Twitter, it supports up to 200 search terms for one single query. Therefore, it is possible for us to open single channel that serve multiple crawlers. For Datasift, it is quite rare, but due to human mistake or luck, it is possible to have crawlers with identical search terms.

This situation lead us to split out crawler to two entities: subscriber and data source. Subscriber is in charge of consuming the comments while data source is in charge of crawling the comments. With this design, if there are two crawlers with similar keywords, a single data source will be created to serve two subscribers, each processing the comments their own ways.

Data source will be created when and only when no similar data source exist. It starts working when having the first subscriber subscribe to it and retire when the last subscriber unsubscribe from it. With the help of Black Widow to send similar subscribers to the same node, we can minimize the amount of data sources created and indirectly, minimize the crawling cost.

Data Structure

The biggest concern of data structure is Thread Safe issue. In the Spawn node, we must store all running subscribers and data sources in memory. There are a few scenarios that we need to modify or access these data:

  • When a subscriber hit the limit, it automatically unsubscribe from data source, which may lead to deactivation of data source.
  • When Black Widow send a new subscriber to Spawn nodes. 
  • When Black Widow send a request to unsubscribe an existing subscriber. 
  • Health check API expose all running subscribers and data sources. 
  • Black Widow regularly polls the status of each assigned subscriber.
  • The Spawn node regularly checks and disables orphan subscribers (subscriber which is not polled by Black Widow).
Another concern of data structure is idempotence of operations. Any of operation above can be missing or being duplicated. To handle this problem, here is our approach
  • Implement hashCode and equals method for every subscriber and data source. 
  • We choose the Set or Map to store collection of subscribers and data sources. For records with identical hash code, Map will replace the record when there is new insertion but Set will skip the new record. Therefore, if we use Set, we need to ensure new records can replace old record. 
  • We use synchronized in data access code.
  • If Spawn node receive a new subscriber that similar to existing subscriber, it will compare and prefer to update existing subscriber instead of replacing. This avoid the process of unsubscribing and subscribing identical subscribers, which may interrupt data source streaming.

As mentioned before, we need to find a routing mechanism that serve two purposes:
  • Distribute the jobs evenly among Spawn nodes.
  • Route similar jobs to the same nodes.
We solved this problem by generating an unique representation of each query  named uuid. After that, we can use a simple modular function to find out the note to route:

int size = activeBwsNodes.size();
int hashCode = uuid.hashCode();
int index = hashCode % size;
assignedNode = activeBwsNodes.get(index);

With this implementation, subscribers with similar uuid will always be sent to the same node and each node has equals chance of being selected to serve a subscriber. 

This whole practice can be screwed up when there is change to the collection of active Spawn nodes. Therefore, Black Widow must clear up all running jobs and reassign from beginning whenever there is a node change. However, node change should be quite rare in production environment.


Below is the sequence diagram of Black Widow and Node collaboration

Black Widow does not know Spawn node. It wait for the Spawn node to register itself to the Black Widow. From there, Black Widow has the responsibility to poll the node to maintain connectivity. If Black Widow fail to poll a node, it will remove the node from the its container. The orphan node will eventually go to detached mode because it is not being polled any more. In this mode, Spawn node will clear existing jobs and try to register itself again.

The next diagram is the subscriber life-cycle

Similar to above, Black Widow has the responsibility of polling the subscribers it send to Spawn node. If a subscriber is not being polled by Black Widow anymore, Spawn node will treat the subscriber as orphan and remove it. This practice help to eliminate the threat of Spawn node running obsoleted subscriber.

On Black Widow, when a subscriber polling fails, it will try to get a new node to assign the job. If the Spawn node of the subscriber still available, it is likely that the same job will go to the same node again due to our routing mechanism we used.


In a happy scenario, all the subscribers are running, Black Widow is polling and nothing else happen. However, this is not likely to happen in real life. There will be changes in Black Widow and Spawn nodes from time to time, triggered by various events.

For Black Widow, there will be changes under following circumstances:

  • Subscriber hit limit
  • Found new subscriber
  • Existing subscriber disabled by user
  • Polling of subscriber fails
  • Polling of Spawn node fails
To handle changes, Black Widow monitoring tool offers two services: hard reload and soft reload. Hard Reload happen on node change while Soft Reload happen on subscriber change. Hard Reload process takes back all running jobs, redistribute from beginning over available nodes. Soft Reload process removes obsoleted jobs, assigns new jobs and re-assigns failed jobs.

Compare to Black Widow, the monitoring of Spawn node is simpler. The two main concerns are maintaining connectivity to Black Widow and removing orphan subscribers.

Deployment Strategy

The deployment strategy is straight forward. We need to bring up Black Widow and at least one Spawn node. The Spawn node should know the URL of Black Widow. From then, the Health Check API will give use the amount of subscribers per node. We can integrate Health Check with AWS API to automatically bring up new Spawn node if existing nodes are overloaded. The Spawn node image will need to have Spawn application running as service. Similarly, when the nodes are not utilized, we can bring down redundant Spawn nodes.

Black Widow need special treatment due to its importance. If Black Widow fails, we can restart the application. This will cause all existing jobs on Spawn nodes to become orphan and all the Spawn nodes go to detached mode. Slowly, all the nodes will clean up itself and try to register again. Under default configuration, the whole restarting process will happen within 15 minutes.

Threats and possible improvement

When choosing centralized architecture, we know that Black Widow is the biggest risk to the system. While Spawn node failure only causes a minor interruption in the affected subscribers, Black Widow failure finally lead to Spawn nodes restart, which will take much longer time to recover. 

Moreover, even the system can recover from partial, there still be interruption of service in recovery process. Therefore, if the polling requests failed too often due to unstable infrastructure, the operation will be greatly hampered. 

Scalability is another concern for centralized architecture. We have not had a concrete amount of maximum Spawn nodes that the Black Widow can handle. Theoretically, this should be very high because Black Widow only do minor processing, most of its effort are on sending out HTTP requests. It is possible that network is the main limit factor for this architecture. Because of this, we let the Black Widow polling the nodes rather than the nodes polling Black Widow (other people do this, like Hadoop). With this approach, Black Widow may work at its own pace, not under pressure of Spawn nodes.

One of the first question we got is whether it is a Map Reduce problem and the answer is No. Each subscriber in our Distributed Crawling System processes its own comments and does not reporting result back to Black Widow. That why we do not use any Map Reduce product like Hadoop. Our monitor is business logic aware rather than purely infrastructure monitoring, that why we choose to build ourselves over using monitoring tools like Zoo Keeper or AKKA

For future improvement, it is better to walk away from Centralized Architecture by having multiple hubs collaborating with each other. This should not be too difficult provided that the only time Black Widow accessing database is loading subscriber. Therefore, we can slice the data and let each Black Widow load a portion of it. 

Another point that make me feel pretty unsatisfied is the checking of global counter for user limit. As the check happened on every comment crawled, this greatly increases internal network traffic and limit the scalability of system. The better strategy should be divide of quota based on processing speed. Black Widow can regulate and redistribute quota for each subscriber (on different nodes).

Wednesday, 20 August 2014

The Emergence of DevOps and the Fall of the Old Order

Software Engineering has always been dependent on IT operations to take care of the deployment of software to a production environment. In the various roles that I have been in, the role of IT operations has come in various monikers from “Data Center” to “Web Services”. An organisation delivering software used to be able to separate these roles cleanly. Software Engineering and IT Operations were able to work in a somewhat isolated manner, with neither having the need to really know the knowledge that the other hold in their respective domains. Software Engineering would communicate with IT operations through “Deployment Requests”. This is usually done after ensuring that adequate tests have been conducted on their software.
However, the traditional way of organising departments in a software delivery organisation is starting to seem obsolete. The reason is that software infrastructure have moved towards the direction of being “agile”. The same buzzword that had gripped the software development world has started to exert its effect on IT infrastructure. The evidence of this seismic shift is seen in the fastest growing (and disruptive) companies today. Companies like Netflix, Whatsapp and many tech companies have gone into what we would call “cloud” infrastructure that is dominated by Amazon Web Services.
There is huge progress in the virtualization technologies of hardware resources. This have in turn allowed companies like AWS and Rackspace to convert their server farms into discrete units of computing resources that can be diced and parcelled and redistributed as a service to their customers in an efficient manner. It is inevitable that all this configurable “hardware” resources will eventually be some form of “software” resource that can be maximally utilized by businesses. This has in turn bred a whole new genre of skillset that is required to manage, control and deploy these Infrastructure As A Service (IaaS). Some of the tools used by these services include provisioning tools like Chef or Puppet. Together with the software apis provided by the IaaS vendors, infrastructure can be brought up or down as required.
The availability of large quantities of computing resources without all the upfront costs associated with capital expenditures on hardware have led to an explosion in the number of startups trying to solve problems of all kinds imaginable and coupled with the prevalence of powerful mobile devices have led to a digital renaissance for many industries. However, this renaissance has also led to the demand for a different kind of software organisation. As someone who has been part of software engineering and development, I am witness to the rapid evolution of profession.
The increasing scale of data and processing needs requires a complete shift in paradigm from the old software delivery organisation to a new one that melds software engineering and IT operations together. This is where the role of a “DevOps” come into the picture. Recruiting DevOps in an organisation and restructuring the IT operations around such roles enable businesses to be Agile. Some businesses whose survival depends on the availability of their software on the Internet will find it imperative to model their software delivery organisation around DevOps. Having the ability to capitalise on software automation to deploy infrastructure within minutes allows a business to scale up quickly. Being able to practise continuous delivery of software allow features to get into the market quickly and allows a feedback loop in which a business can improve itself.
We are witness to a new world order and software delivery organisations that cannot successfully transition to this Brave New World will find themselves falling behind quickly especially when a competitor is able to scale and deliver software faster, reliably and with less personnel.

Sunday, 3 August 2014

Information is money

When people ask me what am I doing, my immediate response is IT. Even though, the answer is not very specific, it is the easiest to understand and it still helps to describe what we are doing. In fact, it doesn't matter what programming languages we use, our responsibility is to build the information system, which deliver information to end-user. Therefore, we should value information more than anyone else. However, in reality, I feel there are so much wasted information in modern information system.

In this article, I would like to discuss the opportunity to collect user behaviour and measure user happiness when building information system. I also want to share my idea on how to improve user experience based on data collected.

How important is user's behaviour information

Let begin with a story that happened in my earlier career. We need to implement an online betting system for customer, which function similarly to a stock market. In this system, there is no traditional bookmakers like William Hill. Instead, each user can people offer and accept the bets from another. Because it is a mass market with big pool of users, the rate offered is quite accurate and the commission is pretty small. However, the betting system is not our focus today. What capture my intention most is not the  technical aspect of the project, even though it is quite challenging. In stead, I feel interested with the way the system silently but legally make huge amount of profit based on the information it collected.

The system captured the bet history of every user, through that, identify top winners and top losers of each month. Based on that information, the system automatically place the bet follow the winners and against the losers. Can you imagine that you are the only person in the world who know Warren Buffett's activities in real time? Then, it should be quite simple to simulate his performance, even without his knowledge? Needless to say, this hidden feature generated profit at level of hundred thousands dollars every single day.

In the open market, information is everything and we see why the law punish insider trading or any other attempt to gain advantage of information so strictly like that. However, there is no such kind of law for online gambling activity yet and this practice is still legal. That early experience gave me a deep impression on how important is information.

Later, I have interest in applying psychology when dealing with customer. In order to persuade one person or making sale happened, one guy need to observe and understand his client. Relate what I have learnt to the information system that I built before, I feel that it is not so nice to implement a system just only serve as information provider or selling tool. Actually, we do have chance to do much better if we really want.

Website authors know the importance of user experience and they did try their best to collect user information using online survey. However, personally, I feel this approach will never work. I have never answer any survey myself. Any time I saw a popup, it doesn't matter how polite is the words or how beautiful is the design, I will just click on close button.

We should not forget that no matter how important is the user feedback, it is not the user's benefit to answer our survey. In fact, no sale person approach client to ask them to do customer experience survey, unless there is incentive to do it.

Hence, the information is still need to be collected, but in a way that user does not notice it (remember how Google silently monitor anyone using their services?)

How should we use the information?

We should not waste effort collecting information if we even don't know what to do with it. However, this is not something new. Whenever I go to a professional selling site like Amazon, I find it is quite cool that they have managed to use every single piece of information they have to push sale. One time, I went there searching for helmet, next time I saw all the items for a rider like me. They remembers every single item that users have viewed or bought and regularly offer new things based on the data they collected.

Google and Facebook also do similar things. They will try to guess what you like or care about before delivering any ads to you. The million dollars question is can we do any better than this?

I vote yes. It does not means that I do not appreciate the talent and the profession of the product teams in Amazon, Google for Facebook. However, I feel that there is still a distance between these products and an experience sale person. Let imagine there is a real person that sharing desktop view with customer, seeing every mouse-click, movement and keys entered. Given this guy can pause user for a while, so that he can think, analyse and decide what user will see next, what will he do?

It is apparently that the information we collect from user screen cannot compare to the information from a face to face communication, but we have not used up this information yet. Most of the system automatically make the guess that any product that customer clicked on is what he like. A person can do better than that. If an user open the phone in 3 seconds and immediately move to other phones, he may accidentally click on the phone rather than by intention. Moreover, if he spend more time on a phone, keep coming back to it and even open the specification, we can be very sure that this is what he is looking for.

How should we collect the information?

As mentioned above, it will never work if we interrupt users to ask questions. The right mechanism for collecting information must be observation. For all the available solution in the market, I think what is missing in the ability to measure time stamp of events and connecting individual events to form an user journey. Without connecting the dot, there will be no line, without connecting events, there will be no user journey. Without the time stamp, it will be very hard to measure user satisfaction and concern.

Capturing user actions is not very challenging provided that we are the owner of website. Google Analytic can help to capture user actions but it is a bit hard to use in our case because of the limited information that it carry (HTTP GET request). We should understand that this is the only choice that Google Analytic team have because any other kind of requests will be blocked by cross-site scripting prevention.

The better way to carry this information is through HTTP POST request, which can carry the full event object, serialized in JSON format. This is perfectly eligible as the events is sent back to the same domain. To link up the events together, it is best to assign an unique but temporary id for user. We do not need to remember or identify user, therefore, this information may not need to be stored as a persisted cookie on browser. With the temporary id, two separated visits to website by the same user will be logged to 2 different journeys. While it is not optimal, it is still offer some benefits over normal kind of tracking.

If you can persist the cookie on browser or if user login, things will bet more interesting as we can link individual journeys to one.

After this, there come the biggest and most challenging part of the system where you need to figure out one mechanism to optimize customer experiences based on his journey. Unfortunately, this part is too specific for each system that our experience and methods may not be very useful for you at all. However, in general, we can measure user satisfaction and happiness based on the time users spend at each step. We also can figure out user interest by measuring the time spend for each product. From there, please build and optimize your own analysing tool. This is a very challenging but interesting task.