A lot goes on in the backend when a person clicks the Pin It button. Thumbnails of all sizes are generated, the board thumbnail is updated, and a Pin is fanned out to those who follow the Pinner or the board. We also evaluate if a Pin should be added to a category feed, check for spam, index for search, and so on.
These jobs are critically important but don’t all need to happen before we can acknowledge success back to the user. This is where an asynchronous job execution system comes in, where we need to enqueue one or more jobs to execute these actions at a later time and rest assured they will eventually be executed. Another use case is when a large batch of jobs needs to be scheduled and executed with retries for resiliency toward temporary backend system unavailability, such as a workflow to generate and send emails to millions of Pinners each week. Here’s a look at how we developed an asynchronous job execution system in-house, which we call PinLater.
We had originally implemented a solution based on Pyres for this purpose, however it had several limitations:
- Job execution was best effort, i.e. there was no success acknowledgement (ACK) mechanism.
- There was a lack of visibility into the status of individual job types, since jobs were all clubbed into a single set of nine priority queues.
- The system wasn’t entirely configurable or manageable, e.g. no ability to throttle job execution or configure retries.
- It was tied to Redis as the storage backend, and only worked for jobs written in Python, both of which were restrictions that would not continue to be acceptable for us.
- It didn’t have built-in support for scheduled execution of jobs at a specific time in the future, a feature that some of our jobs needed.
We looked at a few other open source queue or publish/subscribe system implementations, but none provided the minimum feature set we needed, such as time-based scheduling with priorities and reliable ACKs, or could properly scale. Amazon Simple Queue Service (SQS) would likely meet many of our requirements, but for such a critical piece of infrastructure, we wanted to operate it ourselves and extend the feature set as needed, which is why we developed PinLater.
Designing for execution of asynchronous jobs
In building PinLater, we kept the following design points in mind:
- PinLater is a Thrift service to manage scheduling and execution of asynchronous jobs. It provides three actions via its API: enqueue, dequeue and ACK that make up the core surface area.
- PinLater is agnostic to the details of a job. From its point of view, the job body is just an opaque sequence of bytes. Each job is associated with a queue and a priority level, as well as a timestamp called run_after that defines the minimum time at which the job is eligible to run (by default, jobs are eligible to run immediately, but this can be overridden to be a time in the future).
- When a job is enqueued, PinLater sends it to a backend store to keep track of it. When a dequeue request comes in, it satisfies the request by returning the highest priority jobs that are eligible to run at that time, based on run_after timestamps. Typically there are one or more worker pools associated with each PinLater cluster, which are responsible for executing jobs belonging to some subset of queues in that cluster. Workers continuously grab jobs, execute them and then reply to PinLater with a positive or negative ACK, depending on whether the execution succeeded or failed.
- In our use of PinLater, each job type maps 1:1 to a specific queue. The interpretation of the job body is a contract between the enqueuing client(s) and the worker pool responsible for that queue. This 1:1 mapping isn’t mandated by PinLater, but we have found it to be operationally very useful in terms of managing jobs and having good visibility into their states.
Job state machine
A newly enqueued job starts in state PENDING. When it becomes eligible for execution (based on priority and its run_after timestamp), it can be dequeued by a worker, at which point its state changes to RUNNING.
If the worker completed the execution successfully, it will send a success ACK back, and the job will move to a terminal SUCCEEDED state. Succeeded jobs are retained in PinLater for diagnostics purposes for a short period of time (usually a day) and then garbage collected.
If the job execution failed, the worker will send a failure ACK back, at which point PinLater will check if the job has any retries available. If so, it will move the job back to PENDING. If not, the job goes into a terminal FAILED state. Failed jobs stay around in PinLater for diagnostics purposes (and potentially manual retries) for a few days. When a job is first enqueued, a numAttemptsAllowed parameter is set to control how many retries are allowed. PinLater allows the worker to optionally specify a delay when it sends a failure ACK. This delay can be used to implement arbitrary retry policies per job, e.g. constant delay retry, exponential backoff, or a combination thereof.
If a job was dequeued (claimed) by a worker and it didn’t send back an ACK within a few minutes, PinLater considers the job lost and treats it as a failure. At this point, it will automatically move the job to PENDING or FAILED state depending on whether retries are available.
The garbage collection of terminal jobs as well as the claim timeout handling is done by a scheduled executor within the PinLater thrift server. This executor also logs statistics for each run, as well as exports metrics for longer term analysis.
PinLater’s Python worker framework
In addition to the PinLater service, we provide a Python worker framework that implements the PinLater dequeue/ACK protocol and manages execution of python jobs. Adding a new job involves a few lines of configuration to tell the system which PinLater cluster the job should run in, which queue it should use, and any custom job configuration (e.g. retry policy, number of execution attempts). After this step, the engineer can focus on implementing the job logic itself.
While the Python framework has enabled smooth transition of jobs from the earlier system and continues to support the vast majority of new jobs, some of our clients have implemented PinLater workers in other languages like Java and C++. PinLater’s job agnostic design and simple Thrift protocol have made this relatively straight forward to do.
The PinLater Thrift server is written in Java and leverages Twitter’s Finagle RPC framework. We currently provide two storage backends: MySQL and Redis. MySQL is used for relatively low throughput use cases and those that schedule jobs over long periods and thus can benefit from storing jobs on disk rather than purely in memory. Redis is used for high throughput job queues that are normally drained in real time.
MySQL was chosen for the disk-backed backend since it provides the transactional querying capability needed to implement a scheduled job queue. As one might expect, lock contention is an issue and we use several strategies to mitigate it including a separate table for each priority level , use of UPDATE … LIMIT instead of SELECT FOR UPDATE for the dequeue selection query, and carefully tuned schemas and secondary indexes to fit this type of workload.
Redis was chosen for the in-memory backend due to the sophisticated support it has for data structures like sorted sets. Being single threaded, lock contention is not an issue with Redis, but we did have to implement optimizations to make this workload efficient, including the use of Lua scripting to reduce unnecessary round trips.
Horizontal scaling is provided by sharding the backend stores across a number of servers. Both backend implementations use a “free” sharding scheme (shards are chosen at random when enqueueing jobs). This makes adding new shards trivial and ensures well balanced load across shards. We implement a shard health monitor that keeps track of the health of each individual shard and pulls out of rotation shards that are misbehaving either due to machine failure, network issues or even deadlock (in the case of MySQL). This monitor has proven invaluable in automatically handling operational issues that could otherwise result in high error rates and paging an on-call operator.
PinLater has been in use in production for months now, and our legacy Pyres based system was fully deprecated in Q1 2014. PinLater runs hundreds of job types at aggregate processing rates of over 100,000 per second. These jobs vary significantly on multiple parameters including running time, frequency, CPU vs. network intensive, job body size, programming language, enqueued online vs. offline, and needing near real time execution instead being scheduled hours in advance. It would be fair to say nearly every action taken on Pinterest or notification sent relies on PinLater at some level. The service has grown to be one of Pinterest’s most mission critical and widely used pieces of infrastructure.
Our operational model for PinLater is to deploy independent clusters for each engineering team or logical groupings of jobs. There are currently around 10 clusters, including one dedicated for testing and another for ad hoc one-off jobs. The cluster-per-team model allows better job isolation and, most importantly, allows each team to configure alerting thresholds and other operational parameters as appropriate for their use case. Nearly every operational issue that arises with PinLater tends to be job specific or due to availability incidents with one of our backend services. Thus having alerts handled directly by the teams owning the jobs usually leads to faster resolution.
Observability and manageability
One of the biggest pain points of our legacy job queuing system was that it was hard to manage and operate. As a result, when designing PinLater, we paid considerable attention to how we could improve on that aspect.
Like every service at Pinterest, PinLater exports a number of useful stats about the health of the service that we incorporate into operational dashboards and graphs. In addition, PinLater has a cluster status dashboard that provides a quick snapshot of how the cluster is doing.
PinLater also provides two features that have greatly helped improve manageability: per-queue rate limiting and configurable retry policies. Per-queue rate limiting allows an operator to limit the dequeue rate on any queue in the system, or even stop dequeues completely, which can help alleviate load quickly on a struggling backend system, or prevent a slow high priority job from starving other jobs. Support for configurable retry policies allows deployment of a policy that’s appropriate to each use case. Our default policy allows 10 retries, with the first five using linear delay, and the rest using exponential backoff. This policy allows the system to recover automatically from most types of sustained backend failures and outages. Job owners can configure arbitrary other policies as suitable to their use case as well.
We hope to open source PinLater this year. Stay tuned!
Want an opportunity to build and own large scale systems like this? We’re hiring!
Raghavendra Prabhu is a software engineer at Pinterest.
Acknowledgements: The core contributors to PinLater were Raghavendra Prabhu, Kevin Lo, Jiacheng Hong and Cole Rottweiler. A number of engineers across the company provided useful feedback, either directly about the design or indirectly through their usage, that was invaluable in improving the service.