This project, and by extension this article, is an ongoing piece of work.
First, let's define some things.
When I say site, I mean a website with RBAC and user sign-up but no requirement for a password. This will hopefully be achieved through a phone, which combines ‘something you have’ (the phone) and ‘something you know’ (your phone creds).
So I want to make a site that allows user sign-up without a password. I want to be able to go anywhere, get online on any machine, and use a service without having to pull out Bitwarden and manually type in a 40-something-character password.
Disclaimer.
I don’t claim that this is extremely secure. Rolling your own crypto is basically always a bad idea. This does use RSA, so I’m not rolling my own crypto, but this form of authentication is offered as more of a proof of concept for me to use on some hobby sites where I am happy to treat the stored data as ‘public’ should shortcomings be found in this implementation.
Using this on your own projects is at your own risk; you have been warned. Now, onto the fun parts!
Initial Proof of Concept
First, let's build the simplest POC I can think of to see whether this idea is even feasible.
Our requirements.txt file:
fastapi
uvicorn
The POC server.
from fastapi import FastAPI
from starlette import status
from starlette.requests import Request
from starlette.responses import RedirectResponse, Response

app = FastAPI()

# user_id -> key the server expects before it marks that user as authed
waiting_for_auth: dict[str, str] = {"1": "a_key"}
authed: set[str] = set()


@app.middleware("http")
async def enforce_auth(request: Request, call_next):
    """Redirects unauthenticated requests to / unless the route is public"""
    user_id = request.cookies.get("user_id")
    if user_id in authed:
        return await call_next(request)

    if request.url.path in ("/", "/login"):
        return await call_next(request)

    return RedirectResponse("/")


@app.get("/")
def read_root():
    return {"status": "No auth lol"}


@app.get("/check")
def check_auth():
    return {"status": "Authed"}


@app.get("/login")
def login(user_id: str, key: str):
    if user_id not in waiting_for_auth:
        return Response(status_code=status.HTTP_400_BAD_REQUEST)

    if waiting_for_auth[user_id] != key:
        return Response(status_code=status.HTTP_400_BAD_REQUEST)

    authed.add(user_id)
    waiting_for_auth.pop(user_id)
    # Set the cookie on the response that is actually returned,
    # otherwise it never reaches the client
    response = Response(status_code=status.HTTP_200_OK)
    response.set_cookie(key="user_id", value=user_id)
    return response
Now, this POC doesn’t take much security into account: anyone could, in theory, brute-force a login. Next up:
- Key encryption for secure data exchange
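Independently of the encryption step, the brute-force concern can be reduced by making the expected key long and random, and by comparing it in constant time. This is a minimal sketch rather than part of the POC: `issue_key` and `check_key` are hypothetical helpers, and the 32-byte key length is an assumption.

```python
import hmac
import secrets

# Pending one-time keys, keyed by user id (same idea as waiting_for_auth).
waiting_for_auth: dict[str, str] = {}


def issue_key(user_id: str) -> str:
    """Create a fresh, hard-to-guess key for user_id (hypothetical helper)."""
    key = secrets.token_urlsafe(32)  # 32 random bytes, URL-safe encoded
    waiting_for_auth[user_id] = key
    return key


def check_key(user_id: str, key: str) -> bool:
    """Return True once for a matching key; the key is consumed on success."""
    expected = waiting_for_auth.get(user_id)
    if expected is None:
        return False
    # compare_digest avoids leaking how many leading characters matched.
    if not hmac.compare_digest(expected.encode(), key.encode()):
        return False
    del waiting_for_auth[user_id]
    return True
```

A wrong guess leaves the pending key in place, so rate limiting would still be needed on top of this.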
POC for RSA-based auth
The auth flow is now essentially:
1. Client asks server to auth on behalf of USER
2. Server responds with an expected request VALUE encrypted using USER's registered public key
3. Client decrypts the VALUE using its private key
4. Client encrypts VALUE using the SERVER's public key and sends it to the server
5. Server decrypts the VALUE using its private key; if they match, auth is a success
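To make the round trip easier to follow, the five steps can be traced in a single process. This sketch deliberately uses textbook RSA on toy-sized primes with no padding, so it is insecure and only illustrates the message flow; the real POC uses 4096-bit keys with OAEP. `ToyKeyPair`, `make_pair`, `encrypt` and `decrypt` are hypothetical names invented for this illustration.

```python
import secrets
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyKeyPair:
    """Textbook RSA key pair built from tiny primes. Insecure, demo only."""
    n: int  # public modulus
    e: int  # public exponent
    d: int  # private exponent


def make_pair(p: int, q: int, e: int = 17) -> ToyKeyPair:
    phi = (p - 1) * (q - 1)
    return ToyKeyPair(n=p * q, e=e, d=pow(e, -1, phi))


def encrypt(value: int, pub: ToyKeyPair) -> int:
    return pow(value, pub.e, pub.n)  # uses only the public half (n, e)


def decrypt(cipher: int, pair: ToyKeyPair) -> int:
    return pow(cipher, pair.d, pair.n)  # needs the private exponent d


client_pair = make_pair(61, 53)  # n = 3233
server_pair = make_pair(67, 71)  # n = 4757

# Steps 1-2: the server picks VALUE and encrypts it with the user's public key.
value = secrets.randbelow(3000) + 2  # must stay below both moduli
challenge = encrypt(value, client_pair)

# Step 3: the client recovers VALUE with its private key.
recovered = decrypt(challenge, client_pair)

# Step 4: the client re-encrypts VALUE with the server's public key.
reply = encrypt(recovered, server_pair)

# Step 5: the server decrypts the reply and compares it to what it issued.
auth_ok = decrypt(reply, server_pair) == value
```

The point of the flow is that only the holder of the client's private exponent can complete step 3, which is what stands in for the password.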
The requirements.txt for this stage is:
fastapi
uvicorn
pycryptodome
httpx
The following script needs to be run once to generate the keys.
from pathlib import Path

from Crypto.PublicKey import RSA

# Make sure the output directory exists before writing to it
Path("keys").mkdir(exist_ok=True)

server_key_pair = RSA.generate(4096)
server_pub_key = server_key_pair.public_key()

client_key_pair = RSA.generate(4096)
client_pub_key = client_key_pair.public_key()

with open("keys/server_key_pair.txt", "wb") as o:
    o.write(server_key_pair.export_key())

with open("keys/server_pub_key.txt", "wb") as o:
    o.write(server_pub_key.export_key())

with open("keys/client_key_pair.txt", "wb") as o:
    o.write(client_key_pair.export_key())

with open("keys/client_pub_key.txt", "wb") as o:
    o.write(client_pub_key.export_key())
After that, here is the client and server code. Run the server and, once it’s up, run the client.
server.py
import secrets

from Crypto.Cipher import PKCS1_OAEP
from Crypto.PublicKey import RSA
from fastapi import FastAPI
from starlette import status
from starlette.requests import Request
from starlette.responses import RedirectResponse, Response

# Note the poc/ prefix: these paths are relative to where the server is started
with open("poc/keys/client_pub_key.txt", "rb") as f:
    client_pub_key = RSA.import_key(f.read())

with open("poc/keys/server_key_pair.txt", "rb") as f:
    server_key_pair = RSA.import_key(f.read())

app = FastAPI()
authed: set[str] = set()

# For each user, the set of values the server is waiting for
# a client to return for successful auth
waiting_for_auth: dict[str, set[bytes]] = {"1": set()}


@app.middleware("http")
async def enforce_auth(request: Request, call_next):
    """Redirects unauthenticated requests to / unless the route is public"""
    user_id = request.cookies.get("user_id")
    if user_id in authed:
        return await call_next(request)

    if request.url.path in ("/", "/login", "/chal"):
        return await call_next(request)

    return RedirectResponse("/")


@app.get("/")
def read_root():
    return {"status": "No auth lol"}


@app.get("/check")
def check_auth():
    return {"status": "Authed"}


@app.get("/chal")
def get_chal_for(user_id: str):
    if user_id not in waiting_for_auth:
        return Response(status_code=status.HTTP_400_BAD_REQUEST)

    # Random challenge that only the holder of the client's private key can read
    return_value: bytes = secrets.token_bytes(32)
    waiting_for_auth[user_id].add(return_value)
    encryptor = PKCS1_OAEP.new(client_pub_key)
    output = encryptor.encrypt(return_value)
    return {"expected_value": output.hex()}


@app.get("/login")
def login(user_id: str, value: str):
    if user_id not in waiting_for_auth:
        return Response(status_code=status.HTTP_400_BAD_REQUEST)

    try:
        value_bytes = bytes.fromhex(value)
        decrypt = PKCS1_OAEP.new(server_key_pair)
        value_to_check = decrypt.decrypt(value_bytes)
    except ValueError:
        # Malformed hex, or a ciphertext that fails OAEP decryption
        return Response(status_code=status.HTTP_400_BAD_REQUEST)

    if value_to_check not in waiting_for_auth[user_id]:
        return Response(status_code=status.HTTP_400_BAD_REQUEST)

    authed.add(user_id)
    waiting_for_auth.pop(user_id)
    # Set the cookie on the response that is actually returned
    response = Response(status_code=status.HTTP_200_OK)
    response.set_cookie(key="user_id", value=user_id)
    return response
client.py
import asyncio

import httpx
from Crypto.Cipher import PKCS1_OAEP
from Crypto.PublicKey import RSA

with open("keys/client_key_pair.txt", "rb") as f:
    client_key_pair = RSA.import_key(f.read())

with open("keys/server_pub_key.txt", "rb") as f:
    server_pub_key = RSA.import_key(f.read())


async def main():
    port = "8000"
    base_url = f"http://localhost:{port}"
    async with httpx.AsyncClient(cookies={"user_id": "1"}) as client:
        # Not authed yet, so the middleware redirects us
        r_1 = await client.get(f"{base_url}/check")
        assert r_1.status_code == 307

        # Ask for a challenge encrypted with our public key
        r_2 = await client.get(f"{base_url}/chal?user_id=1")
        assert r_2.status_code == 200
        data = r_2.json()
        v = data["expected_value"]
        v = bytes.fromhex(v)

        # Decrypt the challenge with our private key
        c_d = PKCS1_OAEP.new(client_key_pair)
        data_d = c_d.decrypt(v)

        # Re-encrypt it for the server and send it back
        encryptor = PKCS1_OAEP.new(server_pub_key)
        request_value = encryptor.encrypt(data_d)
        r_3 = await client.get(
            f"{base_url}/login?user_id=1&value={request_value.hex()}"
        )
        assert r_3.status_code == 200

        # The same route now succeeds
        r_4 = await client.get(f"{base_url}/check")
        assert r_4.status_code == 200


if __name__ == "__main__":
    asyncio.run(main())
This now handles a secure auth exchange but still requires hard-coded keys for users. The next POC should:
- Allow for dynamic user sign up
- Remove sensitive data from cacheable areas such as urls
- Move sensitive data routes to POST
- Likely hook it into a DB
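As a rough illustration of the URL and POST items, the login value can travel in a request body instead of the query string. The sketch below only builds the request and does not send it; `build_login_request` and the JSON body shape are assumptions, and the server would need a matching POST route that is not shown here. It uses the standard library's `urllib` so it runs without extra dependencies; with the POC's httpx client the equivalent would be `client.post(url, json=...)`.

```python
import json
import urllib.request


def build_login_request(
    base_url: str, user_id: str, value_hex: str
) -> urllib.request.Request:
    """Build (but do not send) a POST to /login with the value in the body."""
    body = json.dumps({"user_id": user_id, "value": value_hex}).encode()
    return urllib.request.Request(
        f"{base_url}/login",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# "deadbeef" is a placeholder for the hex-encoded encrypted value.
req = build_login_request("http://localhost:8000", "1", "deadbeef")
```

Because the URL no longer contains the value, it stays out of browser history, proxy logs and similar places where URLs tend to be recorded.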
Continued in part 2, where I take this POC and turn it into a fully functioning website.