Twitter system design

Dilip Kumar
10 min read · Jul 7, 2024


Design a Twitter application with the following minimal feature set:

  1. Users can post tweets containing text, URLs, or photos
  2. Users should be able to follow other users
  3. Users should see a feed of tweets from the users they follow, sorted by timestamp (latest on top)

Complexities of the Twitter system

  1. Support 1+ billion users.
  2. Support 200 million daily active users.
  3. On average, every user makes 1 tweet every two days.
  4. On average, each user follows 200 other users.
  5. Latency to load the feed should be in the range of ~200ms.

High level system design

Based on the requirements, we can come up with the following high-level system design.

Post Tweets Service

API Design to post tweet

This service will be used to post a tweet. Following is the API design for this service.

POST /tweets
Request Payload: {
userId: xxx
content: xxxx
}
Response Payload: {
tweetId: xxxx
}
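
As an illustration, a minimal handler for this endpoint could look like the sketch below (Python with Flask; the ID generation and storage calls are hypothetical stubs, not the actual implementation, and the real TweetId would come from the UUID generation service described next).

from uuid import uuid4
from flask import Flask, request, jsonify

app = Flask(__name__)

def generate_tweet_id():
    # Stand-in for the separate UUID generation service.
    return uuid4().hex

def save_tweet(tweet_id, user_id, content):
    # Stand-in for the write into the sharded Tweets table.
    pass

@app.route("/tweets", methods=["POST"])
def post_tweet():
    payload = request.get_json()
    tweet_id = generate_tweet_id()
    save_tweet(tweet_id, payload["userId"], payload["content"])
    return jsonify({"tweetId": tweet_id})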

Tweets Table design

Tweets will be written into the Tweets table with the following schema.

TweetId  Content  CreateTimestamp  UserId
xxxx xxxx xxxxx xxxx
xxxx xxxx xxxxx xxxxx
  1. TweetId will be generated by a separate UUID generation service.
  2. TweetId will be the primary key for this table.
  3. Generally, we search for all the tweets of a given UserId; therefore we will use UserId as the shard key.
  4. To support finding all the tweets for a given UserId, we need a secondary index on the UserId column.
  5. A secondary index on CreateTimestamp will also be created to support chronological sorting of tweets (a minimal schema sketch follows after this list).
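
A minimal, single-node sketch of this schema, using Python's built-in sqlite3 purely for illustration (the real store would be sharded by UserId as described above; column types and index names are assumptions):

import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE Tweets (
    TweetId         TEXT PRIMARY KEY,   -- generated by the UUID service
    Content         TEXT,               -- ~4 KB of tweet text or a media URL
    CreateTimestamp INTEGER,            -- epoch milliseconds
    UserId          TEXT
);
-- Secondary indexes to look up a user's tweets in chronological order.
CREATE INDEX idx_tweets_user ON Tweets (UserId);
CREATE INDEX idx_tweets_ts   ON Tweets (CreateTimestamp);
""")

# Fetch a user's tweets, newest first.
rows = conn.execute(
    "SELECT TweetId, Content FROM Tweets WHERE UserId = ? "
    "ORDER BY CreateTimestamp DESC",
    ("user_x",),
).fetchall()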

Storage Calculation

Size of one row = TweetId (int64 ~ 8 bytes) + Content (~4 KB) + CreateTimestamp (int64 ~ 8 bytes) + UserId (int64 ~ 8 bytes) = 4.024 KB ~ 4 KB

Number of posts per user = 1 tweet every two days ~ 15 tweets per month ~ 180 tweets per year ~ 900 tweets in 5 years

Number of tweets in 5 years = 1 billion users × 900 tweets = 900 billion tweets

Total tweet size = 900 billion × 4 KB = 3.6 PB ~ 4 PB

Avg size of a shard ~ 400 GB

Total number of shards = 4,000,000 GB / 400 GB ~ 10,000 shards
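
The same back-of-the-envelope estimate as a small script (decimal units, with the assumptions stated above):

# Rough storage estimate for the Tweets table over 5 years.
KB, GB, PB = 10**3, 10**9, 10**15

ROW_SIZE_BYTES   = 8 + 4 * KB + 8 + 8     # TweetId + Content + CreateTimestamp + UserId
USERS            = 1_000_000_000
TWEETS_PER_USER  = 900                    # ~1 tweet every 2 days over 5 years
SHARD_SIZE_BYTES = 400 * GB

total_tweets = USERS * TWEETS_PER_USER            # 900 billion tweets
total_bytes  = total_tweets * ROW_SIZE_BYTES      # ~3.6 PB
shards       = total_bytes / SHARD_SIZE_BYTES     # ~9,000, i.e. on the order of 10,000

print(f"tweets:  {total_tweets:,}")
print(f"storage: {total_bytes / PB:.2f} PB")
print(f"shards:  {shards:,.0f}")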

Tweet with media

Users can also post tweets with images, videos, etc. To support media, we will need a separate Upload Service. The client will first make an RPC call to the upload service, which will return the URL for the given media. The same URL will then be used in the tweet body when calling the Post Tweets Service to create the tweet. Following is the API design for the upload service.

POST /media/upload
RequestPayload: {
UserId:xxxx
MultipartFile: yyyyy
}
ResponsePayload: {
Url: xxxx
}

Note: Since the application will only support small files (in the range of a few KB) for tweets, the upload service will follow a simple design and store each upload as a single file. If in the future there is a need to support larger files (in the range of GBs), we will have to redesign the upload service to support a chunked upload mechanism.
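
An illustrative upload handler is sketched below (Flask; the blob-store write and the returned URL scheme are assumptions, not the actual implementation):

import uuid
from flask import Flask, request, jsonify

app = Flask(__name__)

def store_blob(data: bytes) -> str:
    # Stand-in for the blob-store write; returns a hypothetical media URL.
    media_id = uuid.uuid4().hex
    return f"https://media.example.com/{media_id}"

@app.route("/media/upload", methods=["POST"])
def upload_media():
    media_file = request.files["file"]    # multipart file part
    url = store_blob(media_file.read())
    return jsonify({"url": url})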

Size of media ~ 100 KB

Total number of media files = 1% of tweets

Total size of media in 5 years = 900 billion tweets × 0.01 × 100 KB = 9 billion × 100 KB = 900 TB

Avg size of a shard = 400 GB

Number of shards = 900,000 GB / 400 GB ~ 2,250 shards

Follower Service

The Follower Service will expose the following API to allow a user to start following another user.

POST /following
RequestPayload {
FollowingUserId: xxxx // The target user that the current user wants to follow
UserId: yyyy // The current user
}
ResponsePayload {
}

Data will be stored in the UserFollowing table. Following is the schema for this table.

UserId        FollowingUserId
xxx xxxx
x_user Trump_user

UserId+FollowingUserId will be the composite primary key. To keep all of a user's following data on a single machine, we will shard based on UserId.

The UserFollowing table will be used to find the following:

  1. List of user’s followings
  2. List of user’s followers

List User Following API

We will need the following API to list a user's followings.

GET /users/followings
RequestPayload: {
UserId: xxx
}
ResponsePayload: {
data: [xxx,xxx,xxx,xxx] // List of userId
pageSize: xxx
startPage: xxxx
}

We need to run the following SQL to get the list of followings.

SELECT FollowingUserId // Trump_user, Biden_user, Pichai_user, y_user
FROM UserFollowings
WHERE UserId=xxx; // For user_xxx, get the list of following users

The above query will run efficiently for the following reasons:

  1. Data is sharded based on UserId, so the query executes on a single machine.
  2. The index on UserId (the leading column of the composite primary key) helps the query run faster.

User Followers API

We will need the following API to list a user's followers.

GET /users/followers
RequestPayload: {
UserId: xxx
}
ResponsePayload: {
data: [xxx,xxx,xxx,xxx] // List of userId
pageSize: xxx
startPage: xxxx
}

We need to run the following SQL to get the list of followers.

SELECT UserId // user_xx, user_yy
FROM UserFollowings
WHERE FollowingUserId=xxxx; // for trump_user, get the list of followers

The above query will not run efficiently for the following reasons:

  1. Sharding is based on UserId but the query filters on FollowingUserId, so the query may have to be executed on more than one machine (scatter/gather across shards).
  2. The UserId+FollowingUserId composite primary key only lets UserId-based queries use the index prefix. A query on FollowingUserId will not use the index, i.e., it will do a full table scan.
  3. We can try to add a secondary index on FollowingUserId, but the query will still fan out across shards, so it remains inefficient.

We can take the following approaches to optimize this query.

Approach #1: Denormalize into UserFollowers and UserFollowings tables

Along with keeping data in the UserFollowings table, we can also write into a UserFollowers table.

UserId    FollowerId
xxx xxxx
xxx xxxx
Trump_user x_user
Biden_user x_user

We can shard data based on UserId and choose UserId+FollowerId as the composite primary key. Following is the query to read the list of followers.

SELECT FollowerId // x_user, y_user
FROM UserFollowers
WHERE UserId=xxxx; // For Trump_user, get the list of followers

There are two ways we can write data into the new UserFollowers table.

  1. Use a transaction to write into both tables: A database like Spanner supports distributed transactions across shards, so we can leverage it and write into the two tables in the same transaction. If Spanner (or an equivalent) is not available, then this approach will not work.
  2. Use CDC (Change Data Capture) on the UserFollowings table and stream the changes to a Kafka topic. A subscriber of the Kafka topic will take care of writing into the UserFollowers table (a sketch follows below). This solution leads to eventual consistency, and therefore might not be a good fit.
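
A minimal sketch of such a subscriber is shown below, assuming the kafka-python client and a hypothetical CDC topic named user_followings_cdc (the database write is stubbed):

import json
from kafka import KafkaConsumer  # kafka-python client

consumer = KafkaConsumer(
    "user_followings_cdc",                       # hypothetical CDC topic
    bootstrap_servers=["localhost:9092"],
    group_id="user-followers-writer",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

def write_user_follower(user_id, follower_id):
    # Stand-in for the write into the UserFollowers table.
    pass

for message in consumer:
    change = message.value  # e.g. {"UserId": "x_user", "FollowingUserId": "Trump_user"}
    # A new row in UserFollowings means UserId now follows FollowingUserId,
    # so FollowingUserId gains a follower.
    write_user_follower(
        user_id=change["FollowingUserId"],
        follower_id=change["UserId"],
    )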

Approach #2: Use a Graph database

We can use a graph database to store the relationships. Though it makes both queries efficient, it creates a problem for feed generation because the Tweets table still lives in a relational database. Therefore it is not a good choice for this application.

Approach #3: Use a Relationship column

Add a Relationship column to the UserFollowers table and store rows for both followers and followings. This helps write both rows in the same transaction, and sharding on SourceUserId ensures each query runs on a single machine.

Based on approach #3, following is the schema for this UserFollowers table.

SourceUserId  TargetUserId  Relationship
A B FOLLOWER   # B is a follower of A (enum, 0: FOLLOWER)
B A FOLLOWING  # B is following A (enum, 1: FOLLOWING)
A C FOLLOWER   # C is a follower of A
C A FOLLOWING  # C is following A

Now the followers query can be written as below.

SELECT TargetUserId
FROM UserFollowers
WHERE SourceUserId=xxx // Trump_user
AND Relationship = 0;

The followings query can be written as below.

SELECT TargetUserId // Trump_user, Biden_user, Pichai_user
FROM UserFollowers
WHERE SourceUserId=xxx // x_user
AND Relationship = 1;
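
On the write path of this approach, when user X follows user Y we insert both rows in one transaction. A minimal single-node sketch with sqlite3 (illustrative only; a sharded store would use its own transaction API):

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
CREATE TABLE UserFollowers (
    SourceUserId TEXT,
    TargetUserId TEXT,
    Relationship INTEGER,   -- 0: FOLLOWER, 1: FOLLOWING
    PRIMARY KEY (SourceUserId, TargetUserId, Relationship)
)""")

def follow(conn, user_id, target_user_id):
    # user_id starts following target_user_id: write both directions atomically.
    with conn:  # single transaction
        conn.execute(
            "INSERT INTO UserFollowers VALUES (?, ?, 1)",  # user_id is FOLLOWING target
            (user_id, target_user_id),
        )
        conn.execute(
            "INSERT INTO UserFollowers VALUES (?, ?, 0)",  # target gains a FOLLOWER
            (target_user_id, user_id),
        )

follow(conn, "x_user", "Trump_user")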

Feed Service

This service will be used to generate the feed for a given user. It will use the Tweets and UserFollowers tables to generate the feed. Following is the high-level algorithm to generate the feed.

  1. Get the list of users the given user is following.
  2. Get the list of tweets posted by those followed users.
  3. Merge all the tweets.
  4. Sort all tweets in descending order of CreateTimestamp to return the latest tweets first.
  5. Return paginated data.

Following is the API design for this service.

GET /feed/<user_id>
{
data: [xxx,xxx,xxx] // TweetId
Pagesize: xxx
StartPage: xxx
}

Following is a sample paginated SQL query to fetch the latest 20 feed tweets for a user.

SELECT T.TweetId, T.Content, T.UserId, T.CreateTimestamp
FROM Tweets T
JOIN UserFollowers F ON T.UserId = F.TargetUserId
WHERE F.SourceUserId = <user_id>
AND F.Relationship = 1 // Following
AND T.CreateTimestamp >= $TIMESTAMP
ORDER BY T.CreateTimestamp DESC
LIMIT 20;
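
Alternatively, the merge step (steps 3 and 4 of the algorithm above) can be done in application code. Below is a small sketch that merges per-followee tweet lists, newest first, using Python's heapq; the per-user tweet data is stubbed in memory for illustration:

import heapq

# Stub: tweets per followed user, each list already sorted newest-first
# (as a per-shard query with ORDER BY CreateTimestamp DESC would return).
tweets_by_user = {
    "Trump_user":  [(1720300000, "t3"), (1720100000, "t1")],
    "Pichai_user": [(1720200000, "t2")],
}

def build_feed(followings, page_size=20):
    # k-way merge of the per-user lists; timestamps are negated so that
    # the smallest merge key corresponds to the newest tweet.
    streams = (
        ((-ts, tweet_id) for ts, tweet_id in tweets_by_user.get(user, []))
        for user in followings
    )
    merged = heapq.merge(*streams)
    return [(-key, tweet_id) for key, tweet_id in list(merged)[:page_size]]

print(build_feed(["Trump_user", "Pichai_user"]))
# [(1720300000, 't3'), (1720200000, 't2'), (1720100000, 't1')]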

Following are the problems with this design:

  1. Reads on the Tweets table are expensive due to the join across two tables (and across shards).
  2. Reads on UserFollowers can be optimized, but may create problems in the future if the number of followings grows.
  3. Recent posts will be missing in the feed.

Offline feed generation

Instead of generating the feed at runtime, we can generate it offline and pre-cache it in memory to serve requests.

  1. Batch processing: We can run a Map/Reduce job on bounded data to produce the feed for each user at specific intervals.
  2. Stream processing: Every time a user posts a tweet, we can run stream processing to update the cached feed for each follower.

We can use Google Dataflow or Apache Flink, which allow us to use the same pipeline to write both batch processing and stream processing jobs.

In this case, we will use batch processing, as we need to scan the bounded data written into the Tweets and UserFollowers tables. This job will run once every 24 hours and update the cache.

Use batch processing to generate feed and write into cache

Use a Map/Reduce batch processing job to generate the feed for each user and write it into the cache, guaranteeing that the user always has a feed available to read from the in-memory distributed cache. Following is the structure of the cache.

UserId Feed                                                   FeedTimestamp
U1 [{TweetId: xxx, Content: xxx, UserId: xxx, Timestamp: xxx}, {}, {}] T1
U1 [{TweetId: xxx, Content: xxx, UserId: xxx, Timestamp: xxx}, {}, {}] T2
U1 [{TweetId: xxx, Content: xxx, UserId: xxx, Timestamp: xxx}, {}, {}] T3
U1 [{TweetId: xxx, Content: xxx, UserId: xxx, Timestamp: xxx}, {}, {}] T4
U1 [{TweetId: xxx, Content: xxx, UserId: xxx, Timestamp: xxx}, {}, {}] T5
  1. The batch feed generation job maintains, in the cache, the last timestamp at which the feed was generated for a given user.
  2. Whenever the batch job needs to generate the feed for a user, it first queries the last time the feed was generated for that user, then generates new feed data from that time onwards (a simplified sketch of this job follows below).
  3. On a cache miss, the feed service queries the backend.
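
A simplified, in-memory map/reduce-style sketch of this batch job is shown below (a real pipeline would run on Dataflow or Flink over the sharded tables; the names and data here are illustrative only):

from collections import defaultdict

# Inputs the batch job would scan (illustrative data).
tweets = [
    {"TweetId": "t1", "UserId": "Trump_user", "CreateTimestamp": 1720100000},
    {"TweetId": "t2", "UserId": "Pichai_user", "CreateTimestamp": 1720200000},
]
followers = {"Trump_user": ["x_user", "y_user"], "Pichai_user": ["x_user"]}

MAX_FEED_ITEMS = 200  # ~10 pages of 20 posts, per the sizing below

# Map: emit (follower, tweet) for every follower of the tweet's author.
mapped = (
    (follower, tweet)
    for tweet in tweets
    for follower in followers.get(tweet["UserId"], [])
)

# Reduce: group by follower, sort newest-first, keep the top N, write to cache.
feed_cache = defaultdict(list)
for follower, tweet in mapped:
    feed_cache[follower].append(tweet)
for follower, items in feed_cache.items():
    items.sort(key=lambda t: t["CreateTimestamp"], reverse=True)
    feed_cache[follower] = items[:MAX_FEED_ITEMS]

print(feed_cache["x_user"])  # newest tweet first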

How many feed items should we store in memory for a user’s feed?

  1. If we assume that one page of a user’s feed has 20 posts and most of the users never browse more than ten pages of their feed, we can decide to store only 200 posts per user.
  2. For any user who wants to see more posts (more than what is stored in memory), we can always query backend servers.

Should we generate feeds for all users?

  1. There will be a lot of users who don’t log in frequently.
  2. A more straightforward approach could be to use an LRU-based cache that evicts users who haven’t accessed their feed for a long time (a sketch follows below).
  3. A smarter solution can figure out the login pattern of users to pre-generate their feed, e.g., at what time of day a user is active and on which days of the week a user accesses their feed, etc.
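
A tiny LRU sketch for such a cache, using collections.OrderedDict (the capacity and eviction policy here are illustrative assumptions):

from collections import OrderedDict

class FeedCache:
    """Keeps feeds only for recently active users; evicts the least recently used."""

    def __init__(self, capacity=1_000_000):
        self.capacity = capacity
        self.feeds = OrderedDict()          # user_id -> list of feed items

    def get(self, user_id):
        if user_id not in self.feeds:
            return None                     # cache miss: caller falls back to the backend
        self.feeds.move_to_end(user_id)     # mark as most recently used
        return self.feeds[user_id]

    def put(self, user_id, feed_items):
        self.feeds[user_id] = feed_items
        self.feeds.move_to_end(user_id)
        if len(self.feeds) > self.capacity:
            self.feeds.popitem(last=False)  # evict the least recently used user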

Drawback of reading the feed from cache

Since batch processing takes time, a feed read from the cache will miss recently posted tweets from other users.

Push posted tweets to followers in realtime

Once a user has published a post, we can immediately push this post to all the followers.

The client and server can establish a bidirectional streaming connection. This allows the server to push tweets to connected clients.
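
A minimal in-process sketch of this push fan-out is shown below (the connection registry and send call are stand-ins for the real session/streaming servers):

from collections import defaultdict

class Connection:
    # Stand-in for a bidirectional streaming connection to one client.
    def __init__(self, user_id):
        self.user_id = user_id

    def send(self, tweet):
        print(f"push to {self.user_id}: {tweet['TweetId']}")

# user_id -> set of live streaming connections for that user.
connections = defaultdict(set)

def on_tweet_posted(tweet, follower_ids):
    # Fan the new tweet out to every follower with a live connection.
    for follower_id in follower_ids:
        for conn in connections[follower_id]:
            conn.send(tweet)

# Example: x_user is online, Trump_user posts a tweet.
connections["x_user"].add(Connection("x_user"))
on_tweet_posted({"TweetId": "t42", "UserId": "Trump_user"}, follower_ids=["x_user", "y_user"])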

Drawback of realtime push

There are many users with millions of followers. For example, Trump has roughly 100 million followers. If we assume that 20% of Trump’s followers are online, then we need to dispatch a tweet posted by Trump to 20 million users.

The above design will not scale to publish events to 20 million users in realtime.

Handle Celebrity user

We can skip the push model for celebrity users, i.e., if a celebrity user makes a post, we will not push it to their followers in realtime.

Instead, we will let followers pull the celebrity user’s tweets directly from the server.

The Feed Service will take care of merging the results and returning them.

Instead of reading the celebrity user’s tweets directly from the database, we can also pre-cache the celebrity user’s posts and let the Feed Service read from the cache. We can use stream processing to update the cache for celebrity users’ tweets. For stream processing, we can use either Flink or Google Dataflow.

Since the celebrity feed cache is specific to the celebrity user (i.e., all of their followers consume the same celebrity feed), we can optimize further by using edge servers to distribute the celebrity cache closer to the followers.

Overall strategy to publish the feed to a user’s timeline

  1. On first page load, the client calls the Feed Service to get the first page of feed data, which is used to render the first screen for the user. The response also includes the session server details the client needs to establish a streaming connection.
  2. The client renders the feed.
  3. The client establishes a bidirectional streaming connection to the given session server.
  4. Once the streaming connection is established, the server starts publishing realtime tweets for this user.
  5. If the user scrolls the page, the client makes an HTTP API call to pull the next page of the feed from the Feed Service.

Hope you learned something. Happy designing systems :-)


Written by Dilip Kumar

With 18+ years of experience as a software engineer, I enjoy teaching, writing, and leading teams. For the last 4+ years, I have been working at Google as a backend software engineer.
