We have hundreds of A/B experiments running at any time as we launch new products and improve the quality of the user experience. To power these experiments and make timely, data-driven decisions, we built a system to scale the process.
Building the experimentation framework
We set out with three main goals for the framework:
- Extensible and generic enough that we can easily extract the most valuable information from experiment data and support different kinds of cohort and segmented analysis.
- Scalable enough to process and store the large amount of data generated by an ever-increasing number of experiments, users and metrics.
- Able to compute and serve high-quality metrics via dashboards, in batch and in real time, backed by suitable statistical test modules.
Each component of the framework needed to be independently and easily scalable. The components were:
- Kafka for the log transport layer
- Hadoop for running the MapReduce jobs
- Pinball for orchestrating the MapReduce workflows
- Storm for real-time experiment group validation
- HBase & MySQL to power the dashboard backend
- Redshift for interactive analysis
Front-end messages are logged to Kafka by our API and application servers. We have batch processing (on the middle-left) and real-time processing (on the middle-right) pipelines to process the experiment data. For batch processing, once the daily raw logs land in S3, we start our nightly experiment workflow to compute experiment user groups and experiment metrics. We use Pinball, our in-house workflow management system, to manage the dependencies among all of these MapReduce jobs.
The final output is inserted into HBase to serve the experiment dashboard. We also load the output data into Redshift for ad-hoc analysis. For real-time experiment data processing, we use Storm to tail Kafka, process the data in real time and insert metrics into MySQL, so that we can identify group allocation problems and send out real-time alerts and metrics.
In logging, the top-line goal is to keep the process as lightweight as possible. No persistence layer is involved in experiment group allocation, which minimizes the latency and load on our production services. We leave all of the complicated metric computations to offline data processing.
To determine a user’s experiment (feature) group, whenever a request reaches the application server, we hash the experimentid together with the userid/requestid and assign the user to a bucket based on the group size configuration. Because the hash is deterministic, the same user is always allocated to the same experiment group, which keeps the user experience consistent. For each request, we log a thrift message in the following format.
We also have separate logs for all users’ actions and context. In later steps, we join these logs against the user experiment activation log to determine the performance of each experiment group and the corresponding activity metrics. Since this is a large-scale and time-consuming computation, we use the Hadoop framework to do it.
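As a rough illustration of the hash-based group allocation described in this section, here is a minimal Python sketch. The function name `assign_group`, the use of MD5, and the `group_sizes` configuration format (group name mapped to a fraction of traffic) are assumptions made for the example and are not taken from our production code.

```python
import hashlib
from typing import Dict, Optional


def assign_group(experiment_id: str, unit_id: str,
                 group_sizes: Dict[str, float]) -> Optional[str]:
    """Deterministically map a user (or request) id to an experiment group.

    group_sizes is a hypothetical config: group name -> fraction of traffic.
    If the fractions sum to less than 1.0, the remaining traffic is not in
    the experiment and None is returned.
    """
    # Hash the experiment id together with the unit id, so the same user can
    # fall into different buckets in different experiments.
    digest = hashlib.md5(f"{experiment_id}:{unit_id}".encode("utf-8")).hexdigest()
    # Map the first 32 bits of the hash onto the interval [0, 1).
    point = int(digest[:8], 16) / 0x100000000
    # Walk the cumulative group sizes until the point falls inside a group.
    cumulative = 0.0
    for name, fraction in group_sizes.items():
        cumulative += fraction
        if point < cumulative:
            return name
    return None


if __name__ == "__main__":
    sizes = {"control": 0.05, "enabled": 0.05}
    # The same inputs always produce the same answer; no lookup is stored.
    print(assign_group("new_feed_layout", "user_12345", sizes))
```

Because the assignment is a pure function of the ids and the configuration, no state has to be read or written on the request path, which is what keeps the logging step lightweight.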
Batch-processing MapReduce workflow
The MapReduce workflow starts processing experiment data nightly, once the previous day’s data has been copied over from Kafka. At that point, all the raw request logs are transformed into meaningful experiment results and in-depth analysis. To populate experiment data for the dashboard, around 50 jobs run the calculations and data transforms.
To support ‘days-in’ analysis, which separates out the novelty effect, the experiment workflow starts by finding the first day each user joined the experiment. That information, together with the user’s other attributes (such as gender and country segmentations), is then joined with the raw actions log (event/context). Next, the aggregator job aggregates these activity metrics per experiment group, per segmentation and per days-in. Finally, the segment rollup job generates both the fully denormalized data and the aggregated data and inserts them into HBase.
To account for the novelty decay of features, we first find, for each user in the experiment, the earliest date on which the user was seen in an experiment group. Then, for each experiment group, we join experiment users with their subsequent actions on each day, so that user activity is grouped both by experiment group and by the number of days the user has been in the experiment.
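The days-in join described above can be sketched in a few lines of in-memory Python. This is only an illustration of the logic, not the MapReduce jobs themselves; the function name `days_in_metrics` and the tuple layouts of the two logs are assumptions for the example.

```python
from collections import defaultdict
from datetime import date


def days_in_metrics(activations, actions):
    """Group user activity by (experiment group, days-in).

    activations: iterable of (user_id, group, day) from the activation log.
    actions:     iterable of (user_id, day) from the actions log.
    Both layouts are hypothetical simplifications of the real logs.
    """
    # Step 1: earliest date each user was seen in the experiment.
    first_seen = {}
    for user_id, group, day in activations:
        if user_id not in first_seen or day < first_seen[user_id][1]:
            first_seen[user_id] = (group, day)

    # Step 2: join actions against first_seen and bucket them by days-in.
    active_users = defaultdict(set)
    action_counts = defaultdict(int)
    for user_id, day in actions:
        if user_id not in first_seen:
            continue  # user is not in the experiment
        group, start = first_seen[user_id]
        days_in = (day - start).days
        if days_in < 0:
            continue  # action happened before the user joined
        key = (group, days_in)
        active_users[key].add(user_id)
        action_counts[key] += 1

    return {
        key: {"num_active": len(active_users[key]), "num_actions": count}
        for key, count in action_counts.items()
    }


if __name__ == "__main__":
    acts = [("u1", "control", date(2024, 1, 1))]
    events = [("u1", date(2024, 1, 1)), ("u1", date(2024, 1, 3))]
    print(days_in_metrics(acts, events))
```

Comparing groups at the same days-in value lines up users by how long they have had the feature, rather than by calendar date.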
To support segmented analysis of a feature’s impact on a particular group, e.g. iPhone activity of male active users located in the United Kingdom, we categorize users into fully denormalized attribute groups (cohorts). The final output of fully segmented metrics is inserted into HBase, which gives us fast and scalable write/read access. Previously, we stored this data in MySQL, which had high ingestion latencies (on the order of hours). By moving to HBase we cut the ingestion latency to a few minutes, and reads also became much faster.
Scalable serving of experiment metrics
After the MapReduce workflow completes, we end up with around 20TB of experiment metrics data (every six months). To make that data accessible with sub-second latency, we built a service that serves experiment metrics using HBase as the backend. HBase is essentially a key/value store, where each key consists of the following parts: row-key, column family, column and time-stamp. We store our experiment metrics, such as number of active users and number of actions, with the following schema.
The row key is constructed from multiple components, which we classify as non-aggregates, cohorts and ranges (described below). One intricacy is that we sometimes need metrics aggregated across certain segments (e.g. metrics for the experiment across all countries). To support that, we aggregate over the different segment cohorts, starting from the fully denormalized data, and generate all of the potential extra rows for the rolled-up results.
Columns in HBase are grouped into column families. We put different categories of metrics into different column families. For example, all user active/action metrics are grouped in the ‘t’ column family, all metrics segmented by apptype in the ‘a’ column family, all funnel/context metrics in the ‘c’ column family, and so on. Each column family contains metrics such as the total number of users in the group and the number of active users and actions across all the different events and metrics.
Non-aggregate: These values are constant and never need to be pre-aggregated, so we place them at the start of the key to allow prefix filtering.
Cohorts: These dimensions need to be looked up in aggregate. Extra rolled-up rows are generated for each potential combination with other cohorts. We pre-aggregate the values and generate keys in which the rolled-up dimension is represented by ‘ALL’.
Ranges: This section of the key supports range scans, since we need to make range requests on these values.
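To make the cohort rollup concrete, here is a minimal Python sketch that starts from fully denormalized counts and generates every rolled-up combination, using ‘ALL’ for each dimension that has been aggregated away. The function name `rollup` and the (cohort tuple, count) input layout are assumptions for the example; the sketch also assumes the metric is additive across cohorts, as a count of actions is.

```python
from collections import Counter
from itertools import product

ALL = "ALL"


def rollup(rows):
    """Generate rolled-up totals for every combination of cohort dimensions.

    rows: iterable of (cohort_values, count), where cohort_values is a tuple
    such as (country, gender). This layout is a hypothetical simplification.
    """
    totals = Counter()
    for cohorts, count in rows:
        # For each dimension, keep the concrete value or replace it by ALL.
        # With d dimensions, every input row contributes to 2**d keys.
        for key in product(*[(value, ALL) for value in cohorts]):
            totals[key] += count
    return totals


if __name__ == "__main__":
    denormalized = [
        (("US", "male"), 3),
        (("US", "female"), 2),
        (("UK", "male"), 5),
    ]
    for key, value in sorted(rollup(denormalized).items()):
        print(key, value)
```

The number of generated rows grows exponentially with the number of cohort dimensions, which is the price paid for answering any aggregate lookup with a single key read.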
Real-time data processing
In addition to batch processing, we also wanted to process data in real time. For example, to improve the success rate of experiments, we needed to check experiment group allocations in real time as soon as an experiment configuration was pushed out to production. We used Storm to tail Kafka and compute aggregated metrics in real time to provide these crucial stats.
Powering data driven decisions
We rendered experiment results through an experiment dashboard, which consists of several tools that make the results easily accessible.
Cohort analysis: From the dashboard, the experiment owner can compare metrics across user segments such as country, gender, user state and app type. It’s also possible to filter by the time when users joined the experiment.
Days-in analysis: User actions that happen at the same number of days in are grouped together and compared, as shown in the following graph, which is grouped by the number of days since joining the experiment. The days-in analysis helps us understand user behavior under the impact of novelty decay.
Statistical significance: To provide guidance on the significance of differences across experiment groups, we used an unpaired t-test for the num_active and num_actions metrics. The test relied on the assumption that the sample data was independently and identically distributed, but not on the underlying sample distribution. The null hypothesis was that the mean values of the control and treatment groups were the same. If, under the null hypothesis, the probability of our observation was extremely small, we could reject the null hypothesis at a given significance level. In the analytics dashboard, if the p-value was below the significance level of 0.05, we marked the result in blue or orange, as shown in the graph above.
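For readers who want to see the shape of such a test, here is a minimal Python sketch of an unpaired t-test on per-user values. It uses the unequal-variance (Welch) form of the statistic, and it approximates the p-value with the standard normal distribution, which is reasonable only for large samples. Both choices, and the function name `unpaired_t_test`, are assumptions for the example and not a description of the dashboard’s actual test module.

```python
from math import sqrt
from statistics import NormalDist, mean, variance


def unpaired_t_test(control, treatment):
    """Return (t_statistic, approximate two-sided p-value).

    control, treatment: per-user metric values (e.g. num_actions per user).
    The p-value uses a normal approximation to the t distribution, which is
    an assumption that holds only for large samples.
    """
    n_control, n_treatment = len(control), len(treatment)
    # Standard error of the difference in means, without pooling variances.
    std_err = sqrt(variance(control) / n_control
                   + variance(treatment) / n_treatment)
    if std_err == 0:
        raise ValueError("both samples have zero variance")
    t_stat = (mean(treatment) - mean(control)) / std_err
    p_value = 2.0 * (1.0 - NormalDist().cdf(abs(t_stat)))
    return t_stat, p_value


if __name__ == "__main__":
    control = [1, 2, 3, 4, 5] * 40
    treatment = [value + 1 for value in control]
    t_stat, p_value = unpaired_t_test(control, treatment)
    print(t_stat, p_value, p_value < 0.05)
```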
Group validation: To help experiments launch successfully, we also support an automated group allocation test, which ensures that the users in each segmentation are allocated as expected. For the group allocation validation test, we used Pearson’s chi-square test. A red-glow box alerts experiment owners to group allocation anomalies.
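A minimal Python sketch of a Pearson chi-square check on group allocation follows. The function name `allocation_check`, the input layout, and the hard-coded table of critical values at the 0.05 level (for up to three degrees of freedom) are assumptions for the example; the expected fractions are taken to sum to 1.0 over the groups being compared.

```python
# Critical values of the chi-square distribution at the 0.05 level,
# keyed by degrees of freedom. Only small experiments are covered here.
CRITICAL_0_05 = {1: 3.841, 2: 5.991, 3: 7.815}


def allocation_check(observed, expected_fractions):
    """Return (chi_square_statistic, is_anomalous).

    observed:           group name -> number of users seen in that group.
    expected_fractions: group name -> expected share of those users
                        (hypothetical config; must sum to 1.0).
    """
    total = sum(observed.values())
    statistic = 0.0
    for group, fraction in expected_fractions.items():
        expected = total * fraction
        statistic += (observed.get(group, 0) - expected) ** 2 / expected
    degrees_of_freedom = len(expected_fractions) - 1
    return statistic, statistic > CRITICAL_0_05[degrees_of_freedom]


if __name__ == "__main__":
    observed = {"control": 5200, "enabled": 4800}
    expected = {"control": 0.5, "enabled": 0.5}
    print(allocation_check(observed, expected))
```

Running the same check per segment (for example per country or app type) is what would catch an allocation that is balanced overall but skewed within one segment.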
This experimentation framework powers all types of data-driven decisions at Pinterest, and has helped us understand the results of hundreds of A/B experiments and decide on the launch of new features. If challenges like this interest you, join us on the data engineering team!
Chunyan Wang is a software engineer at Pinterest.
Acknowledgements: The core contributors to the A/B experimentation analytics framework were Chunyan Wang, Joshua Inkenbrandt, Anastassia Kornilova, Andrea Burbank, Mohammad Shahangian, Stephanie Rogers and Krishna Gade along with the rest of the data engineering team. Additionally, a number of engineers across the company provided helpful feedback.