At Mobfox we help developers monetize their mobile applications. We provide them a service to request ads from advertisers and get paid for showing them inside the app. The process of matching ad requests with ads is quite resource intensive since we receive 1 billion incoming requests per hour. Moreover, those requests need to be processed under some very tight time constraints (around 300 ms) to provide ads while they are still relevant to the viewer.
Since this is out of the scope of this post I won’t go into details on what happens during an ad request. But on a very high level, for each request we stage an auction, asking several advertising platforms (on average around 15) to bid how much they are willing to pay for a particular ad view. Thus each billion of incoming ad request is fanned out into an average of 15 billion outgoing bid requests to our advertising partners.
High-level overview of a typical ad request cycle. Each incoming ad request gets fanned out into multiple bid
requests to advertising partners.
Of course, not all apps are interesting to advertisers all the time. There are some that only receive ads at a specific time of the day. Imagine, the advertisers have figured out that an app for coffee lovers shows much better ad engagement (more clicks) when ads are served in the morning and not late in the evening. So they decide to pay for ad views from that app only once they happen early in the day.
For us being able to predict such patterns means saving a lot of processing time and cutting down on network traffic since we are able to a priori know that a request will not result in an ad. This means we don’t need to go through expensive communication with our advertising partners to confirm that. However, we need to be very careful with such predictions, as false positives directly translate into lost revenue. This is especially tricky since the strategy of advertisers might change frequently and on a very short notice. While none of the advertisers is willing to pay for a particular ad view at the current point in time, this behavior might already be different in the next minute. The rest of the post explains how we use Kafka, Spark Streaming and Aerospike to implement a real-time version of an algorithm for such a callout optimization.
A typical ad request
When an ad request hits our load balancers it first gets routed to one of our servers responsible for staging auctions between advertisers. We run different optimizations for each request that all follow the same basic premises. A small fraction of the traffic is used to train parameters for our optimization algorithms, while the rest of it gets the optimizations applied.
The optimizations can roughly be classified into two distinct categories. In the first one, we have those that don’t require a quick reaction time. For them, we simply dump training data into our Hadoop cluster and run batch processing jobs several times per day. The trained models used during the auction could thus be delivered with a delay of a few hours. On the other hand, we have optimizations that require a quick reaction. In such cases a delay of minutes could already have a negative impact on the platform, so we need to ingest and process data as fast as possible. Those algorithms cannot rely on training data from our data warehouse but need to get it from a different source.
Collecting training data
Let us first have a look at what happens when an ad request gets flagged for training. While the normal ad auction process is being executed, some additional data, which is needed for callout optimization, gets collected. In general, we want to save when the ad was requested and served, as well as the segmentation of the request, telling us the country of origin, ad type, some identification of the device and application performing it. This data needs to be saved somewhere and made available to the process training the callout optimization parameters. In our case, the data gets saved to Apache Kafka.
Unoptimized traffic is used to collect training data and ingest it into Apache Kafka.
Kafka is a distributed event-log that is designed to be fast, scalable, and durable. It can be used for building real-time streaming pipelines capable of reliably passing data between different systems. To achieve this, Kafka runs as a cluster of one or more servers (brokers), organizing records in categories called topics. Topics are composed of partitions, each one containing records in an ordered and immutable commit log. The records can be durably persisted by setting up a configurable retention policy. Using the partitions Kafka can simply scale a topic beyond a size that fits on a single server.
There are different approaches for picking a partition for a particular message. While the simplest implementation uses round-robin load balancing, we have implemented a semantically richer key-based approach. The idea is to use a predefined key to determine the partition for writing the data. In our case, the key simply contains the concatenated segments string. This allows us to evenly spread messages between partitions and avoid overloading a broker due to hot keys. It also ensures that the same segments will always be written to the same partition, which means we will be able to process them by a single consumer in one go (more on that later).
Kafka can achieve its high throughput by retaining only consumer offsets per partition and leaving all the offset management to the consumer itself, who can decide how and when to increase it. As a consequence, Kafka consumers are very cheap and can be added and removed to the cluster without a big impact on the system or on the other consumers.
Processing training data
Once the messages are dumped to Kafka a separate system needs to consume and process them. In our case, the requirement is to perform this processing every minute on small slices of data to learn if advertisers were willing to bid on a specific segment in the very recent past. To implement such a workflow we use Apache Spark Streaming. Spark Streaming is the extension of the core Spark API that enables scalable, high-throughput and fault-tolerant processing of live data streams.
In our architecture, a Spark Streaming job consumes data from Kafka. There are two different approaches for doing that, an older one using receivers and a more recent one using a DirectAPI. We have opted for the second one due to better performance and stronger fault-tolerance guarantees. Using the DirectAPI eliminates the need for having separate receivers to read data from Kafka and does not require Write Ahead Logs to achieve exactly-once processing semantics. Our Spark Streaming job reads data from Kafka in non-overlapping micro-batches of 60 seconds. Once the data is read to memory it is reduced by key to get the number of ads requested and served for a particular segment.
Similarly as when producing to Kafka, the key gets generated from the dimensions defining a segment. We apply a simple heuristic to the number of requests and serves to determine if a segment should be currently throttled or sent out to the advertisers. In layman’s terms, the heuristic detects segments that receive less than a certain percent of served ads (in our case 1%) and marks them for throttling in the optimized traffic. While the 1% threshold works for us (we have a very long tail of segments with served rate under that value) the number needs to be tuned per environment and use case. Furthermore, it is worth noting that a more complex machine learning algorithm can be plugged-in here (either using sparks MLlib or some other framework) but is out of scope for this post.
Finally, the parameters on segments to be throttled need to be persisted for use on ad requests. In our setup, we use Aerospike as a persistence layer between Spark Streaming and our auction servers. Aerospike is a high-performance, scalable and distributed key-value store. To achieve very low latency on read/write operations Aerospike is optimized for storing data in memory and on SSD disks. On a higher level the data is organized in namespaces, sets, and records.
Each record is addressable by a key and can contain different values in multiple bins. Similarly, as in Kafka, the key in Aerospike is used to determine nodes where a record physically resides. After our Spark Streaming job processes data for a particular segment, it uses the segment name as a key and puts the throttling status as a bin in the record. Additionally, a separate bin is used to store the timestamp of when the value was last recalculated. We add this information to prevent throttling segments based on stale data.
When a new micro-batch of events will be processed, the segments in Aerospike will simply be overwritten with current throttling information and the timestamps will be updated to reflect that the records are not stale. We also put a Time-To-Live (TTL) on each record to automatically garbage collect segments that are not currently in use. The records get stored in a dedicated set that is accessed directly by auction servers.
A Spark Streaming job reads micro-batches of data, reduces them per segment and applies a heuristic to decide if
the segment should be throttled. The throttling information gets saved into Aerospike.
Let’s finish the post where we started it, by explaining what happens if an ad request hits our auction servers and gets flagged to be optimized. The end goal of this step is to decide if for an ad request we want to multiplex bid requests to our advertising partners or should we just say upfront there is no ad available. To do that we first need to reconstruct the segment key from the request context and access throttling information in Aerospike. If the record is not stale and the segment should be throttled, processing for this request stops immediately and no ad is returned. In such case, we are also able to provide that information to ad requester significantly faster since the request doesn’t need to go through the whole end-to-end flow.
However, due to throttling information not changing on every request, we additionally cache it in memory of each auction server for a short time (in our case a minute) and only go to Aerospike once the cache entry has expired. By adding local caching the callout throttling implementation only ads a single additional aerospike read operation per request segment and auction machine.
Each request in the optimized traffic is checked for being throttled. The auction between advertising partners is staged only if the request is not throttled.
With the described real-time processing pipeline we are able to reduce outgoing network traffic by 20%. For us, this number is quite significant since network traffic is a big part of our infrastructure costs. However, this is not the only optimization where we use such an approach for processing data. In fact, the architecture described in this post can be generalized to any use case where there is a massive stream of incoming events that needs to be processed in near real-time. If you are interested in what additional optimizations we do to scale our platform, apply for one of our open positions and help MobFox become even more efficient. =)