Ad click event aggregation
Digital advertising is one of the biggest sources of revenue for many media companies. As a result, tracking ad click events is very important.
Let’s design an ad click event aggregation system at Google scale.
Required Analytics Queries
- Return the number of click events for a particular ad in the last M minutes for a given publisher.
- Return the top 100 most clicked ads in the past one minute for a given publisher.
- Support filtering by IP address, user_id, or country.
Complexities to handle in event processing
- Late-arriving events
- Duplicate events
- System recovery if something goes down
- Accuracy
High-level system design
Click Capture Service
Every time a user clicks on an ad, the client makes an RPC call to ClickCaptureService. The API design for this service is as follows.
POST /ad/clicks
ClickRequest {
AdId: xxx
ClickTimestamp: xxx
UserId: xxx
IPAddress: xxx
Location: xxx
}
ClickResponse {}
This service writes the click data to the Click queue and returns an OK response.
ClickQueue
To handle delayed events and keep the aggregation accurate, we have to capture the following two timestamps:
- EventTimestamp
- ProcessingTimestamp
For accuracy, all aggregation and reporting should be based on EventTimestamp. The schema of the queue messages is as follows.
{
EventTimestamp: xxx #ClickTimestamp
ProcessingTimestamp: xxx
Payload: {
AdId: xxx
UserId: xxx
ClickTimestamp: xxx
}
}
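As a minimal sketch of the capture path (an assumption: a Python/Flask front end, with an in-process queue.Queue standing in for the real Click queue), the service stamps ProcessingTimestamp on arrival and carries the client-supplied ClickTimestamp as EventTimestamp:

import time
import queue
from flask import Flask, request, jsonify

app = Flask(__name__)
click_queue = queue.Queue()  # placeholder for the real Click queue producer

@app.route("/ad/clicks", methods=["POST"])
def capture_click():
    body = request.get_json()
    message = {
        "EventTimestamp": body["ClickTimestamp"],  # event time, set by the client
        "ProcessingTimestamp": int(time.time()),   # processing time, set on arrival
        "Payload": {
            "AdId": body["AdId"],
            "UserId": body["UserId"],
            "ClickTimestamp": body["ClickTimestamp"],
        },
    }
    click_queue.put(message)
    return jsonify({}), 200  # empty ClickResponse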
Aggregator Service design
There are two types of data:
- Bounded data: the complete dataset is written to storage first, and analytics processing runs over it afterwards.
- Unbounded data: data keeps arriving as a stream, and analytics processing runs over intervals of the stream.
Batch processing (Hadoop, etc.) is used on bounded data. Stream processing (Apache Flink or Google Dataflow) is used on unbounded data.
Stream processing is the most suitable choice for this design. The benefits of using Apache Flink or Google Dataflow are listed below.
Support for both batch and stream processing pipelines
They provide a mechanism to write pipelines for both batch and stream processing. This avoids a Lambda architecture, in which engineers have to maintain two separate pipelines: one for batch processing and another for stream processing.
Handling of delayed events
They handle delayed events using windowing and watermarking mechanisms.
Windowing: Dataflow divides streaming data into logical chunks called windows. This allows for processing data in manageable groups and enables aggregation and transformation operations.
Watermarking: A watermark is a timestamped signal indicating that Dataflow has likely seen all data up to that point in time. It helps Dataflow determine when it is safe to trigger window calculations rather than waiting indefinitely for potentially delayed data.
Late data: Late data refers to events that arrive after their associated window's watermark has passed. Dataflow provides mechanisms to handle late data, depending on the windowing strategy.
How to handle late data?
- Allowed lateness: This sets a grace period after the watermark during which late data can still be processed within its intended window; anything later is discarded. There is only one output.
- Side outputs: If late data arrives beyond the allowed lateness, it can be sent to a side output for separate processing, i.e. it produces two separate outputs.
- Discard late data: Simply discard late data if it arrives beyond the allowed lateness.
- Custom Trigger: Dataflow allows defining custom triggers to control window firing based on more complex logic.
In our case, we will use side outputs, as we need to keep accuracy high.
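As a rough sketch of the windowing configuration in the Apache Beam Python SDK (an assumption; the pipeline could equally be written in Java): allowed lateness keeps the window state open, and a late-firing trigger re-emits a corrected pane for each late element. Routing data that arrives beyond the allowed lateness to a true side output would need an extra ParDo, which is not shown here.

import apache_beam as beam
from apache_beam.transforms import window, trigger

# Sketch: 1-minute fixed windows with a 10-minute allowed lateness.
# The hard-coded (AdId, event-time-seconds) pairs stand in for the Click queue.
with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create([("A", 1), ("A", 30), ("B", 34), ("B", 105)])
        | beam.Map(lambda kv: window.TimestampedValue((kv[0], 1), kv[1]))
        | beam.WindowInto(
            window.FixedWindows(60),                 # 1-minute windows
            trigger=trigger.AfterWatermark(          # fire at the watermark...
                late=trigger.AfterCount(1)),         # ...and once per late element
            accumulation_mode=trigger.AccumulationMode.ACCUMULATING,
            allowed_lateness=600)                    # keep window state for 10 minutes
        | beam.CombinePerKey(sum)
        | beam.Map(print)
    )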
Handling of duplicate events
Dataflow provides the following mechanisms to handle duplicate events.
- Distinct operator: The Distinct transform can remove duplicate elements from a PCollection, based on a specified key or on the entire element.
- State and Timers: Stateful processing can maintain information about previously seen events and filter out duplicates, for example by storing a hash of each processed event and comparing new events against the stored hashes.
In our case, we will go with State and Timers to achieve high accuracy, as sketched below.
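A minimal sketch of a stateful de-duplication DoFn in the Beam Python SDK, assuming the stream is keyed by a click id (for example a hash of AdId, UserId, and ClickTimestamp); the first element seen for a key is emitted and later copies are dropped:

import apache_beam as beam
from apache_beam.coders import BooleanCoder
from apache_beam.transforms.userstate import ReadModifyWriteStateSpec

class DeduplicateFn(beam.DoFn):
    # Per-key (and per-window) flag recording whether this click id was seen already.
    SEEN = ReadModifyWriteStateSpec("seen", BooleanCoder())

    def process(self, element, seen=beam.DoFn.StateParam(SEEN)):
        # element is (click_id, click_record)
        if not seen.read():
            seen.write(True)
            yield element  # first occurrence: pass it through; later copies are dropped

# usage (assumption): deduped = keyed_clicks | beam.ParDo(DeduplicateFn())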
Supported Windows
Fixed windows: Data is divided into windows of a fixed size, for example every 1 minute or every 1 hour. This is used for time-based aggregation.
Sliding windows: Each window has a fixed size and a fixed sliding period, providing a smoother view of the data with more frequent updates.
Sessions: Windows are dynamically sized based on gaps in the data stream.
For our design, we will go with Fixed Windows.
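For reference, these window types correspond to the following Beam window functions (a sketch; all durations are in seconds):

from apache_beam.transforms import window

fixed   = window.FixedWindows(60)        # 1-minute fixed windows
sliding = window.SlidingWindows(60, 10)  # 1-minute windows, emitted every 10 seconds
session = window.Sessions(600)           # close a session after a 10-minute gap in events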
Apache Beam to write the workflow
Both Apache Flink and Google Dataflow support pipelines written with the Apache Beam programming model. This lets us write the workflow once and run it on the stream processor of our choice.
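A sketch of how the runner is chosen through pipeline options; the same pipeline code can target DirectRunner for local testing, FlinkRunner, or DataflowRunner (the latter two assume the corresponding runner dependencies and cluster/project settings are configured):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Swap DirectRunner for FlinkRunner or DataflowRunner without changing the pipeline code.
options = PipelineOptions(["--runner=DirectRunner"])

with beam.Pipeline(options=options) as p:
    _ = p | beam.Create([1, 2, 3]) | beam.Map(print)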
Ad Count Aggregator Service
We need to capture the click count for each ad every minute. For simplicity, let's use a fixed window with the following schema.
AdId Timestamp
xx xxxx
We can write the pipeline as below.
(A,12:00:01)(A, 12:00:02)(A, 12:00:30)(B, 12:00:34)(B, 12:01:45)(C,12:01:14)
|
V
AssignWindows(Fixed(1m))
|
V
(A, 1, 12:00:01, [12:00, 12:01))
(A, 1, 12:00:02, [12:00, 12:01))
(A, 1, 12:00:30, [12:00, 12:01))
(B, 1, 12:00:34, [12:00, 12:01))
(B, 1, 12:01:45, [12:01, 12:02))
(C, 1, 12:01:14, [12:01, 12:02))
DropTimestamp
|
V
(A, 1, [12:00, 12:01))
(A, 1, [12:00, 12:01))
(A, 1, [12:00, 12:01))
(B, 1, [12:00, 12:01))
(B, 1, [12:01, 12:02))
(C, 1, [12:01, 12:02))
GroupByWindow
|
V
( [12:00, 12:01), [{A, 1}, {A, 1}, {A, 1}, {B, 1}])
( [12:01, 12:02), [{B, 1}, {C, 1}])
GroupByKey
|
V
( [12:00, 12:01), [{A, [1,1,1]}, {B, [1]}])
( [12:01, 12:02), [{B, [1]}, {C, [1]}])
Sum
|
V
( [12:00, 12:01), [{A, 3}, {B, 1}])
( [12:01, 12:02), [{B, 1}, {C, 1}])
ExpandToElement
|
V
( 12:01, [{A, 3}, {B, 1}])
( 12:02, [{B, 1}, {C, 1}])
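The same pipeline expressed in the Beam Python SDK might look roughly like the sketch below. The hard-coded input stands in for the Click queue, and event timestamps are given as plain seconds, so the two windows end at the first and second minute boundaries rather than literally at 12:01 and 12:02.

import apache_beam as beam
from apache_beam.transforms import window

# (AdId, event-time seconds); stands in for reading from the Click queue.
clicks = [("A", 1), ("A", 2), ("A", 30), ("B", 34), ("B", 105), ("C", 74)]

with beam.Pipeline() as p:
    _ = (
        p
        | "ReadClicks"    >> beam.Create(clicks)
        | "ToEventTime"   >> beam.Map(lambda kv: window.TimestampedValue((kv[0], 1), kv[1]))
        | "AssignWindows" >> beam.WindowInto(window.FixedWindows(60))  # 1-minute windows
        | "SumPerAd"      >> beam.CombinePerKey(sum)                   # GroupByKey + Sum
        | "ToRow"         >> beam.Map(
            lambda kv, w=beam.DoFn.WindowParam: {
                "Window_End_Time": w.end.to_utc_datetime().isoformat(),
                "AdId": kv[0],
                "Count": kv[1],
            })
        | "WriteAdCount"  >> beam.Map(print)  # stand-in for a sink writing to the AdCount table
    )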
The output of this pipeline will be written into the AdCount table, with the following schema.
Window_End_Time  AdId  Count
12:01            A     3
12:01            B     1
12:02            B     1
12:02            C     1
ShardKey = AdId
The analytics API for the ad count query is as follows.
/analytics/ad/count
AdCountQueryRequest {
startTimestamp: xxxx
endTimestamp: xxx
AdId: xxxx
}
AdCountQueryResponse {
AdId: xxx
Count: xxx
}
It runs the following query to build the response:
SELECT Sum(Count) as Total
FROM AdCount
WHERE Window_End_Time >= $startTimestamp
AND Window_End_Time < $endTimestamp
AND AdId = $AdId
GROUP BY AdId
Top 100 Ad Aggregator
We can write the pipeline below to get the top 100 ads for a given publisher.
(A,12:00:01)(A, 12:00:02)(A, 12:00:30)(B, 12:00:34)(B, 12:01:45)(C,12:01:14)
|
V
AssignWindows(Fixed(1m))
|
V
(A, 1, 12:00:01, [12:00, 12:01))
(A, 1, 12:00:02, [12:00, 12:01))
(A, 1, 12:00:30, [12:00, 12:01))
(B, 1, 12:00:34, [12:00, 12:01))
(B, 1, 12:01:45, [12:01, 12:02))
(C, 1, 12:01:14, [12:01, 12:02))
DropTimestamp
|
V
(A, 1, [12:00, 12:01))
(A, 1, [12:00, 12:01))
(A, 1, [12:00, 12:01))
(B, 1, [12:00, 12:01))
(B, 1, [12:01, 12:02))
(C, 1, [12:01, 12:02))
GroupByWindow
|
V
( [12:00, 12:01), [{A, 1}, {A, 1}, {A, 1}, {B, 1}])
( [12:01, 12:02), [{B, 1}, {C, 1}])
GroupByKey
|
V
( [12:00, 12:01), [{A, [1,1,1]}, {B, [1]}])
( [12:01, 12:02), [{B, [1]}, {C, [1]}])
TotalAndTopK
|
V
( [12:00, 12:01), [{A, 3}, {B, 1}])
( [12:01, 12:02), [{B, 1}, {C, 1}])
ExpandToElement
|
V
( 12:01, [{A, 3}, {B, 1}])
( 12:02, [{B, 1}, {C, 1}])
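A sketch of the TotalAndTopK step in the Beam Python SDK, assuming counts is the windowed PCollection of (AdId, total) pairs produced by the steps above; each per-window sum is keyed by its window end, and Top.PerKey keeps the K highest totals per window:

import apache_beam as beam
from apache_beam.transforms import combiners

TOP_K = 100

def total_and_top_k(counts):
    # counts: windowed PCollection of (AdId, total) pairs
    return (
        counts
        | "KeyByWindowEnd" >> beam.Map(
            lambda kv, w=beam.DoFn.WindowParam: (
                w.end.to_utc_datetime().isoformat(),  # window end time
                (kv[1], kv[0])))                      # (total, AdId) so totals sort naturally
        | "TopKPerWindow"  >> combiners.Top.PerKey(TOP_K)  # K largest totals per window end
    )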
The output of this pipeline will be written into the TopAd table, with the following schema.
Window_End_Time  AdId  Count  PublisherId
12:01            A     3      xxx
12:01            B     1      xxx
12:02            B     1      xxx
12:02            C     1      xxx
ShardKey = PublisherId + AdId
The analytics API for the top ads query is as follows.
/analytics/ad/top
TopAdQueryRequest {
startTimestamp: xxxx
endTimestamp: xxx
PublisherId: xxxx
}
TopAdQueryResponse {
[
{ AdId: xxx, Count: xxx},
]
}
It runs the following query to build the response:
SELECT AdId, Sum(Count) as Total
FROM TopAd
WHERE Window_End_Time >= $startTimestamp
AND Window_End_Time < $endTimestamp
AND PublisherId = $PublisherId
GROUP BY AdId
ORDER BY Total DESC
LIMIT $TOP_K
Support filters
To support filtering by location or publisher, we have to aggregate along those dimensions separately and write the results as separate rows in storage, as sketched below.
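One way to do this is to repeat the same aggregation with a composite key per filter dimension, for example (AdId, Country) or (PublisherId, AdId), so each dimension gets its own pre-aggregated rows. A sketch, assuming each click record is a dict carrying AdId and Country fields:

import apache_beam as beam
from apache_beam.transforms import window

def count_by_ad_and_country(clicks):
    # clicks: PCollection of click dicts with (assumed) AdId and Country fields
    return (
        clicks
        | "KeyByAdCountry" >> beam.Map(lambda c: ((c["AdId"], c["Country"]), 1))
        | "AssignWindows"  >> beam.WindowInto(window.FixedWindows(60))
        | "SumPerKey"      >> beam.CombinePerKey(sum)  # ((AdId, Country), count) per window
    )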
Storage choice
Queue choices
Apache Kafka: We can use Kafka to store the click events. Kafka needs a predefined partition count, so at very large scale it might hit that limit, and repartitioning in Kafka is a tedious job.
Spanner-backed queue: Spanner, hosted on Google Cloud, supports parent-child (interleaved) tables to group rows by partition key. This makes it suitable for ever-growing data partitioned by key.
AdCount/TopAd table
OLAP databases such as BigQuery or Snowflake are suitable for storing this data and running the analytics queries.
Since our aggregated data is generated as a time series, we want an OLAP database with strong time-series support for efficiency.
ClickHouse is the best fit for our needs, as it is exceptionally well suited to storing and analyzing time-series data.
Correctness check
We can leverage the raw click data from the Clicks table and run a batch job to verify the correctness of the stream processing results, for example as sketched below.
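As a minimal sketch of such a check (with in-memory inputs standing in for reads from the Clicks and AdCount tables): recompute per-(AdId, window) counts from the raw clicks and diff them against the streamed aggregates.

from collections import Counter

def recompute_counts(raw_clicks):
    # raw_clicks: iterable of (AdId, window_end_time) pairs derived from the Clicks table
    return Counter(raw_clicks)

def find_mismatches(raw_clicks, streamed_counts):
    # streamed_counts: dict mapping (AdId, window_end_time) -> count from the AdCount table
    expected = recompute_counts(raw_clicks)
    return {
        key: {"batch": expected.get(key, 0), "stream": streamed_counts.get(key, 0)}
        for key in expected.keys() | streamed_counts.keys()
        if expected.get(key, 0) != streamed_counts.get(key, 0)
    }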
Happy system designing !!!