Trending hashtag system design
Design a Twitter-scale application to find the trending hashtags in the following intervals.
- Last 5 minutes
- Last 15 minutes
- Last 30 minutes
- Last 60 minutes
Top K trending in the last x minutes based on a database
- We can first write every event into a queue.
- A queue handler will pick up the events and write them into the HashTags table (a sketch follows this list).
- Run a SQL query to find the top K trending hashtags for the given interval.
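A minimal sketch of the first two steps, assuming the queue is the Kafka topic introduced later in this doc; the topic name, broker address, and SQLite storage are illustrative stand-ins for the real queue and database.

```python
import json
import sqlite3

from kafka import KafkaConsumer  # pip install kafka-python

db = sqlite3.connect("hashtags.db")
db.execute(
    "CREATE TABLE IF NOT EXISTS HashTags (EventId TEXT, HashTag TEXT, Timestamp INTEGER)"
)

consumer = KafkaConsumer(
    "hashtag-events",                    # hypothetical topic name
    bootstrap_servers="localhost:9092",  # hypothetical broker address
    group_id="hashtag-writer",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

# Each message carries one hashtag occurrence (schema defined in the
# Kafka section below); persist it for later querying.
for message in consumer:
    event = message.value
    db.execute(
        "INSERT INTO HashTags (EventId, HashTag, Timestamp) VALUES (?, ?, ?)",
        (event["EventId"], event["Payload"]["HashTag"], event["Payload"]["EventTimestamp"]),
    )
    db.commit()
```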
Following is the overall design.
Let’s say the following is a sample HashTags table that stores the hashtags, sharded by the EventId key.

EventId  HashTag  Timestamp
123      A        1
124      A        1
125      A        3
345      B        4
454      C        5
344      B        6
Following is the query to find the top K frequent hashtags in the last x minutes.
SELECT HashTag, COUNT(*) AS Frequency
FROM HashTags
WHERE Timestamp >= (t - x minutes) AND Timestamp < t
GROUP BY HashTag
ORDER BY Frequency DESC
LIMIT K
This design has the following drawbacks.
- The aggregation query on HashTags always runs over the entire dataset. This will quickly become expensive and infeasible as the data grows.
- Even if you apply an index on Timestamp, the query is still expensive because it must read and aggregate every row that falls inside the window.
Top K trending in the last x minutes based on stream processing
Since we are looking for the latest trends, batch processing will not work here; we need stream processing. We can take the following approach.
- We can add a stream processor for each required interval (5, 15, 30, and 60 minutes), based on the requirements.
- The stream processor will calculate the top K hashtags in the given x-minute window and write them into the corresponding snapshot table.
- The trending-hashtags service will read the top K trending hashtags from the corresponding snapshot table.
Hashtag storage for the Queue/Topic
We can use Kafka to store the hashtags. Following is the schema for a message in the Kafka topic.
{
EventId: xxxx # partition key
ReceiveTimestamp: xxx
Payload: {
HashTag: xxx
EventTimestamp: xxx
PostId: xxx
}
}
We will use EventId to shard the messages in the Kafka topic, as in the producer sketch below.
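A minimal producer sketch, assuming kafka-python; the topic name and broker address are hypothetical. Kafka routes each message to a partition derived from its key, so keying by EventId gives us the sharding.

```python
import json
import time

from kafka import KafkaProducer  # pip install kafka-python

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",  # hypothetical broker address
    key_serializer=lambda k: k.encode("utf-8"),
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

message = {
    "EventId": "123",
    "ReceiveTimestamp": int(time.time()),
    "Payload": {"HashTag": "A", "EventTimestamp": int(time.time()), "PostId": "p1"},
}

# The key (EventId) determines the partition, i.e., the shard.
producer.send("hashtag-events", key=message["EventId"], value=message)
producer.flush()
```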
Stream processor
We can use either Apache Flink or Google Dataflow to implement the stream processor. Google Dataflow is more powerful and is natively supported on GCP, so it could be a good choice.
Time Domains
When processing data relating to events in time, there are two inherent domains to consider.
Event Time — the time at which the event (the user posting a Tweet) occurred.
Processing Time — the time at which an event is observed at any given point during processing within the pipeline.
Event time for a given event essentially never changes, but processing time changes constantly for each event as it flows through the pipeline and time marches ever forward.
In this problem, we will use event time for all calculations.
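In Beam's Python SDK, for example, event time is attached per element with TimestampedValue; the payload field names here follow the Kafka schema above.

```python
from apache_beam.transforms.window import TimestampedValue

# Attach the event time (when the Tweet was posted), not the processing
# time, so downstream windowing groups elements by EventTimestamp.
def with_event_time(event):
    return TimestampedValue(event, event["Payload"]["EventTimestamp"])
```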
Fixed window vs Sliding window
Windowing is required here to run the aggregation logic on a given set of hashtags. There are two popular windowing strategies for unbounded data.
Fixed windows (tumbling windows) are defined by static window size, e.g., hourly or daily. They are generally aligned, i.e., every window applies across all data for the corresponding time.
Sliding windows are defined by a window size and slide period, e.g., hourly windows starting every minute. The period may be less than the size, meaning the windows overlap. Sliding windows are also typically aligned.
Since we are interested in the last x minutes, we can choose the sliding-window option with the following configuration (see the sketch after these settings).
Window width = x minutes
Sliding interval = 1 minute (this can be tuned based on the value of x)
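In Beam's Python SDK this configuration would look roughly as follows; x_min is a placeholder for the chosen interval (5, 15, 30, or 60 here).

```python
import apache_beam as beam
from apache_beam.transforms.window import SlidingWindows

x_min = 5  # hypothetical choice of interval

# A window of width x minutes that slides forward every minute,
# so each element lands in x_min overlapping windows.
windowed = beam.WindowInto(SlidingWindows(size=x_min * 60, period=60))
```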
Since a fixed window is easier to understand, I will use a fixed-window code example in the rest of the doc.
Operations
Following is sample code to showcase the operations that can be applied to the stream data. It is a sketch: the Kafka connection details, topic name, top-K size, and snapshot-table sink are placeholder assumptions.
import json
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.transforms import combiners, window

with beam.Pipeline() as p:
    (p
     | 'Read' >> ReadFromKafka(consumer_config={'bootstrap.servers': 'localhost:9092'},
                               topics=['hashtag-events'])
     | 'Parse' >> beam.Map(lambda kv: json.loads(kv[1]))  # Kafka (key, value) bytes -> event dict
     | 'AddTimestamp' >> beam.Map(lambda e: window.TimestampedValue(e, e['Payload']['EventTimestamp']))  # event time
     | 'KeyBy' >> beam.Map(lambda e: (e['Payload']['HashTag'], 1))
     | 'WindowInto' >> beam.WindowInto(window.FixedWindows(300))  # 5-minute fixed windows
     | 'Count' >> beam.CombinePerKey(sum)  # (hashtag, total) per window
     | 'TopK' >> beam.CombineGlobally(combiners.TopCombineFn(n=10, key=lambda kv: kv[1])).without_defaults()
     | 'Write' >> beam.Map(write_to_snapshot_table))  # placeholder sink for the snapshot table
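The TopK step above uses Beam's built-in TopCombineFn; a hand-rolled equivalent, in the spirit of a custom TopKAggregator, could look like this sketch.

```python
import heapq

import apache_beam as beam

class TopKCombineFn(beam.CombineFn):
    """Keeps the K highest-count (hashtag, count) pairs via a min-heap."""

    def __init__(self, k):
        self.k = k

    def create_accumulator(self):
        return []  # min-heap of (count, hashtag) pairs

    def add_input(self, acc, element):
        tag, count = element
        heapq.heappush(acc, (count, tag))
        if len(acc) > self.k:
            heapq.heappop(acc)  # evict the smallest count
        return acc

    def merge_accumulators(self, accumulators):
        merged = heapq.nlargest(self.k, (pair for acc in accumulators for pair in acc))
        heapq.heapify(merged)  # restore the heap invariant
        return merged

    def extract_output(self, acc):
        # Highest counts first.
        return [(tag, count) for count, tag in sorted(acc, reverse=True)]
```

It would replace the TopK step as `beam.CombineGlobally(TopKCombineFn(10)).without_defaults()`.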
Following is an example to showcase the stream-processing steps.
(A, 1, 12:00)(A, 1, 12:01)(A, 1, 12:04)(B, 1, 12:03)(B, 1, 12:08)(C, 1, 12:07)
|
V
AssignWindows(Fixed(5m))
|
V
(A, 1, 12:00, [12:00, 12:05))
(A, 1, 12:01, [12:00, 12:05))
(A, 1, 12:04, [12:00, 12:05))
(B, 1, 12:03, [12:00, 12:05))
(B, 1, 12:08, [12:05, 12:10))
(C, 1, 12:07, [12:05, 12:10))
DropTimestamp
|
V
(A, 1, [12:00, 12:05))
(A, 1, [12:00, 12:05))
(A, 1, [12:00, 12:05))
(B, 1, [12:00, 12:05))
(B, 1, [12:05, 12:10))
(C, 1, [12:05, 12:10))
GroupByWindow
|
V
( [12:00, 12:05), [{A, 1}, {A, 1}, {A, 1}, {B, 1}])
( [12:05, 12:10), [{B, 1}, {C, 1}])
GroupByKey
|
V
( [12:00, 12:05), [{A, [1,1,1]}, {B, [1]}])
( [12:05, 12:10), [{B, [1]}, {C, [1]}])
TotalAndTopK
|
V
( [12:00, 12:05), [{A, 3}, {B, 1}])
( [12:05, 12:10), [{B, 1}, {C, 1}])
ExpandToElement
|
V
( 12:05, [{A, 3}, {B, 1}])
( 12:10, [{B, 1}, {C, 1}])
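The same steps can be reproduced in plain Python; a minimal self-contained sketch, with event times shown as minutes past 12:00 and the window width and K chosen to match the example.

```python
from collections import defaultdict

# (hashtag, count, event-minute) records from the example above.
events = [("A", 1, 0), ("A", 1, 1), ("A", 1, 4), ("B", 1, 3), ("B", 1, 8), ("C", 1, 7)]

WIDTH = 5  # fixed window width in minutes
K = 2

# AssignWindows(Fixed(5m)) + DropTimestamp + GroupByWindow + GroupByKey
windows = defaultdict(lambda: defaultdict(int))
for tag, count, minute in events:
    start = (minute // WIDTH) * WIDTH
    windows[(start, start + WIDTH)][tag] += count

# TotalAndTopK + ExpandToElement (element timestamp = window end)
for (start, end), counts in sorted(windows.items()):
    top_k = sorted(counts.items(), key=lambda kv: kv[1], reverse=True)[:K]
    print(f"12:{end:02d}", top_k)
# 12:05 [('A', 3), ('B', 1)]
# 12:10 [('B', 1), ('C', 1)]
```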
Triggers & Incremental Processing
Triggering determines when, in processing time, the results of groupings are emitted as panes.
There are two ways to handle the triggering output:
Discarding — Upon triggering, window contents are discarded.
Accumulating — Upon triggering, window contents are left intact in a persistent state, and later results refine previous results.
To solve the hashtag-trend problem we can use Accumulating, as in the sketch below.
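As a sketch of how this looks in Beam's Python SDK; the window size, early-firing period, and allowed lateness here are illustrative assumptions.

```python
import apache_beam as beam
from apache_beam.transforms.trigger import (AccumulationMode, AfterProcessingTime,
                                            AfterWatermark)
from apache_beam.transforms.window import FixedWindows

# Emit early panes every 60 seconds of processing time; because the
# accumulation mode is ACCUMULATING, each later pane refines the
# earlier ones instead of starting from scratch.
windowed = beam.WindowInto(
    FixedWindows(300),
    trigger=AfterWatermark(early=AfterProcessingTime(60)),
    accumulation_mode=AccumulationMode.ACCUMULATING,
    allowed_lateness=600,  # hypothetical bound for late events, in seconds
)
```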
Trending HashTag Service
Timestamp  TopK
12:05      [{A, 3}, {B, 1}]
12:10      [{B, 1}, {C, 1}]
The Trending HashTag service will query the corresponding snapshot table to fetch the top K trending hashtags for the given interval; a minimal read-path sketch follows.
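This sketch assumes each stream processor writes one row per window into a per-interval snapshot table; the table naming scheme and SQLite storage are hypothetical.

```python
import sqlite3

def get_trending(db, interval_minutes):
    # One snapshot table per interval, e.g. snapshot_5m, snapshot_15m,
    # snapshot_30m, snapshot_60m (hypothetical naming scheme).
    table = f"snapshot_{interval_minutes}m"
    row = db.execute(
        f"SELECT Timestamp, TopK FROM {table} ORDER BY Timestamp DESC LIMIT 1"
    ).fetchone()
    return row  # (latest window end, serialized top-K list)

db = sqlite3.connect("snapshots.db")
print(get_trending(db, 5))
```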
This post is based on the Google Dataflow paper; for more details, refer to https://www.hemantkgupta.com/p/insights-from-paper-google-the-dataflow
Happy designing scalable systems :-)