Sai Sasank Y

Writing a HTTP Server from scratch in less than 200 lines

A TCP server

First, we create a TCP server. A socket is a networking endpoint on your device that allows processes to communicate over a network. TCP (Transmission Control Protocol) builds on the socket by imposing specific rules for communication. By following the rules TCP enforces, we get reliability and error-checking (among other things) for our communication in return.

import socket
# Create a TCP listening socket bound to localhost:4221; reuse_port=True
# asks create_server to set SO_REUSEPORT on it.
server = socket.create_server(("localhost", 4221), reuse_port=True)
# Block until a client connects; returns the connected socket and the
# client's address.
client_conn, client_addr = server.accept()

Our TCP server is bound to localhost on port 4221 and waits for an incoming connection. Using localhost means the server can only be reached from my local machine; the port number identifies a specific service on that machine. There are $2^{16}$ (65,536) port numbers available, so you can set up multiple servers on your machine.

server.accept() will block the program execution until a client connects to our server.

First HTTP response

HTTP is a communication protocol between two entities: client and server. Client sends requests to which the server responds. Now, we will implement a very basic and boring response saying everything is OK (response code) ignoring what the client is requesting. HTTP responses have a specific format, see HTTP responses. Response codes indicate whether a specific HTTP request from the client has been successfully completed. Read more here. We will send the 200 OK response to the client. Additionally, we can send some headers and some more information as part of response body, but we will ignore them for now.

# Wait for a client on the listening socket created above (named `server`),
# then answer with a bare 200 status line: no headers and no body.
client_conn, client_addr = server.accept()
client_conn.sendall(b"HTTP/1.1 200 OK\r\n\r\n")

Reading exact url from client request

Next, we will make a blocking call and wait to receive data from the client. We assume client will send data in a HTTP request format. We will specifically look at the request target (the url the client is requesting as part of its GET request) and respond with 200 OK if it's the root page and respond with 404 Not Found if it's anything else.

# Read up to 4096 bytes of the request and decode them to text.
http_req = client_conn.recv(4096).decode()
# The request line is everything before the first CRLF:
# "<method> <target> <version>".
req_line = http_req.partition("\r\n")[0]
req_target = req_line.split(" ")[1]
# Only the root page exists; any other target is answered with 404.
status_line = b"HTTP/1.1 200 OK" if req_target == "/" else b"HTTP/1.1 404 Not Found"
client_conn.sendall(status_line + b"\r\n\r\n")

Implementing a simple echo endpoint

We will extend the code to handle a simple endpoint that echoes back to the user whatever they put in the request target after /echo/. For example, when the user calls GET /echo/abc, they should receive abc as the response body. Similar to HTTP requests, HTTP responses also have a specific format. See HTTP responses.

if req_target == "/":
    client_conn.sendall(b"HTTP/1.1 200 OK\r\n\r\n")
elif req_target.startswith("/echo/"):
    # Echo back whatever follows the "/echo/" prefix of the request target.
    response_body = req_target[len("/echo/"):]
    # Content-Length is measured in bytes, so count the encoded body rather
    # than the number of characters (they differ for non-ASCII text).
    response_headers = (
            f"Content-Type:text/plain\r\nContent-Length:{len(response_body.encode())}\r\n"
    )
    response = f"HTTP/1.1 200 OK\r\n{response_headers}\r\n{response_body}"
    client_conn.sendall(response.encode())

Reading a request header

Let's implement a /user-agent endpoint that reads the user-agent request header from a client's request and responds with a 200 OK and with the header value as response body.

First, we split the http request.

# Break the raw request into CRLF-separated lines: the first is the request
# line, everything after it is kept as the header lines.
http_req_parts = http_req.split("\r\n")
req_line, *req_headers = http_req_parts
# The request target is the second space-separated token of the request line.
req_target = req_line.split(" ")[1]

Then, handle the case when the requested end-point is /user-agent.

elif req_target.startswith("/user-agent"):
    # Keep the lines that start with "user-agent:" (case-insensitive) and take
    # the first one.
    # NOTE(review): [0] raises IndexError when no such line is present.
    user_agent_req_header = list(
            filter(
                   lambda x: x.lower().startswith("user-agent:"), req_headers
            )
    )[0]
    # Drop the "user-agent:" prefix and surrounding whitespace to get the value.
    response_body = user_agent_req_header[len("user-agent:"):].strip()
    # NOTE(review): len() counts characters here, while the response is sent
    # encoded; the two differ for non-ASCII values.
    response_headers = (
            f"Content-Type:text/plain\r\nContent-Length:{len(response_body)}\r\n"
    )
    response = f"HTTP/1.1 200 OK\r\n{response_headers}\r\n{response_body}"
    client_conn.sendall(response.encode())

Now feels like a good point to refactor the code a little bit, before adding more features.

Refactored code
import socket

# Request targets the server knows how to answer.
HOME_ENDPOINT = "/"
# Matched as a prefix by handle_request; the echoed text is sliced from
# index 6 of the target, i.e. after "/echo/".
ECHO_ENDPOINT = "/echo"
USER_AGENT_ENDPOINT = "/user-agent"

def handle_request(request, client_connection, client_address):
    """Route one decoded HTTP request and send the response to the client.

    Args:
        request: The raw request text.
        client_connection: Connected socket the encoded response is sent on.
        client_address: Address of the client (not used here).
    """
    lines = request.split("\r\n")
    start_line = lines[0]
    target = start_line.split(" ")[1]
    print(start_line)

    # Pick the response for the requested target; unknown targets get 404.
    if target == HOME_ENDPOINT:
        reply = "HTTP/1.1 200 OK\r\n\r\n"
    elif target.startswith(ECHO_ENDPOINT):
        reply = get_response_for_echo(lines)
    elif target == USER_AGENT_ENDPOINT:
        reply = get_response_for_useragent(lines)
    else:
        reply = "HTTP/1.1 404 Not Found\r\n\r\n"

    client_connection.sendall(reply.encode())


def get_response_for_echo(request_parts):
    """Build a 200 response that echoes the text after ``/echo/``.

    Args:
        request_parts: The raw request split on CRLF; element 0 is the
            request line ("<method> <target> <version>").

    Returns:
        The complete response as a string (status line, headers, body).
    """
    request_line = request_parts[0]
    request_target = request_line.split(" ")[1]
    response_body = request_target[len("/echo/"):]
    # Content-Length is a byte count. The response is encoded before it is
    # sent, so measure the encoded body, not the number of characters.
    content_length = len(response_body.encode())
    response_headers = (
        f"Content-Type:text/plain\r\nContent-Length:{content_length}\r\n"
    )
    return f"HTTP/1.1 200 OK\r\n{response_headers}\r\n{response_body}"


def get_response_for_useragent(request_parts):
    """Build a 200 response whose body is the request's User-Agent value.

    Args:
        request_parts: The raw request split on CRLF; element 0 is the
            request line and the header lines follow it.

    Returns:
        The complete response as a string. The body is empty when the
        request carries no User-Agent header.
    """
    prefix = "user-agent:"
    response_body = ""
    for line in request_parts[1:]:
        if not line:
            # The blank line ends the header section; never scan the body.
            break
        # Header names are case-insensitive.
        if line.lower().startswith(prefix):
            response_body = line[len(prefix):].strip()
            break
    # Content-Length is a byte count, so measure the encoded body.
    content_length = len(response_body.encode())
    response_headers = (
        f"Content-Type:text/plain\r\nContent-Length:{content_length}\r\n"
    )
    return f"HTTP/1.1 200 OK\r\n{response_headers}\r\n{response_body}"


def main():
    """Serve exactly one HTTP request on localhost:4221, then exit.

    Both the listening socket and the client connection are used as context
    managers so they are closed even if handling the request raises.
    """
    with socket.create_server(("localhost", 4221), reuse_port=True) as server_socket:
        # Blocks until a client connects.
        client_connection, client_address = server_socket.accept()
        with client_connection:
            request = client_connection.recv(4096).decode()
            handle_request(request, client_connection, client_address)


# Start the server only when this file is run as a script, not when imported.
if __name__ == "__main__":
    main()

Concurrent connections to handle multiple client requests

Great! Now we will make the server a bit more useful by handling multiple client requests. The idea is to use a thread pool to asynchronously process requests. We will run an infinite loop where we submit new requests to the thread pool executor and continue listening for new requests. We will write a new helper function to handle each client request and adjust the main function to use a thread pool.

def client_handler(client_connection, client_address):
    """Read one request from a client, answer it, and close the connection.

    Any exception raised while reading or handling the request is reported
    on stdout; the connection is closed in every case.
    """
    try:
        payload = client_connection.recv(4096)
        text = payload.decode()
        handle_request(text, client_connection, client_address)
    except Exception as exc:
        print(f"Error handling client {client_address}: {exc}")
    finally:
        client_connection.close()
def main():
    """Accept connections forever and hand each one to a pool of 20 threads.

    Ctrl-C (KeyboardInterrupt) stops the accept loop; the listening socket is
    closed on the way out.
    """
    server_socket = socket.create_server(("localhost", 4221), reuse_port=True)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as pool:
        # Leaving this `with` closes the listening socket.
        with server_socket:
            try:
                while True:
                    connection, address = server_socket.accept()
                    pool.submit(client_handler, connection, address)
            except KeyboardInterrupt:
                print("Shutting down server...")

An endpoint that serves files

Alright, now let's implement a slightly more sophisticated endpoint - /files. Basically, when the user requests /files/file123.txt, we look for file123.txt in a specific directory and return its contents. If no such file exists, we respond with 404 Not Found.

def get_response_for_files(request_parts):
    """Build the response for ``GET /files/<name>``.

    Args:
        request_parts: The raw request split on CRLF; element 0 is the
            request line.

    Returns:
        A 200 response carrying the file's content, or a 404 response when
        the name does not resolve to a regular file inside DIRECTORY.
    """
    request_line = request_parts[0]
    request_target = request_line.split(" ")[1]
    filename = request_target[len(FILES_ENDPOINT):]
    root = DIRECTORY.resolve()
    filepath = (root / Path(filename)).resolve()
    # The target is client-controlled: "../" segments or an absolute path
    # must not let the request escape DIRECTORY. is_file() also rejects
    # directories, which cannot be opened for reading.
    if root not in filepath.parents or not filepath.is_file():
        return "HTTP/1.1 404 Not Found\r\n\r\n"
    with filepath.open() as f:
        response_body = f.read()
    # Content-Length is a byte count, so measure the encoded body.
    response_headers = (
        "Content-Type:application/octet-stream\r\n"
        f"Content-Length:{len(response_body.encode())}\r\n"
    )
    return f"HTTP/1.1 200 OK\r\n{response_headers}\r\n{response_body}"

Receive files and persist them

Let's add support for POST request on /files endpoint, where user can submit a file to save on the server. We will respond with 201 Created.

def get_filepath(request_line):
    """Map the target of a ``/files/<name>`` request line to a path under DIRECTORY."""
    target = request_line.split(" ")[1]
    # Everything after the endpoint prefix is treated as the file name.
    return DIRECTORY / Path(target[len(FILES_ENDPOINT):])

def post_files_response(request_parts):
    """Store the body of ``POST /files/<name>`` as a file under DIRECTORY.

    Args:
        request_parts: The raw request split on CRLF; element 0 is the
            request line.

    Returns:
        A 201 response once the file is written, or a 403 response when the
        requested name resolves to a location outside DIRECTORY.
    """
    request_line = request_parts[0]
    filepath = get_filepath(request_line).resolve()
    # The target is client-controlled; refuse "../" or absolute names that
    # would write outside the upload directory.
    if DIRECTORY.resolve() not in filepath.parents:
        return "HTTP/1.1 403 Forbidden\r\n\r\n"
    # The body is everything after the first blank line. Re-join it so a
    # body that itself contains CRLF is not cut down to its last line.
    try:
        body_start = request_parts.index("") + 1
    except ValueError:
        request_body = ""
    else:
        request_body = "\r\n".join(request_parts[body_start:])
    with filepath.open("w") as f:
        f.write(request_body)
    return "HTTP/1.1 201 Created\r\n\r\n"

This feels like a reasonable place to stop and further refactor the code. I may consider extending this work later but for now I'm happy.

Complete refactored code
import argparse
import concurrent.futures
import socket
from pathlib import Path

# Request targets the server knows how to answer. The two that end in "/"
# are matched as prefixes; the rest of the target is the argument.
HOME_ENDPOINT = "/"
ECHO_ENDPOINT = "/echo/"
USER_AGENT_ENDPOINT = "/user-agent"
FILES_ENDPOINT = "/files/"
# Default directory for the /files/ endpoint; replaced by --directory when
# the script is run with that option.
DIRECTORY = Path("/tmp")

# Status lines, each already terminated by CRLF so headers can follow directly.
HTTP_200_OK = "HTTP/1.1 200 OK\r\n"
HTTP_201_CREATED = "HTTP/1.1 201 Created\r\n"
HTTP_404_NOT_FOUND = "HTTP/1.1 404 Not Found\r\n"

def client_handler(client_connection, client_address):
    """Read one request from a client, answer it, and close the connection.

    Any exception raised while reading or handling the request is reported
    on stdout; the connection is closed in every case.
    """
    try:
        payload = client_connection.recv(4096)
        text = payload.decode()
        handle_request(text, client_connection, client_address)
    except Exception as exc:
        print(f"Error handling client {client_address}: {exc}")
    finally:
        client_connection.close()


def handle_request(request, client_connection, client_address):
    """Route one decoded HTTP request and send the response to the client.

    Args:
        request: The raw request text.
        client_connection: Connected socket the encoded response is sent on.
        client_address: Address of the client (not used here).
    """
    request_data = parse_http_request(request)
    if request_data is None:
        # parse_http_request returns None for a malformed request line.
        response = build_response("HTTP/1.1 400 Bad Request\r\n")
    elif request_data["target"] == HOME_ENDPOINT:
        response = build_response(HTTP_200_OK)
    elif request_data["target"].startswith(ECHO_ENDPOINT):
        response = get_echo_response(request_data)
    elif request_data["target"] == USER_AGENT_ENDPOINT:
        response = get_useragent_response(request_data)
    elif request_data["target"].startswith(FILES_ENDPOINT):
        if request_data["method"] == "GET":
            response = get_files_response(request_data)
        elif request_data["method"] == "POST":
            response = post_files_response(request_data)
        else:
            # Without this branch `response` would be unbound for any other
            # method (PUT, DELETE, ...) and sendall would never run.
            response = build_response(
                "HTTP/1.1 405 Method Not Allowed\r\n", {"Allow": "GET, POST"}
            )
    else:
        response = build_response(HTTP_404_NOT_FOUND)
    client_connection.sendall(response.encode())


def get_echo_response(request_data):
    """Build a 200 response that echoes the text after ``/echo/`` in the target.

    Args:
        request_data: Parsed request; only its "target" entry is read.
    """
    response_body = request_data["target"][len(ECHO_ENDPOINT):]
    response_headers = {
        "Content-Type": "text/plain",
        # Content-Length is a byte count; the response is encoded before it
        # is sent, so measure the encoded body rather than the characters.
        "Content-Length": len(response_body.encode()),
    }
    return build_response(HTTP_200_OK, response_headers, response_body)


def get_useragent_response(request_data):
    """Build a 200 response whose body is the request's User-Agent value.

    Args:
        request_data: Parsed request; its "headers" entry is a dict mapping
            header names to values.

    Returns:
        The response built by build_response. The body is empty when the
        request has no User-Agent header.
    """
    # "headers" is a dict, so match on the key. Header names are
    # case-insensitive, hence the lower() comparison.
    response_body = next(
        (
            value
            for name, value in request_data["headers"].items()
            if name.lower() == "user-agent"
        ),
        "",
    )
    response_headers = {
        "Content-Type": "text/plain",
        # Content-Length is a byte count, so measure the encoded body.
        "Content-Length": len(response_body.encode()),
    }
    return build_response(HTTP_200_OK, response_headers, response_body)


def get_files_response(request_data):
    """Build the response for ``GET /files/<name>``.

    Args:
        request_data: Parsed request; only its "target" entry is read.

    Returns:
        A 200 response carrying the file's content, or a 404 response when
        the name does not resolve to a regular file inside DIRECTORY.
    """
    root = DIRECTORY.resolve()
    filepath = (root / request_data["target"][len(FILES_ENDPOINT):]).resolve()
    # The target is client-controlled: "../" segments or an absolute path
    # must not let the request escape DIRECTORY. is_file() also rejects
    # directories, which cannot be opened for reading.
    if root not in filepath.parents or not filepath.is_file():
        return build_response(HTTP_404_NOT_FOUND)
    with filepath.open() as f:
        response_body = f.read()
    response_headers = {
        "Content-Type": "application/octet-stream",
        # Content-Length is a byte count, so measure the encoded body.
        "Content-Length": len(response_body.encode()),
    }
    return build_response(HTTP_200_OK, response_headers, response_body)

def post_files_response(request_data):
    """Store the body of ``POST /files/<name>`` as a file under DIRECTORY.

    Args:
        request_data: Parsed request; its "target" and "body" entries are read.

    Returns:
        A 201 response once the file is written, or a 403 response when the
        requested name resolves to a location outside DIRECTORY.
    """
    filename = request_data["target"][len(FILES_ENDPOINT):]
    root = DIRECTORY.resolve()
    filepath = (root / Path(filename)).resolve()
    # The target is client-controlled; refuse "../" or absolute names that
    # would write outside the upload directory.
    if root not in filepath.parents:
        return build_response("HTTP/1.1 403 Forbidden\r\n")
    with filepath.open("w") as f:
        f.write(request_data["body"])
    return build_response(HTTP_201_CREATED)

def build_response(status_code, headers=None, body=None):
    """Assemble an HTTP response string.

    Args:
        status_code: The status line, already terminated by CRLF.
        headers: Optional mapping of header names to values.
        body: Optional body text; omitted when empty or None.

    Returns:
        Status line, one "Name: value" line per header, a blank line, and
        the body, concatenated into one string.
    """
    header_map = {} if headers is None else headers
    pieces = [status_code]
    pieces.extend(f"{name}: {value}\r\n" for name, value in header_map.items())
    # The blank line separates the header section from the body.
    pieces.append("\r\n")
    if body:
        pieces.append(body)
    return "".join(pieces)

def parse_http_request(request_data):
    """Parse raw HTTP request text into its components.

    Args:
        request_data: The decoded request text.

    Returns:
        A dict with "method", "target", "headers" (a dict of header names to
        values, names kept as sent) and "body", or None when the request
        line does not consist of exactly three space-separated parts.
    """
    # The first blank line separates the head from the body. Keep everything
    # after it so a body that itself contains CRLF is not truncated.
    head, _, body = request_data.partition("\r\n\r\n")
    head_lines = head.split("\r\n")

    request_line_parts = head_lines[0].split(" ")
    if len(request_line_parts) != 3:
        # Too few or too many parts: report a malformed request instead of
        # failing while unpacking.
        return None
    method, target, _version = request_line_parts

    headers_dict = {}
    for line in head_lines[1:]:
        name, separator, value = line.partition(":")
        if separator:
            headers_dict[name.strip()] = value.strip()

    return {
        "method": method,
        "target": target,
        "headers": headers_dict,
        "body": body,
    }

def main():
    """Accept connections forever and hand each one to a pool of 20 threads.

    Ctrl-C (KeyboardInterrupt) stops the accept loop; the listening socket is
    closed on the way out.
    """
    server_socket = socket.create_server(("localhost", 4221), reuse_port=True)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as pool:
        # Leaving this `with` closes the listening socket.
        with server_socket:
            try:
                while True:
                    connection, address = server_socket.accept()
                    pool.submit(client_handler, connection, address)
            except KeyboardInterrupt:
                print("Shutting down server...")


if __name__ == "__main__":
    # Command-line entry point: an optional --directory replaces the default
    # directory used by the /files/ endpoint before the server starts.
    cli = argparse.ArgumentParser(prog='HTTP Server', description='Runs a HTTP server')
    cli.add_argument('--directory')
    options = cli.parse_args()
    if options.directory:
        DIRECTORY = Path(options.directory)
    main()

Bonus - HTTP Compression

We can implement compression to improve network transfers. The client specifies a comma-separated list of the compression schemes it supports in the Accept-Encoding request header, and the server may choose one of those schemes that it also supports. The response body is then compressed using the chosen scheme, which the server names in the Content-Encoding response header. Note that HTTP header names are case-insensitive. We will add support for one scheme - gzip. We will first handle the Accept-Encoding and Content-Encoding headers, and actually compress the response body later.

# Compression schemes the server can apply; intersected with the schemes the
# client lists in Accept-Encoding.
SUPPORTED_COMPRESSION_SCHEMES = {"gzip"}

def build_response(status_code, request_headers, headers=None, body=None):
    """Assemble an HTTP response string, announcing a shared encoding if any.

    Args:
        status_code: The status line, already terminated by CRLF.
        request_headers: Request headers, looked up by lowercase name.
        headers: Optional mapping of response header names to values.
        body: Optional body text; omitted when empty or None.

    Returns:
        The response as one string. A Content-Encoding line is added when
        the client's Accept-Encoding lists a supported scheme; the body
        itself is appended unchanged.
    """
    pieces = [status_code]
    if "accept-encoding" in request_headers:
        offered = {
            token.strip()
            for token in request_headers["accept-encoding"].lower().split(",")
        }
        usable = offered.intersection(SUPPORTED_COMPRESSION_SCHEMES)
        if usable:
            pieces.append(f"Content-Encoding: {next(iter(usable))}\r\n")

    header_map = {} if headers is None else headers
    pieces.extend(f"{name}: {value}\r\n" for name, value in header_map.items())
    # The blank line separates the header section from the body.
    pieces.append("\r\n")
    if body:
        pieces.append(body)
    return "".join(pieces)

To actually implement the compression, we will use the gzip module. I have also changed build_response so that it now returns the encoded response as bytes instead of a string.

import gzip

def build_response(status_code, request_headers, response_headers=None, body=None):
    """Assemble an HTTP response as bytes, gzip-compressing the body if accepted.

    Args:
        status_code: The status line, already terminated by CRLF.
        request_headers: Request headers, looked up by lowercase name.
        response_headers: Optional mapping of response header names to
            values. It is copied, never modified.
        body: Optional body text. When given, Content-Length is set to the
            number of bytes actually sent.

    Returns:
        The encoded response: status line, headers, blank line, body.
    """
    # Work on a copy so the caller's dict is not changed behind its back.
    headers = {} if response_headers is None else dict(response_headers)
    encoding_line = ""
    payload = b""

    if body is not None:
        payload = body.encode()
        if "accept-encoding" in request_headers:
            accepted_encodings = {
                enc.strip()
                for enc in request_headers["accept-encoding"].lower().split(",")
            }
            # gzip is the only scheme implemented here, so test for it
            # explicitly rather than taking an arbitrary shared scheme.
            if "gzip" in accepted_encodings.intersection(SUPPORTED_COMPRESSION_SCHEMES):
                encoding_line = "Content-Encoding: gzip\r\n"
                payload = gzip.compress(payload)
        # Content-Length is the size in bytes of what goes on the wire,
        # compressed or not.
        headers["Content-Length"] = len(payload)

    header_lines = "".join(f"{name}: {value}\r\n" for name, value in headers.items())
    head = f"{status_code}{encoding_line}{header_lines}\r\n"
    return head.encode() + payload
Final code
import argparse
import concurrent.futures
import gzip
import socket
from pathlib import Path


# Request targets the server knows how to answer. The two that end in "/"
# are matched as prefixes; the rest of the target is the argument.
HOME_ENDPOINT = "/"
ECHO_ENDPOINT = "/echo/"
USER_AGENT_ENDPOINT = "/user-agent"
FILES_ENDPOINT = "/files/"
# Default directory for the /files/ endpoint; replaced by --directory when
# the script is run with that option.
DIRECTORY = Path("/tmp")

# Status lines, each already terminated by CRLF so headers can follow directly.
HTTP_200_OK = "HTTP/1.1 200 OK\r\n"
HTTP_201_CREATED = "HTTP/1.1 201 Created\r\n"
HTTP_404_NOT_FOUND = "HTTP/1.1 404 Not Found\r\n"

# Compression schemes the server can apply; intersected with the schemes the
# client lists in Accept-Encoding.
SUPPORTED_COMPRESSION_SCHEMES = {"gzip"}

def client_handler(client_connection, client_address):
    """Read one request from a client, answer it, and close the connection.

    Any exception raised while reading or handling the request is reported
    on stdout; the connection is closed in every case.
    """
    try:
        payload = client_connection.recv(4096)
        text = payload.decode()
        handle_request(text, client_connection, client_address)
    except Exception as exc:
        print(f"Error handling client {client_address}: {exc}")
    finally:
        client_connection.close()


def handle_request(request, client_connection, client_address):
    """Route one decoded HTTP request and send the response bytes to the client.

    Args:
        request: The raw request text.
        client_connection: Connected socket the response is sent on.
        client_address: Address of the client (not used here).
    """
    request_data = parse_http_request(request)
    if request_data is None:
        # parse_http_request returns None for a malformed request line; no
        # request headers are available in that case.
        response = build_response("HTTP/1.1 400 Bad Request\r\n", {})
    elif request_data["target"] == HOME_ENDPOINT:
        response = build_response(HTTP_200_OK, request_data["headers"])
    elif request_data["target"].startswith(ECHO_ENDPOINT):
        response = get_echo_response(request_data)
    elif request_data["target"] == USER_AGENT_ENDPOINT:
        response = get_useragent_response(request_data)
    elif request_data["target"].startswith(FILES_ENDPOINT):
        if request_data["method"] == "GET":
            response = get_files_response(request_data)
        elif request_data["method"] == "POST":
            response = post_files_response(request_data)
        else:
            # Without this branch `response` would be unbound for any other
            # method (PUT, DELETE, ...) and sendall would never run.
            response = build_response(
                "HTTP/1.1 405 Method Not Allowed\r\n",
                request_data["headers"],
                {"Allow": "GET, POST"},
            )
    else:
        response = build_response(HTTP_404_NOT_FOUND, request_data["headers"])
    client_connection.sendall(response)


def get_echo_response(request_data):
    """Build a 200 response that echoes the text after ``/echo/`` in the target.

    Args:
        request_data: Parsed request; its "target" and "headers" entries are read.
    """
    response_body = request_data["target"][len(ECHO_ENDPOINT):]
    response_headers = {
        "Content-Type": "text/plain",
        # Content-Length is a byte count; the body is encoded before it is
        # sent, so measure the encoded body rather than the characters.
        "Content-Length": len(response_body.encode()),
    }
    return build_response(HTTP_200_OK, request_data["headers"], response_headers, response_body)


def get_useragent_response(request_data):
    """Build a 200 response whose body is the request's User-Agent value.

    Args:
        request_data: Parsed request; its "headers" entry is read with a
            lowercase header name.

    Returns:
        The response built by build_response. The body is empty when the
        request has no User-Agent header.
    """
    request_headers = request_data["headers"]
    # User-Agent is optional; answer with an empty body instead of letting a
    # KeyError abort the request.
    response_body = request_headers.get("user-agent", "")
    response_headers = {
        "Content-Type": "text/plain",
        # Content-Length is a byte count, so measure the encoded body.
        "Content-Length": len(response_body.encode()),
    }
    return build_response(HTTP_200_OK, request_headers, response_headers, response_body)


def get_files_response(request_data):
    """Build the response for ``GET /files/<name>``.

    Args:
        request_data: Parsed request; its "target" and "headers" entries are read.

    Returns:
        A 200 response carrying the file's content, or a 404 response when
        the name does not resolve to a regular file inside DIRECTORY.
    """
    root = DIRECTORY.resolve()
    filepath = (root / request_data["target"][len(FILES_ENDPOINT):]).resolve()
    # The target is client-controlled: "../" segments or an absolute path
    # must not let the request escape DIRECTORY. is_file() also rejects
    # directories, which cannot be opened for reading.
    if root not in filepath.parents or not filepath.is_file():
        return build_response(HTTP_404_NOT_FOUND, request_data["headers"])
    with filepath.open() as f:
        response_body = f.read()
    response_headers = {
        "Content-Type": "application/octet-stream",
        # Content-Length is a byte count, so measure the encoded body.
        "Content-Length": len(response_body.encode()),
    }
    return build_response(HTTP_200_OK, request_data["headers"], response_headers, response_body)

def post_files_response(request_data):
    """Store the body of ``POST /files/<name>`` as a file under DIRECTORY.

    Args:
        request_data: Parsed request; its "target", "body" and "headers"
            entries are read.

    Returns:
        A 201 response once the file is written, or a 403 response when the
        requested name resolves to a location outside DIRECTORY.
    """
    filename = request_data["target"][len(FILES_ENDPOINT):]
    root = DIRECTORY.resolve()
    filepath = (root / Path(filename)).resolve()
    # The target is client-controlled; refuse "../" or absolute names that
    # would write outside the upload directory.
    if root not in filepath.parents:
        return build_response(
            "HTTP/1.1 403 Forbidden\r\n", request_headers=request_data["headers"]
        )
    with filepath.open("w") as f:
        f.write(request_data["body"])
    return build_response(HTTP_201_CREATED, request_headers=request_data["headers"])

def build_response(status_code, request_headers, response_headers=None, body=None):
    """Assemble an HTTP response as bytes, gzip-compressing the body if accepted.

    Args:
        status_code: The status line, already terminated by CRLF.
        request_headers: Request headers, looked up by lowercase name.
        response_headers: Optional mapping of response header names to
            values. It is copied, never modified.
        body: Optional body text. When given, Content-Length is set to the
            number of bytes actually sent.

    Returns:
        The encoded response: status line, headers, blank line, body.
    """
    # Work on a copy so the caller's dict is not changed behind its back.
    headers = {} if response_headers is None else dict(response_headers)
    encoding_line = ""
    payload = b""

    if body is not None:
        payload = body.encode()
        if "accept-encoding" in request_headers:
            accepted_encodings = {
                enc.strip()
                for enc in request_headers["accept-encoding"].lower().split(",")
            }
            # gzip is the only scheme implemented here, so test for it
            # explicitly rather than taking an arbitrary shared scheme.
            if "gzip" in accepted_encodings.intersection(SUPPORTED_COMPRESSION_SCHEMES):
                encoding_line = "Content-Encoding: gzip\r\n"
                payload = gzip.compress(payload)
        # Content-Length is the size in bytes of what goes on the wire,
        # compressed or not.
        headers["Content-Length"] = len(payload)

    header_lines = "".join(f"{name}: {value}\r\n" for name, value in headers.items())
    head = f"{status_code}{encoding_line}{header_lines}\r\n"
    return head.encode() + payload

def parse_http_request(request_data):
    """Parse raw HTTP request text into its components.

    Args:
        request_data: The decoded request text.

    Returns:
        A dict with "method", "target", "headers" (a dict of lowercased
        header names to values) and "body", or None when the request line
        does not consist of exactly three space-separated parts.
    """
    # The first blank line separates the head from the body. Keep everything
    # after it so a body that itself contains CRLF is not truncated.
    head, _, body = request_data.partition("\r\n\r\n")
    head_lines = head.split("\r\n")

    request_line_parts = head_lines[0].split(" ")
    if len(request_line_parts) != 3:
        # Too few or too many parts: report a malformed request instead of
        # failing while unpacking.
        return None
    method, target, _version = request_line_parts

    headers_dict = {}
    for line in head_lines[1:]:
        name, separator, value = line.partition(":")
        if separator:
            # Header names are case-insensitive; store them lowercased so
            # lookups can use one spelling.
            headers_dict[name.strip().lower()] = value.strip()

    return {
        "method": method,
        "target": target,
        "headers": headers_dict,
        "body": body,
    }

def main():
    """Accept connections forever and hand each one to a pool of 20 threads.

    Ctrl-C (KeyboardInterrupt) stops the accept loop; the listening socket is
    closed on the way out.
    """
    server_socket = socket.create_server(("localhost", 4221), reuse_port=True)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as pool:
        # Leaving this `with` closes the listening socket.
        with server_socket:
            try:
                while True:
                    connection, address = server_socket.accept()
                    pool.submit(client_handler, connection, address)
            except KeyboardInterrupt:
                print("Shutting down server...")


if __name__ == "__main__":
    # Command-line entry point: an optional --directory replaces the default
    # directory used by the /files/ endpoint before the server starts.
    cli = argparse.ArgumentParser(prog='HTTP Server', description='Runs a HTTP server')
    cli.add_argument('--directory')
    options = cli.parse_args()
    if options.directory:
        DIRECTORY = Path(options.directory)
    main()

PS: CodeCrafters is a fun place to build stuff from scratch!

#python #software-engineering