Introduction
At Qubole, we are working on various products to make analysis on top of big data easier and simpler. One of the core products in this lineup is the Scheduler, which offers our users a way to schedule periodic workflows that run at specific intervals. The Scheduler is an important part of the analytic pipeline, as shown by the 60K+ commands already scheduled on our framework, which is used to schedule various types of workflows such as re-partitioning data, log cleanup, and aggregation.
This post covers one of the most common uses of the Scheduler – and how we have made it dramatically simpler for our users.
Background
Qubole workflows can run any combination of Map-Reduce, Pig, Shell Script, Hive, and Import/Export commands. When running a periodic workflow, specifying the data to be consumed by a single instance of the workflow is one of the most interesting parts. In a scheduler like Oozie – this is achieved as follows:
- An instance of a periodic workflow enumerates the Data Units it wants to work on.
- It specifies these data units as dependencies for the Scheduler framework (Oozie).
- When the dependencies are satisfied – the workflow (typically) consumes these data units.
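The dependency-driven flow above can be modelled in a few lines of Python. This is only an illustrative sketch, not Oozie's API: the function names `required_units` and `ready`, the three-hour window, and the `logs/dt=.../hr=...` naming scheme are all assumptions made for the example.

```python
from datetime import datetime, timedelta


def required_units(nominal_time, hours=3):
    """Enumerate the hourly data units one instance depends on (hypothetical naming)."""
    return [
        (nominal_time - timedelta(hours=h)).strftime("logs/dt=%Y-%m-%d/hr=%H")
        for h in range(hours, 0, -1)
    ]


def ready(units, available):
    """An instance may run only once every declared dependency exists."""
    return all(unit in available for unit in units)


# The instance scheduled for 03:00 depends on the three preceding hours.
units = required_units(datetime(2013, 3, 10, 3, 0))

# Only two of the three hourly units have landed so far, so the instance waits.
available = {"logs/dt=2013-03-10/hr=00", "logs/dt=2013-03-10/hr=01"}
print(units)
print(ready(units, available))
```

Note that in this model the workflow author has to enumerate the units for every instance, which is where the difficulties described later come from.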
From very early on – we spent considerable energy making it easier for our users to specify these dependencies:
- We enhanced Oozie to allow Data Units to be Hive Partitions in Qubole (originally they could only be Files/Directories).
- We allowed users to specify dependencies using macros that could be populated with JavaScript expressions at runtime and that could also be substituted into (templated) commands.
However – we observed that one of the most common use cases for Scheduling workflows was to process data incrementally from the data set. Even where multiple tables are joined (for example) as part of a workflow – only one of them is typically the fact table (for which data needs to be processed incrementally). Thinking of processing in terms of dependencies seemed like an ill fit for this pattern. Some usability issues we observed were as follows:
- Partially-filled Data Units would lead to missed data. Users had to make an unnecessary choice between processing data promptly – or completely.
- Enumerating dependencies for hourly processing was nearly impossible. In some cases involving DST – we found that, to stay correct, the dependency specification had to be revised whenever a DST boundary was crossed.
Thinking Incrementally
Going back to the drawing board, we tried to sketch out what a good incremental processing system would look like:
- The Scheduling framework should keep track of processed data and feed new data to the User’s workflows.
- The Framework should automatically take care of cases where a lot of data is backlogged and/or catch-up processing needs to happen.
- Since data consumed can often be reloaded (to fix corruptions due to upstream processing for example) – we must provide an easy way to re-process specific ranges of data.
- It should be possible to specify incremental processing both at the file granularity (by monitoring a directory for example) or at the level of partitions (to kick off processing whenever a new partition shows up in a partitioned Hive table).
Introducing Incremental Hive
To begin with – we have launched incremental processing support for Hive Queries in Qubole. In the following sections – we detail this feature, how we built it, and some of the remaining challenges.
While creating a periodic workflow, a user can specify that they want to schedule a job incrementally on a particular Hive table. Qubole then keeps track of all the files/partitions processed in each instance and calculates which new data has arrived for the current instance. The list of new data is passed on to Hive, which then runs the queries only on that subset of data.
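A minimal sketch of this bookkeeping is shown below. It is not Qubole's implementation: the `IncrementalTracker` class, its methods, and the file names are hypothetical, and the sketch assumes that data units can be identified by a path string.

```python
class IncrementalTracker:
    """Hypothetical bookkeeping: remembers which files earlier instances consumed."""

    def __init__(self):
        self.processed = set()

    def new_data(self, listing):
        """Return files in the current listing that no earlier instance processed."""
        return sorted(set(listing) - self.processed)

    def mark_processed(self, files):
        """Record files once an instance has finished with them."""
        self.processed.update(files)


tracker = IncrementalTracker()

# Instance 1 sees two files and processes both.
first = tracker.new_data(["dt=2013-01-01/part-0", "dt=2013-01-02/part-0"])
tracker.mark_processed(first)

# Instance 2: a new partition appeared, and one file arrived late in an old one.
second = tracker.new_data([
    "dt=2013-01-01/part-0",
    "dt=2013-01-01/part-1",  # late arrival in an existing partition
    "dt=2013-01-02/part-0",
    "dt=2013-01-03/part-0",
])
tracker.mark_processed(second)
print(second)
```

Because the tracker works from what has already been processed rather than from a declared list of dependencies, late-arriving files are picked up by the next instance without any change to the workflow definition.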
Incremental workflows have various cool features:
Workflow Management
An incremental workflow divides the work into a number of chunks, where each instance processes one chunk of data. The user can easily monitor what data was processed in an instance. Qubole also reverse indexes the data per instance, so the user can efficiently search for the instance that processed some data. This helps when the user wants to modify/fix some already processed data: they can rerun the corresponding instance to compute the results on the updated data.
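One way to picture the reverse index is as a pair of maps, one from instance to data and one from data back to instance. The sketch below is an assumption about the shape of such an index, not a description of how Qubole stores it; `record`, `instance_for`, and the file names are hypothetical.

```python
from collections import defaultdict

instance_to_data = defaultdict(list)  # instance id -> data units it processed
data_to_instance = {}                 # reverse index: data unit -> instance id


def record(instance_id, units):
    """Store both directions of the mapping when an instance finishes."""
    for unit in units:
        instance_to_data[instance_id].append(unit)
        data_to_instance[unit] = instance_id


def instance_for(unit):
    """Look up which instance processed a data unit, or None if it is unprocessed."""
    return data_to_instance.get(unit)


record(1, ["dt=2013-01-01/part-0", "dt=2013-01-02/part-0"])
record(2, ["dt=2013-01-03/part-0"])

# A file in the 2013-01-02 partition was fixed upstream: find the instance to rerun.
to_rerun = instance_for("dt=2013-01-02/part-0")
print(to_rerun, instance_to_data[to_rerun])
```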
Limiting Load
The continuous influx of logs can easily lead to a large data size, which can result in long-running queries. The problem becomes worse when there is a lot of old data lying around. Large queries are usually unreliable because they have a larger chance of failing. Smaller queries run faster (hence a lower chance of error) and cope better with transient errors, as a failure only impacts a subset of the data.
Incremental workflows allow a user to completely ignore legacy data if they choose to. Even when the user wants to process the entire data set, the workflow applies its internal limits to ensure that only data up to a certain limit is processed in one instance. The rest of the data is deferred until the next run.
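The deferral behaviour can be sketched as follows. The post does not say how the internal limit is expressed, so this example assumes a per-instance byte budget; `next_chunk`, the file names, and the sizes are hypothetical.

```python
def next_chunk(pending, sizes, max_bytes):
    """Take pending units, oldest first, until the budget is reached; defer the rest.

    At least one unit is always taken so that an oversized unit cannot stall progress.
    """
    chunk, total = [], 0
    for unit in pending:
        if chunk and total + sizes[unit] > max_bytes:
            break
        chunk.append(unit)
        total += sizes[unit]
    return chunk, pending[len(chunk):]


sizes = {"part-0": 40, "part-1": 50, "part-2": 30}
backlog = ["part-0", "part-1", "part-2"]

# First instance: 40 + 50 fits in the budget of 100, adding 30 more would not.
chunk, deferred = next_chunk(backlog, sizes, max_bytes=100)
print(chunk, deferred)

# Next instance picks up whatever was deferred.
chunk2, deferred2 = next_chunk(deferred, sizes, max_bytes=100)
print(chunk2, deferred2)
```

With a cap like this, a large backlog is worked off over several smaller instances instead of one very large query.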
Applications
One of the popular applications where this is useful is log cleanup or re-partitioning. A number of workflows involve taking a dump of data (for example, browser logs) from a certain location, running Hive queries on top of it to clean up the data, and re-partitioning it into another Hive table. By scheduling an incremental workflow, the user knows that any query will run only on newer data (whether the data is dumped into a new partition or enters late into an existing one), so the queries are much simpler to write and the workflow is much easier to manage.
Another popular use case is running workflows that dump out partial aggregates/counters over log files (counting browser stats, for example). Here incremental workflows, combined with our data export technology for loading Hive query results into relational databases, make the whole process simpler.
Ease of Use
By getting rid of all the dependencies, it is much easier to schedule and manage an incremental workflow than its corresponding workflow with dependencies. In the figure below, we show a comparison between the Oozie config files generated for an incremental workflow and its corresponding workflow with dependencies.
Roadmap
Incremental Hive is just the start of making incremental workflows easier. We are working on providing incremental functionality for Map-Reduce jobs, as well as for processing data in external data stores like MySQL and MongoDB.
Conclusion
Data analysis today is a lot harder than it should be, and Qubole has done a lot of work to simplify this process. Today we presented our exclusive ‘Incremental Hive’ technology, which keeps track of users’ data to ensure that only new data is processed in every instance. It greatly simplifies writing and managing workflows, and reduces the chance of errors in the process.