Twitter system design
Design a Twitter-like application with the following minimal features:
- Users can post tweets containing text, URLs, or photos.
- Users can follow other users.
- Users see a feed of tweets from the people they follow, sorted by timestamp (latest on top).
Complexities of the Twitter system
- Support 1+ billion users.
- Support 200 million daily active users.
- On average, each user posts 1 tweet every two days.
- On average, each user follows 200 other users.
- Latency to load the feed should be around 200ms.
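To get a feel for the traffic these numbers imply, here is a rough back-of-envelope conversion into read/write rates (the figure of 5 feed loads per active user per day is an assumption, not part of the requirements):

# Back-of-envelope traffic estimates derived from the numbers above.
TOTAL_USERS = 1_000_000_000              # 1+ billion users
DAU = 200_000_000                        # daily active users
TWEETS_PER_USER_PER_DAY = 0.5            # 1 tweet every two days

tweets_per_day = TOTAL_USERS * TWEETS_PER_USER_PER_DAY   # 500M tweets/day
write_qps = tweets_per_day / 86_400                      # ~5.8K writes/sec

feed_loads_per_day = DAU * 5             # assumption: ~5 feed loads per DAU
read_qps = feed_loads_per_day / 86_400                   # ~11.6K reads/sec
print(f"write QPS ~{write_qps:,.0f}, read QPS ~{read_qps:,.0f}")

Reads dominate writes, which is one reason the design below leans on pre-computed, cached feeds.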
High level system design
Based on these requirements, we can come up with the following high-level system design.
Post Tweets Service
API Design to post tweet
This service will be used to post a tweet. Following is the API design for this service.
POST /tweets
Request Payload: {
UserId: xxx
Content: xxxx
}
Response Payload: {
TweetId: xxxx
}
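A minimal sketch of the handler behind this endpoint (Python throughout; the db handle and the uuid_service client are hypothetical stand-ins, the latter for the UUID generation service described below):

import time

MAX_TWEET_BYTES = 4096                       # matches the ~4KB content size used later

def post_tweet(user_id: int, content: str, db, uuid_service) -> dict:
    # Reject oversized tweets before touching storage.
    if len(content.encode("utf-8")) > MAX_TWEET_BYTES:
        raise ValueError("tweet content exceeds 4KB limit")
    tweet_id = uuid_service.next_id()        # TweetId comes from the UUID service
    db.execute(
        "INSERT INTO Tweets (TweetId, Content, CreateTimestamp, UserId) "
        "VALUES (?, ?, ?, ?)",
        (tweet_id, content, int(time.time()), user_id),
    )
    return {"TweetId": tweet_id}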
Tweets Table design
Tweets will be written into the Tweets table with the following schema.
TweetId Content CreateTimestamp UserId
xxxx xxxx xxxxx xxxx
xxxx xxxx xxxxx xxxxx
- TweetId will be generated by a separate UUID generation service.
- TweetId will be the primary key for this table.
- Generally we search all tweets for a given UserId, therefore we will use UserId as the shard key.
- To support finding all tweets for a given UserId, we need a secondary index on the UserId column.
- A secondary index on CreateTimestamp will also be created to support chronological sorting of tweets.
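Concretely, the table and its indexes could be declared as below (a sketch; sqlite3 is only a local stand-in, and since sharding is engine-specific the shard key appears as a comment):

import sqlite3

db = sqlite3.connect(":memory:")
db.executescript("""
CREATE TABLE Tweets (
    TweetId         INTEGER PRIMARY KEY,  -- from the UUID generation service
    Content         TEXT,                 -- up to ~4KB of text or a media URL
    CreateTimestamp INTEGER,
    UserId          INTEGER               -- shard key in the real, sharded store
);
CREATE INDEX idx_tweets_user ON Tweets (UserId);          -- find tweets by user
CREATE INDEX idx_tweets_ts   ON Tweets (CreateTimestamp); -- chronological sort
""")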
Storage Calculation
Size of one row = TweetId (int64, 8 bytes) + Content (~4KB) + CreateTimestamp (int64, 8 bytes) + UserId (int64, 8 bytes)
= 4.024KB ~ 4KB
Number of posts per user = 1 tweet every two days ~ 15 tweets per month ~ 180 tweets per year ~ 900 tweets in 5 years
Number of tweets in 5 years = 1 billion users * 900 tweets = 900 billion tweets
Total tweets size = 900 billion * 4KB = 3.6 petabytes ~ 4 petabytes
Avg size of a shard ~ 400GB
Total number of shards = 4,000,000GB / 400GB = 10,000 shards
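The same arithmetic spelled out, which is also where the shard count comes from (decimal units, same rounding as above):

row_kb = 4                                     # ~4.024KB rounded to 4KB
tweets_in_5_years = 1_000_000_000 * 900        # 900 billion tweets
total_gb = tweets_in_5_years * row_kb / 1_000_000    # KB -> GB: 3,600,000 GB
total_pb = total_gb / 1_000_000                      # ~3.6 PB, rounded up to 4 PB
shards = 4_000_000 / 400                             # 4 PB in GB / 400GB per shard
print(f"~{total_pb:.1f} PB of tweets, {shards:,.0f} shards")  # ~3.6 PB, 10,000 shards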
Tweet with media
Users can also post tweets with images, videos, etc. To support media, we will need a separate upload service. The client first makes an RPC call to the upload service, which returns a URL for the given media. That URL is then used in the tweet body when calling the post service to create the tweet. Following is the API design for the upload service.
POST /media/upload
RequestPayload: {
UserId:xxxx
MultipartFile: yyyyy
}
ResponsePayload: {
Url: xxxx
}
Note: Since the application only supports small files (in the range of a few KB) as tweet attachments, the upload service follows a simple design and stores each upload as a single file. If files in the GB range need to be supported in the future, the upload service will have to be redesigned around a chunked upload mechanism.
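A sketch of the upload handler under those assumptions (blob_store is a hypothetical stand-in for the object storage behind the upload service, and media.example.com is a placeholder host):

import hashlib

def upload_media(user_id: int, data: bytes, blob_store) -> dict:
    # Content-addressed object name: identical uploads map to the same key.
    object_key = hashlib.sha256(data).hexdigest()
    blob_store.put(object_key, data)         # whole file stored in one write
    return {"Url": f"https://media.example.com/{object_key}"}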
Size of one media file ~ 100KB
Tweets with media = 1% of tweets
Total size of media in 5 years = 900 billion tweets * 0.01 * 100KB = 9 billion * 100KB = 900 terabytes
Avg size of a shard = 400GB
Number of shards = 900,000GB / 400GB = 2,250
Follower Service
The follower service will expose the following API to allow a user to start following another user.
POST /following
RequestPayload {
FollowingUserId: xxxx // the target user that the current user wants to follow
UserId: yyyy // the current user
}
ResponsePayload {
}
Data will be stored in the UserFollowings table with the following schema.
UserId FollowingUserId
xxx xxxx
x_user Trump_user
UserId+FollowingUserId will be the composite primary key. To keep all of a user's following data on a single machine, we will shard based on UserId.
The UserFollowings table will be used to find:
- the list of a user's followings
- the list of a user's followers
List User Followings API
We will need the following API for user followings.
GET /users/followings
RequestPayload: {
UserId: xxx
}
ResponsePayload: {
data: [xxx,xxx,xxx,xxx] // List of userId
pageSize: xxx
startPage: xxxx
}
We need to run the following SQL to get the list of followings.
SELECT FollowingUserId        -- Trump_user, Biden_user, Pichai_user, y_user
FROM UserFollowings
WHERE UserId = xxx;           -- for x_user, get the list of followed users
The above query will run efficiently for the following reasons:
- The table is sharded on UserId, so the query executes on a single machine.
- The index on UserId (the leading column of the composite primary key) helps it run fast.
User Followers API
We will need the following API for user followers.
GET /users/followers
RequestPayload: {
UserId: xxx
}
ResponsePayload: {
data: [xxx,xxx,xxx,xxx] // List of userId
pageSize: xxx
startPage: xxxx
}
We need to run the following SQL to get the list of followers.
SELECT UserId                 -- user_xx, user_yy
FROM UserFollowings
WHERE FollowingUserId = xxxx; -- for Trump_user, get the list of followers
The above query will not run efficiently, for the following reasons:
- Sharding is based on UserId but the query filters on FollowingUserId, so the query has to be executed on more than one machine.
- The composite primary key UserId+FollowingUserId only lets UserId-based queries use the index as a prefix; a query on FollowingUserId alone cannot use it and will do a full table scan.
- We can add a secondary index on FollowingUserId, but the query still fans out across all shards, so it remains inefficient.
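To make the cost concrete, here is a sketch of what the followers lookup degenerates into on a store sharded by UserId (the shard handles are hypothetical):

def get_followers_scatter_gather(following_user_id, shards):
    # Followers of one user are spread across every shard, so the query
    # must fan out to all of them: one network round trip per shard.
    followers = []
    for shard in shards:
        rows = shard.execute(
            "SELECT UserId FROM UserFollowings WHERE FollowingUserId = ?",
            (following_user_id,),
        )
        followers.extend(row[0] for row in rows)
    return followers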
We can take the following approaches to optimize this query.
Approach #1: Denormalize into UserFollowings and UserFollowers tables
Along with keeping data in the UserFollowings table, we can also write each relationship into a UserFollowers table.
UserId       FollowerId
xxx          xxxx
Trump_user   x_user
Biden_user   x_user
We can shard this table on UserId and choose UserId+FollowerId as the composite primary key. Following is the query to read the list of followers.
SELECT FollowerId             -- x_user, y_user
FROM UserFollowers
WHERE UserId = xxxx;          -- for Trump_user, get the list of followers
There are two ways we can write data into the new UserFollowers table.
- Use a transaction to write into both tables: databases like Spanner support distributed transactions across shards, so we can leverage that and write into these two tables in the same transaction. If Spanner (or an equivalent) is not available, this approach will not work.
- Use CDC (Change Data Capture) on the UserFollowings table and stream changes to a Kafka topic. A subscriber of the Kafka topic takes care of writing into the UserFollowers table, as sketched below. This solution leads to eventual consistency, so it might not be a good fit.
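For the CDC option, the subscriber only has to mirror each change with the roles swapped. A sketch, assuming a simple event shape (the Kafka consumer loop itself is omitted):

def apply_follow_change(event: dict, db) -> None:
    # Assumed CDC event shape, e.g.
    # {"op": "INSERT", "UserId": "x_user", "FollowingUserId": "Trump_user"}
    params = (event["FollowingUserId"], event["UserId"])   # roles swapped
    if event["op"] == "INSERT":
        db.execute("INSERT INTO UserFollowers (UserId, FollowerId) "
                   "VALUES (?, ?)", params)
    elif event["op"] == "DELETE":
        db.execute("DELETE FROM UserFollowers "
                   "WHERE UserId = ? AND FollowerId = ?", params)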
Approach #2: Use a graph database
We can use a graph database to store the relationships. Although it would make both queries efficient, it creates a problem for feed generation because the Tweets table still lives in the relational database. Therefore it is not a good choice for this application.
Approach #3: Use a Relationship column
Add a Relationship column to the UserFollowers table and store rows for both followers and followings. This helps to write both rows of a follow action in the same transaction, and sharding on SourceUserId keeps each query on a single machine.
Based on approach #3, following will be the schema for this UserFollowers table.
SourceUserId   TargetUserId   Relationship
A              B              FOLLOWER    # use an enum, 0: FOLLOWER
B              A              FOLLOWING   # 1: FOLLOWING
A              C              FOLLOWER
C              A              FOLLOWING
Now the followers query can be written as below.
SELECT TargetUserId           -- B, C (the followers)
FROM UserFollowers
WHERE SourceUserId = xxx      -- e.g. Trump_user
AND Relationship = 0;         -- FOLLOWER
The followings query can be written as below.
SELECT TargetUserId           -- Trump_user, Biden_user, Pichai_user
FROM UserFollowers
WHERE SourceUserId = xxx      -- x_user
AND Relationship = 1;         -- FOLLOWING
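A runnable miniature of this schema and both queries (sqlite3 stands in for the sharded store):

import sqlite3

FOLLOWER, FOLLOWING = 0, 1

db = sqlite3.connect(":memory:")
db.execute("""CREATE TABLE UserFollowers (
    SourceUserId TEXT, TargetUserId TEXT, Relationship INTEGER,
    PRIMARY KEY (SourceUserId, TargetUserId, Relationship))""")

def follow(follower_id, target_id):
    # One follow action writes both directions of the relationship
    # in a single transaction.
    with db:
        db.execute("INSERT INTO UserFollowers VALUES (?, ?, ?)",
                   (follower_id, target_id, FOLLOWING))
        db.execute("INSERT INTO UserFollowers VALUES (?, ?, ?)",
                   (target_id, follower_id, FOLLOWER))

follow("x_user", "Trump_user")
print(db.execute("SELECT TargetUserId FROM UserFollowers "
                 "WHERE SourceUserId = ? AND Relationship = ?",
                 ("Trump_user", FOLLOWER)).fetchall())    # [('x_user',)]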
Feed Service
This service will be used to generate the feed for a given user, using the Tweets and UserFollowers tables. Following is the high-level algorithm to generate the feed (a sketch of the merge step follows the list).
- Get the list of users the given user is following.
- Get the tweets posted by those followed users.
- Merge all the tweets.
- Sort all tweets by CreateTimestamp in descending order to return the latest tweets first.
- Return paginated data.
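For steps 3 and 4, a lazy k-way merge avoids materializing and sorting every tweet up front. A minimal sketch, assuming each followed user's tweet list is already sorted newest-first:

import heapq
from itertools import islice

def build_feed(tweets_by_user: dict, page_size: int = 20) -> list:
    # tweets_by_user maps each followed UserId to that user's tweets,
    # each list sorted by CreateTimestamp descending.
    merged = heapq.merge(*tweets_by_user.values(),
                         key=lambda t: t["CreateTimestamp"],
                         reverse=True)             # lazy merge, latest first
    return list(islice(merged, page_size))         # only the requested page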
Following is the API design for this service.
GET /feed/<user_id>
{
data: [xxx,xxx,xxx] // TweetId
pageSize: xxx
startPage: xxx
}
Following is a sample paginated SQL query to fetch the latest 20 tweets for a user's feed.
SELECT T.TweetId, T.Content, T.UserId, T.CreateTimestamp
FROM Tweets T, UserFollowers F
WHERE F.SourceUserId = <user_id>
AND F.Relationship = 1            -- FOLLOWING
AND T.UserId = F.TargetUserId
AND T.CreateTimestamp >= $TIMESTAMP
ORDER BY T.CreateTimestamp DESC
LIMIT 20;
Following are the problems with this design:
- Reads on the Tweets table are expensive due to the join across two tables.
- Reads on UserFollowers can be optimized, but may become a problem in the future as following lists grow.
- Recent posts may be missing from the feed.
Offline feed generation
Instead of generating the feed at runtime, we can generate it offline and pre-cache it in memory to serve requests.
- Batch processing: we can run a Map/Reduce job over bounded data to produce the feed for each user at specific intervals.
- Stream processing: every time a user posts a tweet, we can run stream processing to update the cached feed of each follower.
We can use Google Dataflow or Apache Flink, which let us write both the batch pipeline and the stream pipeline with the same code.
In this case, we will use batch processing, as we need to scan the bounded data written into the Tweets and UserFollowers tables. This job will run once every 24 hours and update the cache.
Use batch processing to generate feed and write into cache
Use a Map/Reduce batch processing job to generate the feed for each user and write it into an in-memory distributed cache, guaranteeing that the user will always have a feed available to read. Following is the structure of the cache.
UserId   Feed                                                                        FeedTimestamp
U1       [{TweetId: xxx, Content: xxx, UserId: xxx, Timestamp: xxx}, {...}, {...}]   T1
U2       [{TweetId: xxx, Content: xxx, UserId: xxx, Timestamp: xxx}, {...}, {...}]   T2
- The batch feed generator will maintain, in the cache, the last timestamp at which the feed was generated for each user (FeedTimestamp).
- Whenever the batch job needs to generate the feed for a user, it will first query the last time the feed was generated for that user, then generate new feed data from that time onwards, as sketched below.
- On a cache miss, the feed service will query the backend.
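A sketch of that incremental update step, using FeedTimestamp as a watermark (fetch_tweets_since is a hypothetical helper that returns tweets posted by the user's followings after a given time):

import time

def refresh_feed(user_id, cache, fetch_tweets_since, max_items=200):
    entry = cache.get(user_id, {"Feed": [], "FeedTimestamp": 0})
    # Only tweets newer than the last generation time need to be merged in.
    new_tweets = fetch_tweets_since(user_id, entry["FeedTimestamp"])
    feed = sorted(new_tweets + entry["Feed"],
                  key=lambda t: t["Timestamp"], reverse=True)[:max_items]
    cache[user_id] = {"Feed": feed, "FeedTimestamp": int(time.time())}

The 200-item cap anticipates the sizing discussion below.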
How many feed items should we store in memory for a user’s feed?
- If we assume that one page of a user's feed has 20 posts and most users never browse more than ten pages of their feed, we can decide to store only 200 posts per user.
- For any user who wants to see more posts (beyond what is stored in memory), we can always query the backend servers.
Should we generate feeds for all users?
- There will be a lot of users who don't log in frequently.
- A straightforward approach is an LRU-based cache that evicts users who haven't accessed their feed for a long time (see the sketch below).
- A smarter solution can learn users' login patterns and pre-generate their feeds accordingly, e.g., at what time of day a user is active and on which days of the week they access their feed.
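A minimal LRU policy over the feed cache (the capacity figure is illustrative):

from collections import OrderedDict

class FeedCache:
    def __init__(self, capacity=1_000_000):
        self._entries = OrderedDict()      # user_id -> feed, in access order
        self._capacity = capacity

    def get(self, user_id):
        if user_id not in self._entries:
            return None                    # cache miss -> caller queries backend
        self._entries.move_to_end(user_id) # mark as most recently used
        return self._entries[user_id]

    def put(self, user_id, feed):
        self._entries[user_id] = feed
        self._entries.move_to_end(user_id)
        if len(self._entries) > self._capacity:
            self._entries.popitem(last=False)  # evict least recently used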
Drawback of reading the feed from cache
Since batch processing takes time, a feed read from the cache will miss recently posted tweets from other users.
Push posted tweets to followers in realtime
Once a user has published a post, we can immediately push it to all of their followers.
The client and server can establish a bidirectional streaming connection, which allows the server to push tweets to connected clients.
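A sketch of the push path using the third-party websockets package (an assumption; gRPC bidirectional streaming would work the same way):

import asyncio
import json
import websockets

connections = {}                           # user_id -> live connection

async def handler(websocket):
    user_id = await websocket.recv()       # client identifies itself first
    connections[user_id] = websocket
    try:
        await websocket.wait_closed()      # keep the connection open
    finally:
        connections.pop(user_id, None)

async def push_tweet(tweet, follower_ids):
    # Fan a new tweet out to whichever followers are currently connected.
    for follower_id in follower_ids:
        ws = connections.get(follower_id)
        if ws is not None:
            await ws.send(json.dumps(tweet))

async def main():
    async with websockets.serve(handler, "0.0.0.0", 8765):
        await asyncio.Future()             # serve forever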
Drawback of realtime push
Many users have millions of followers. For example, Trump has 100 million followers. If we assume that 20% of Trump's followers are online, a tweet posted by Trump must be dispatched to 20 million users.
The above design will not scale to publish events to 20 million users in realtime.
Handle Celebrity user
We can skip the push model for celebrity users, i.e., if a celebrity makes a post, we will not push it to their followers in realtime.
Instead, we will let followers pull the celebrity's tweets directly from the server.
The feed service will take care of merging the results and returning them.
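The read-time merge might look like this (both helper callables are hypothetical stand-ins):

def read_feed(user_id, cache, get_celebrity_followings, fetch_celebrity_tweets,
              page_size=20):
    # Pre-generated feed for regular followings comes straight from the cache.
    feed = list(cache.get(user_id, []))
    # Celebrity tweets are pulled at read time instead of being pushed.
    for celebrity_id in get_celebrity_followings(user_id):
        feed.extend(fetch_celebrity_tweets(celebrity_id))
    feed.sort(key=lambda t: t["Timestamp"], reverse=True)
    return feed[:page_size]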
Instead of reading the celebrity's tweets directly from the database, we can also pre-cache them and let the feed service read from that cache. We can use stream processing (either Flink or Google Dataflow) to keep the celebrity tweet cache up to date.
Since the celebrity feed cache is specific to the celebrity, i.e., all of their followers consume the same feed, we can optimize further by using edge servers to distribute the celebrity cache to the followers.
Overall strategy to publish the feed to a user's timeline
- On first page load, the client calls the Feed Service to get the first page of feed data, which is used to render the first screen. The response also includes the session server details the client needs to establish the streaming connection.
- The client renders the feed.
- The client establishes a bidirectional streaming connection to the given session server.
- Once the streaming connection is established, the server starts publishing realtime tweets for this user.
- If the user scrolls the page, the client makes an HTTP API call to pull the next page of the feed from the Feed Service.
Hope you learned something. Happy designing systems :-)