Ad click event aggregation

Dilip Kumar

Digital advertising is one of the biggest sources of revenue for many media companies. As a result, tracking ad click events is very important.

Let’s design an ad click event aggregation system at Google scale.

Required Analytics Queries

  1. Return the number of click events for a particular ad in the last M minutes for a given publisher.
  2. Return the top 100 most-clicked ads in the past one minute for a given publisher.
  3. Support filtering based on IP, user_id, or country.

Complexities in handling events

  1. Late-arriving events
  2. Duplicate events
  3. System recovery if something goes down
  4. Accuracy

High level system design

Click Capture Service

Every time a user clicks on an ad, the client makes an RPC call to ClickCaptureService. Following is the API design for this service.

POST /ad/clicks
ClickRequest {
  AdId: xxx
  ClickTimestamp: xxx
  UserId: xxx
  IPAddress: xxx
  Location: xxx
}
ClickResponse {}

This service captures the click data into the Click queue and returns an OK response.
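
A minimal sketch of the capture endpoint, assuming a Flask HTTP server and an in-memory list as a stand-in for the real Click queue producer (both are illustrative choices, not part of the design itself):

from flask import Flask, jsonify, request

app = Flask(__name__)

# Stand-in for the real Click queue producer (Kafka, Pub/Sub, etc.).
click_queue = []

@app.route("/ad/clicks", methods=["POST"])
def capture_click():
    # ClickRequest fields: AdId, ClickTimestamp, UserId, IPAddress, Location
    click = request.get_json()
    click_queue.append(click)   # in the real system: publish to the Click queue
    return jsonify({}), 200     # empty ClickResponse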

ClickQueue

In order to handle delayed events and maintain accuracy, we will have to capture the following two timestamps:

  1. EventTimestamp
  2. ProcessingTimestamp

For accuracy, all aggregation and reporting should be based on EventTimestamp. Following is the schema for the queue messages.

{
  EventTimestamp: xxx  # ClickTimestamp
  ProcessingTimestamp: xxx
  Payload: {
    AdId: xxx
    UserId: xxx
    ClickTimestamp: xxx
  }
}
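
A small sketch of how the capture service could build this message, stamping ProcessingTimestamp from the server clock while preserving the client-reported ClickTimestamp as EventTimestamp (the helper name is illustrative; field names follow the schema above):

import time

def build_click_message(click_request: dict) -> dict:
    """Wrap a ClickRequest into the queue message schema above."""
    return {
        # EventTimestamp comes from the client's ClickTimestamp; all
        # aggregation and reporting are based on this value.
        "EventTimestamp": click_request["ClickTimestamp"],
        # ProcessingTimestamp is stamped when the event enters the queue.
        "ProcessingTimestamp": int(time.time()),
        "Payload": {
            "AdId": click_request["AdId"],
            "UserId": click_request["UserId"],
            "ClickTimestamp": click_request["ClickTimestamp"],
        },
    }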

Aggregator Service design

There are two types of data:

  1. Bounded data: all of the data is already written to storage, and analytics processing runs over the complete dataset.
  2. Unbounded data: data keeps arriving as a stream, and analytics processing must run over intervals of the data.

Batch processing (Hadoop, etc.) is used for bounded data. Stream processing (Apache Flink or Google Dataflow) is used for unbounded data.

Stream processing is the most suitable for this design. Following are the benefits of using Apache Flink or Google Dataflow.

Support for both batch and stream processing pipelines

Both provide a mechanism to write a single pipeline that works for batch as well as stream processing. This helps avoid a Lambda architecture, where engineers have to maintain two separate pipelines: one for batch processing and a second for stream processing.

Handling delayed events

Delayed events are handled using windowing and watermarking mechanisms.

Windowing: Dataflow divides streaming data into logical chunks called windows. This allows for processing data in manageable groups and enables aggregation and transformation operations.

Watermarking: A watermark is a timestamped signal that indicates Dataflow has likely seen all data up to that point in time. It helps Dataflow determine when it is safe to trigger window calculations and avoid waiting indefinitely for potentially delayed data.

Late data: Late data refers to events that arrive after their associated window’s watermark has passed. Dataflow provides mechanisms to handle late data depending on the windowing strategy.

How to handle late data?

  1. Allowed lateness: This sets a grace period after the watermark during which late data can still be processed within its intended window; anything later is discarded. There is only one output.
  2. Side outputs: If late data arrives beyond the allowed lateness, it can be sent to a side output for separate processing, i.e. it produces two separate outputs.
  3. Discard late data: Simply discard late data if it arrives beyond the allowed lateness.
  4. Custom triggers: Dataflow allows defining custom triggers to control window firing based on more complex logic.

In our case, we will use side outputs since we need to keep accuracy high.
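
A minimal Apache Beam (Python SDK) sketch of the windowing setup, assuming 1-minute fixed windows and a 10-minute allowed lateness (both values are assumptions); the late-firing trigger re-emits a pane for each late element so late results can be routed separately downstream:

import apache_beam as beam
from apache_beam import window
from apache_beam.transforms.trigger import (
    AccumulationMode, AfterCount, AfterWatermark)

def window_clicks(clicks):
    """clicks: a PCollection of (AdId, 1) elements with event timestamps."""
    return clicks | "FixedWindow" >> beam.WindowInto(
        window.FixedWindows(60),                     # 1-minute windows
        trigger=AfterWatermark(late=AfterCount(1)),  # re-fire per late element
        accumulation_mode=AccumulationMode.DISCARDING,
        allowed_lateness=10 * 60,                    # 10-minute grace period
    )

Downstream, a DoFn can inspect the pane (for example via beam.DoFn.PaneInfoParam) to divert late panes into a separate correction path instead of the main write.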

Handling duplicate events

Dataflow provides the following mechanisms to handle duplicate events.

  1. Distinct operator: The Distinct transform can remove duplicate elements from a PCollection based on a specified key or the entire element.
  2. State and timers: Stateful processing can maintain information about previously seen events and filter out duplicates. It can store a hash (or ID) of each processed event and compare new events against the stored values to detect duplicates.

In our case, we will go with state and timers to achieve high accuracy.
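
A rough sketch of stateful de-duplication in Beam’s Python SDK, keyed by AdId and keeping a bag of already-seen click IDs per key. The ClickId field is an assumed unique identifier, and state expiry is omitted for brevity; a real pipeline would also register a timer to clear old state:

import apache_beam as beam
from apache_beam.coders import StrUtf8Coder
from apache_beam.transforms.userstate import BagStateSpec

class DedupClicks(beam.DoFn):
    """Drops clicks whose ClickId has already been seen for this key."""
    SEEN = BagStateSpec("seen_click_ids", StrUtf8Coder())

    def process(self, element, seen=beam.DoFn.StateParam(SEEN)):
        key, click = element            # element is (AdId, click dict)
        click_id = click["ClickId"]     # assumed unique click identifier
        if click_id in set(seen.read()):
            return                      # duplicate: drop it
        seen.add(click_id)
        yield element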

Supported Windows

Fixed windows: Data is divided into windows of fixed size, for example every 1 minute or every 1 hour. This is used to process data for time-based aggregation.

Sliding windows: Each window has a fixed size and a fixed sliding period. It provides a smoother view of the data with more frequent updates.

Sessions: Windows are dynamically sized based on gaps in the data stream.

For our design, we will go with Fixed Windows.
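
For reference, the three window types map directly onto Beam’s built-in window functions; the sizes below are illustrative:

from apache_beam import window

fixed   = window.FixedWindows(60)        # 1-minute fixed windows
sliding = window.SlidingWindows(60, 10)  # 1-minute windows, emitted every 10 seconds
session = window.Sessions(10 * 60)       # close a session after a 10-minute gap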

Apache Beam to write the workflow

Both Apache Flink and Google Dataflow support pipelines written with the Apache Beam programming model. This lets us write the workflow once and run it on whichever stream processor we choose.
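
A sketch of how the same Beam pipeline can be pointed at different runners purely through pipeline options (the runner names are the standard Beam ones; a real Dataflow run would also need project and region flags):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Switch --runner to FlinkRunner or DataflowRunner without touching the pipeline code.
options = PipelineOptions(["--runner=DirectRunner"])

with beam.Pipeline(options=options) as p:
    _ = p | beam.Create([("A", 1)])  # placeholder for the real click source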

Ad Count Aggregator Service

We need to capture the click count for each ad every minute. For simplicity, let’s use a fixed window with the following schema.

AdId  Timestamp
xx    xxxx

We can write the pipeline as below.

(A,12:00:01)(A, 12:00:02)(A, 12:00:30)(B, 12:00:34)(B, 12:01:45)(C,12:01:14)
|
V
AssignWindows(Fixed(1m))
|
V
(A, 1, 12:00:01, [12:00, 12:01))
(A, 1, 12:00:02, [12:00, 12:01))
(A, 1, 12:00:30, [12:00, 12:01))
(B, 1, 12:00:34, [12:00, 12:01))
(B, 1, 12:01:45, [12:01, 12:02))
(C, 1, 12:01:14, [12:01, 12:02))
DropTimestamp
|
V
(A, 1, [12:00, 12:01))
(A, 1, [12:00, 12:01))
(A, 1, [12:00, 12:01))
(B, 1, [12:00, 12:01))
(B, 1, [12:01, 12:02))
(C, 1, [12:01, 12:02))
GroupByWindow
|
V
( [12:00, 12:01), [{A, 1}, {A, 1}, {A, 1}, {B, 1}])
( [12:01, 12:02), [{B, 1}, {C, 1}])
GroupByKey
|
V
( [12:00, 12:01), [{A, [1,1,1]}, {B, [1]}])
( [12:01, 12:02), [{B, [1]}, {C, [1]}])
Sum
|
V
( [12:00, 12:01), [{A, 3}, {B, 1}])
( [12:01, 12:02), [{B, 1}, {C, 1}])
ExpandToElement
|
V
( 12:01, [{A, 3}, {B, 1}])
( 12:02, [{B, 1}, {C, 1}])
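
A condensed Beam (Python) sketch of this pipeline, assuming the input is a PCollection of (AdId, 1) elements with event-time timestamps attached; it assigns 1-minute fixed windows, sums per ad, and expands each result with its window end time into an AdCount row:

import apache_beam as beam
from apache_beam import window

class ToAdCountRow(beam.DoFn):
    """Expand an (AdId, count) pair plus its window into an AdCount table row."""
    def process(self, element, win=beam.DoFn.WindowParam):
        ad_id, count = element
        yield {
            "Window_End_Time": win.end.to_utc_datetime().isoformat(),
            "AdId": ad_id,
            "Count": count,
        }

def ad_count_pipeline(clicks):
    """clicks: PCollection of (AdId, 1) with event-time timestamps."""
    return (
        clicks
        | "AssignWindows" >> beam.WindowInto(window.FixedWindows(60))
        | "SumPerAd" >> beam.CombinePerKey(sum)      # GroupByKey + Sum
        | "ExpandToElement" >> beam.ParDo(ToAdCountRow())
    )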

The output of this pipeline will be written into the AdCount table. Following is the schema for this table.

Window_End_Time  AdId  Count
12:01            A     3
12:01            B     1
12:02            B     1
12:02            C     1

ShardKey = AdId

Following is the API for the analytics query to get the ad count.

/analytics/ad/count
AdCountQueryRequest {
  startTimestamp: xxx
  endTimestamp: xxx
  AdId: xxx
}
AdCountQueryResponse {
  AdId: xxx
  Count: xxx
}

It will run the following query to get the response.

SELECT Sum(Count) AS Total
FROM AdCount
WHERE Window_End_Time >= $startTimestamp
  AND Window_End_Time < $endTimestamp
  AND AdId = $AdId
GROUP BY AdId
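
A sketch of the query handler, assuming the AdCount table lives in ClickHouse (see the storage discussion below) and using the clickhouse-driver client; table and column names follow the schema above, and the connection details are placeholders:

from clickhouse_driver import Client

client = Client(host="clickhouse.internal")  # placeholder host

def ad_count(ad_id: str, start_ts: str, end_ts: str) -> int:
    rows = client.execute(
        """
        SELECT sum(Count) AS Total
        FROM AdCount
        WHERE Window_End_Time >= %(start)s
          AND Window_End_Time < %(end)s
          AND AdId = %(ad_id)s
        GROUP BY AdId
        """,
        {"start": start_ts, "end": end_ts, "ad_id": ad_id},
    )
    return rows[0][0] if rows else 0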

Top 100 Ad Aggregator

We can write the pipeline as below to get the top 100 ads for a given publisher.

(A,12:00:01)(A, 12:00:02)(A, 12:00:30)(B, 12:00:34)(B, 12:01:45)(C,12:01:14)
|
V
AssignWindows(Fixed(1m))
|
V
(A, 1, 12:00:01, [12:00, 12:01))
(A, 1, 12:00:02, [12:00, 12:01))
(A, 1, 12:00:30, [12:00, 12:01))
(B, 1, 12:00:34, [12:00, 12:01))
(B, 1, 12:01:45, [12:01, 12:02))
(C, 1, 12:01:14, [12:01, 12:02))
DropTimestamp
|
V
(A, 1, [12:00, 12:01))
(A, 1, [12:00, 12:01))
(A, 1, [12:00, 12:01))
(B, 1, [12:00, 12:01))
(B, 1, [12:01, 12:02))
(C, 1, [12:01, 12:02))
GroupByWindow
|
V
( [12:00, 12:01), [{A, 1}, {A, 1}, {A, 1}, {B, 1}])
( [12:01, 12:02), [{B, 1}, {C, 1}])
GroupByKey
|
V
( [12:00, 12:01), [{A, [1,1,1]}, {B, [1]}])
( [12:01, 12:02), [{B, [1]}, {C, [1]}])
TotalAndTopK
|
V
( [12:00, 12:01), [{A, 3}, {B, 1}])
( [12:01, 12:02), [{B, 1}, {C, 1}])
ExpandToElement
|
V
( 12:01, [{A, 3}, {B, 1}])
( 12:02, [{B, 1}, {C, 1}])
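
A Beam (Python) sketch of the TotalAndTopK step, assuming the input is keyed as ((PublisherId, AdId), 1) with event timestamps and that a recent Beam SDK is used (Top.PerKey with a key function); it sums per publisher and ad within each 1-minute window, re-keys by publisher, and keeps the top 100 ads per publisher:

import apache_beam as beam
from apache_beam import window

def top_ads_pipeline(clicks, k=100):
    """clicks: PCollection of ((PublisherId, AdId), 1) with event timestamps."""
    return (
        clicks
        | "AssignWindows" >> beam.WindowInto(window.FixedWindows(60))
        | "SumPerPublisherAd" >> beam.CombinePerKey(sum)     # ((pub, ad), count)
        | "ReKeyByPublisher" >> beam.Map(
            lambda kv: (kv[0][0], (kv[0][1], kv[1])))        # (pub, (ad, count))
        | "TopKPerPublisher" >> beam.combiners.Top.PerKey(
            k, key=lambda ad_count: ad_count[1])             # top k ads by count
    )

Each output element is (PublisherId, [(AdId, Count), …]) per window, which can then be flattened into the TopAd rows below.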

The output of this pipeline will be written into the TopAd table. Following is the schema for this table.

Window_End_Time  AdId  Count  PublisherId
12:01            A     3      xxx
12:01            B     1      xxx
12:02            B     1      xxx
12:02            C     1      xxx

ShardKey = PublisherId + AdId

Following is the API for the analytics query to get the top ads.

/analytics/ad/top
TopAdQueryRequest {
  startTimestamp: xxx
  endTimestamp: xxx
  PublisherId: xxx
}
TopAdQueryResponse {
  [
    { AdId: xxx, Count: xxx },
  ]
}

It will run the following query to get the response.

SELECT AdId, Sum(Count) AS Total
FROM TopAd
WHERE Window_End_Time >= $startTimestamp
  AND Window_End_Time < $endTimestamp
  AND PublisherId = $PublisherId
GROUP BY AdId
ORDER BY Total DESC
LIMIT $TOP_K

Supporting filters

To support filtering based on location or publisher, we will have to aggregate these dimensions separately and write them as separate rows in storage.
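
One way to sketch this: have the pipeline emit one keyed record per filter dimension for every click, so each dimension gets its own aggregated rows. The dimension names below simply mirror the filters listed in the requirements and the ClickRequest fields:

def explode_filter_keys(click: dict):
    """Emit one (composite key, 1) pair per filter dimension for a single click."""
    ad = click["AdId"]
    yield ((ad, "all", "*"), 1)                     # unfiltered count
    yield ((ad, "location", click["Location"]), 1)  # filter by country/location
    yield ((ad, "ip", click["IPAddress"]), 1)       # filter by IP
    yield ((ad, "user_id", click["UserId"]), 1)     # filter by user_id

Each composite key is then windowed and summed exactly like AdId alone, and each dimension/value combination lands in its own row.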

Storage choice

Queue choices

Apache Kafka: We can use Kafka to store the click events. Kafka needs a predefined partition count, so at a very large scale it might hit that limit, and repartitioning in Kafka becomes a tedious job.

Spanner Queue: Spanner, hosted on Google Cloud, uses a parent and child table concept to group the partitions. This makes it suitable for ever-growing data based on a partition key.

AdCount/TopAd table

OLAP databases, for example BigQuery, Snowflake, etc., are suitable for storing this data and running the analytics queries.

Since our aggregated data is generated as a time series, we need an OLAP database that also handles time-series data efficiently.

ClickHouse is the best fit for our needs, as it is exceptionally well suited for handling and analyzing time-series data.

Correctness check

We can leverage the raw data from the Clicks table and run a batch job to verify the correctness of the stream processing.
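
A rough sketch of such a reconciliation check, assuming the raw clicks are also queryable (here via the same ClickHouse client as in the query handler above, with a hypothetical Clicks table); it recounts a window from raw events and diffs the result against the streamed AdCount rows:

def verify_window(client, window_start: str, window_end: str) -> bool:
    """Diff the streamed AdCount rows against a batch recount of raw clicks."""
    streamed = dict(client.execute(
        "SELECT AdId, sum(Count) FROM AdCount "
        "WHERE Window_End_Time = %(end)s GROUP BY AdId",
        {"end": window_end},
    ))
    recount = dict(client.execute(
        "SELECT AdId, count() FROM Clicks "
        "WHERE EventTimestamp >= %(start)s AND EventTimestamp < %(end)s "
        "GROUP BY AdId",
        {"start": window_start, "end": window_end},
    ))
    return streamed == recount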

Happy system designing !!!
