This project and by extension article are an on-going piece of work.

Firstly, lets define some things.

When I say site I mean a website with RBAC and user sign up, however, there should be no requirement for a password. This will hopefully be achieved through a phone which combines ‘something you have’ (The phone) and ‘something you know’ (Your phone creds).

So I want to make a site that allows for user sign up, but you don’t need a password essentially. I want to be able to go anywhere, log onto the internet on a machine and be able to use a service without the need to pull out Bitwarden and manually type in a 40 something character password.


Disclaimer.

I don’t claim that this is extremely secure. Rolling your own crypto is basically always a bad idea, and while this does use RSA so I’m not rolling my own crypto this form of authentication is provided as more of a proof of concept for me to use on some hobby sites where I am happy to treat the stored data as ‘public’ should shortcomings be found in this implementation.

Using this on your own projects comes at your own risk, you have been warned. Now, onto the fun parts!


Initial Proof of Concept Link to heading

Firstly lets make the most simple POC I can think of to see if this idea is even feasible.

Our requirements.txt file.

fastapi
uvicorn

The POC server.

from fastapi import FastAPI
from starlette import status
from starlette.requests import Request
from starlette.responses import RedirectResponse, Response

app = FastAPI()
waiting_for_auth: dict[str, str] = {"1": "a_key"}
authed: set[str] = set()


@app.middleware("http")
async def enforce_auth(request: Request, call_next):
    """Ensures all routes come under the global ratelimit"""
    user_id = request.cookies.get("user_id")
    if user_id in authed:
        return await call_next(request)

    if request.url.path in ("/", "/login"):
        return await call_next(request)

    return RedirectResponse("/")


@app.get("/")
def read_root():
    return {"status": "No auth lol"}


@app.get("/check")
def check_auth():
    return {"status": "Authed"}


@app.get("/login")
def login(response: Response, user_id: str, key: str):
    if user_id not in waiting_for_auth:
        return Response(status_code=status.HTTP_400_BAD_REQUEST)

    if waiting_for_auth[user_id] != key:
        return Response(status_code=status.HTTP_400_BAD_REQUEST)

    authed.add(user_id)
    waiting_for_auth.pop(user_id)
    response.set_cookie(key="user_id", value=user_id)
    return Response(status_code=status.HTTP_200_OK)

Now this POC doesn’t take into account much in the way of security, any in theory could brute force a login. Next up:

  • Key encryption for secure data exchange

POC for RSA based auth Link to heading

The auth flow is now essentially:

1. Client asks server to auth on behalf of USER
2. Server responds with an expected request VALUE encrypted using USER's registered public key
3. Client decrypts the VALUE using it's private key
4. Client encrypts VALUE using the SERVER's public key and sends it to the server
5. Server decrypts the VALUE using it's private key, if they match auth is a success

The requirements.txt for this stage are:

fastapi
uvicorn
pycryptodome
httpx

The following file needs to be run once in order to generate keys.

from Crypto.PublicKey import RSA


server_key_pair = RSA.generate(4096)
server_pub_key = server_key_pair.public_key()
client_key_pair = RSA.generate(4096)
client_pub_key = client_key_pair.public_key()
with open("keys/server_key_pair.txt", "wb") as o:
    o.write(server_key_pair.export_key())

with open("keys/server_pub_key.txt", "wb") as o:
    o.write(server_pub_key.export_key())

with open("keys/client_key_pair.txt", "wb") as o:
    o.write(client_key_pair.export_key())

with open("keys/client_pub_key.txt", "wb") as o:
    o.write(client_pub_key.export_key())

After that, here is the client & server code. Run the server, once it’s up, run the client. server.py

import secrets

from Crypto.Cipher import PKCS1_OAEP
from Crypto.PublicKey import RSA
from fastapi import FastAPI
from starlette import status
from starlette.requests import Request
from starlette.responses import RedirectResponse, Response


with open("poc/keys/client_pub_key.txt", "rb") as f:
    client_pub_key = RSA.import_key(f.read())

with open("poc/keys/server_key_pair.txt", "rb") as f:
    server_key_pair = RSA.import_key(f.read())

app = FastAPI()
authed: set[str] = set()
# A list of possible items the server is waiting for
# a client to return for successful auth
waiting_for_auth: dict[str, set[bytes]] = {"1": set()}


@app.middleware("http")
async def enforce_auth(request: Request, call_next):
    """Ensures all routes come under the global ratelimit"""
    user_id = request.cookies.get("user_id")
    if user_id in authed:
        return await call_next(request)

    if request.url.path in ("/", "/login", "/chal"):
        return await call_next(request)

    return RedirectResponse("/")


@app.get("/")
def read_root():
    return {"status": "No auth lol"}


@app.get("/check")
def check_auth():
    return {"status": "Authed"}


@app.get("/chal")
def get_chal_for(user_id: str):
    if user_id not in waiting_for_auth:
        return Response(status_code=status.HTTP_400_BAD_REQUEST)

    return_value: bytes = secrets.token_bytes(32)
    waiting_for_auth[user_id].add(return_value)

    encryptor = PKCS1_OAEP.new(client_pub_key)
    output = encryptor.encrypt(return_value)
    return {"expected_value": output.hex()}


@app.get("/login")
def login(response: Response, user_id: str, value: str):
    if user_id not in waiting_for_auth:
        return Response(status_code=status.HTTP_400_BAD_REQUEST)

    value_bytes = bytes.fromhex(value)
    decrypt = PKCS1_OAEP.new(server_key_pair)
    value_to_check = decrypt.decrypt(value_bytes)
    if value_to_check not in waiting_for_auth[user_id]:
        return Response(status_code=status.HTTP_400_BAD_REQUEST)

    authed.add(user_id)
    waiting_for_auth.pop(user_id)
    response.set_cookie(key="user_id", value=user_id)
    return Response(status_code=status.HTTP_200_OK)

client.py

import asyncio


import httpx
from Crypto.Cipher import PKCS1_OAEP
from Crypto.PublicKey import RSA

with open("keys/client_key_pair.txt", "rb") as f:
    client_key_pair = RSA.import_key(f.read())

with open("keys/server_pub_key.txt", "rb") as f:
    server_pub_key = RSA.import_key(f.read())


async def main():
    port = "8000"
    base_url = f"http://localhost:{port}"
    async with httpx.AsyncClient(cookies={"user_id": "1"}) as client:
        r_1 = await client.get(f"{base_url}/check")
        assert r_1.status_code == 307

        r_2 = await client.get(f"{base_url}/chal?user_id=1")
        assert r_2.status_code == 200
        data = r_2.json()
        v = data["expected_value"]
        v = bytes.fromhex(v)
        c_d = PKCS1_OAEP.new(client_key_pair)
        data_d = c_d.decrypt(v)

        encryptor = PKCS1_OAEP.new(server_pub_key)
        request_value = encryptor.encrypt(data_d)

        r_3 = await client.get(
            f"{base_url}/login?user_id=1&value={request_value.hex()}"
        )
        assert r_3.status_code == 200

        r_4 = await client.get(f"{base_url}/check")
        assert r_4.status_code == 200


if __name__ == "__main__":
    asyncio.run(main())

This now handles secure auth exchange but still requires hard coded keys from users. The next POC should:

  • Allow for dynamic user sign up
  • Remove sensitive data from cacheable areas such as urls
  • Move sensitive data routes to POST
  • Likely hook it into a DB

Continued in part 2 where I take this POC and turn it into a fully functioning website.