in

Узнайте, как эффективно использовать JWT с FastAPI

Если вы хотите узнать больше, ознакомьтесь с документами FastAPI. В этой статье мы поговорим о том, как можно защитить конечные точки FastAPI с помощью JWT.

FastAPI – это современный веб-фреймворк на Python для создания API. Он быстр, прост в использовании и позволяет создавать надежные и масштабируемые API.

Вот некоторые из его функций:

  • Использует “под капотом” Starlette и Pydantic для обеспечения потрясающе высокой производительности.
  • Минимизирует шаблонность с помощью простого объявления маршрутов и входных данных.
  • Отличная поддержка редактора и простой в запоминании синтаксис.
  • Он автоматически генерирует схемы OpenAPI и документы.

JWT (веб-токен JSON) предоставляет очень безопасный способ аутентификации конечных точек. Некоторые из его функций:

  • Токены доступа в кодировке JSON, имеющие криптографическую подпись.
  • Содержит такие утверждения, как эмитент, срок действия, тема и т.д.
  • Они поддаются проверке, поскольку подписаны секретным ключом.
  • Полезно для безопасной передачи информации между сторонами.

Если вы хотите узнать больше о JWT и посмотреть, как они работают, ознакомьтесь с jwt

После изучения этих основ давайте приступим к части программирования.

Установите fastapi, uvicorn, python-jose и passlib.

$ pip install fastapi uvicorn python-jose[cryptography] passlib[bcrypt]

Uvicorn – веб-сервер ASGI для Python. Это рекомендуемый сервер для FastAPI.

Python-jose для генерации и проверки токенов JWT. Вы также можете использовать PyJWT.

Passlib обрабатывает хэши паролей.

Импортируйте необходимые пакеты:

from datetime import datetime, timedelta
from typing import Union
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel
from typing_extensions import Annotated

Определите тестовую базу данных и секрет.

test_db = {
    "testuser": {
        "username": "testuser",
        "full_name": "Test User",
        "email": "testuser@example.com",
        "hashed_password": "$2b$12$PQO42dMponRoPGWyt5co5OiZCQJcpKDFxet8n5MnkUHNNHAg/mioS",
    }
}

# openssl rand -hex 32
SECRET_KEY = "380e91edcddfe3c0733585523c7bf4b80c9e6d720787142ed9da5e6c37a29185"
ALGORITHM = "HS256"
TOKEN_EXPIRE_MINUTES = 30  

Это SECRET_KEY будет использоваться для подписи JWT. Не используйте этот ключ; сгенерируйте новый. Чтобы сгенерировать новый ключ, запустите это в терминале.

$ openssl rand -hex 32

Алгоритмом будет HS256(HMAC с SHA-256).

Создание моделей:

class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: Union[str, None] = None


class User(BaseModel):
    username: str
    email: Union[str, None] = None
    full_name: Union[str, None] = None


class UserInDB(User):
    hashed_password: str

Прежде чем двигаться дальше, давайте сначала поговорим о рабочем процессе ввода паролей с точки зрения непрофессионала.

  • Пользователь введет имя пользователя и пароль и нажмет войти.
  • Клиент выполнит вызов API с этим именем пользователя и паролем, а серверная часть проверит, существует ли имя пользователя и совпадает ли пароль с тем, что есть в базе данных.
  • Серверная часть сгенерирует токен (jwt для этой статьи) с датой истечения срока действия и вернет его клиенту.
  • После входа в систему пользователь будет использовать этот токен в заголовке авторизации для выполнения последующих вызовов API.

Теперь создайте экземпляр CryptContext в passlib и схему oauth.

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login")

Когда мы создаем экземпляр OAuth2PasswordBearer класса, мы передаем в tokenUrl параметр. Этот параметр содержит URL-адрес, который клиент (интерфейс, запущенный в браузере пользователя) будет использовать для отправки username и password для получения токена.

Безопасность FastAPI

Здесь tokenUrl указан относительный URL.

Теперь создайте конечную точку входа. Клиент вызовет эту конечную точку для аутентификации.

@app.post("/login", response_model=Token)
async def login(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]):
    """Login method to authenticate"""
    
    user = authenticate_user(test_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    access_token_expires = timedelta(minutes=TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )

    return {"access_token": access_token, "token_type": "bearer"}


def authenticate_user(db, username: str, password: str):
    user = get_user(db, username)
    if not user or not verify_password(password, user.hashed_password):
        return False
    return user


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


def create_access_token(data: dict, expires_delta: timedelta):
    to_encode = data.copy()
    expire = datetime.utcnow() + expires_delta
    to_encode.update({"exp": expire})

    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

Здесь мы используем OAuth2PasswordRequestForm, который является зависимостью от класса, предоставляемой FastAPI. У него есть тело формы с именем пользователя, паролем и некоторыми другими полями, которые нам не нужны для этой статьи.

Давайте пройдемся по остальной части кода.

  • Клиент вызывает метод login, который принимает имя пользователя и пароль и пытается найти пользователя в БД, затем сопоставляет пароль с паролем в БД с помощью passlib.
  • После проверки запроса создается токен jwt с именем пользователя и сроком действия, который возвращается пользователю.

create_access_token Метод создает закодированный jwt, который использует данные SECRET_KEYALGORITHM и to_encode, у которых есть имя пользователя и срок действия для генерации токена.

Давайте попробуем получить текущего пользователя после входа в систему.

@app.get("/user/current/", response_model=User)
async def current_user_profile(current_user: Annotated[User, Depends(get_current_user)]):
    """Fetch current user"""

    return current_user

Эта конечная точка будет возвращать текущие пользовательские данные.

Пока все выглядит хорошо, но нам нужен способ это протестировать.

Запустите это в терминале:

$ uvicorn main:app --reload

 

Вы должны увидеть что-то вроде этого:

 

Swagger UI

 

Swagger UI

Нажмите “Авторизоваться”, и откроется всплывающее окно.

 

Всплывающее окно авторизации

 

Всплывающее окно авторизации

Добавьте testuser и password в качестве пароля и нажмите Авторизовать. Это позволит авторизовать пользователя. После этого попробуйте /user/current/ конечную точку для получения текущих пользовательских данных. Нажмите на Try it Out, чтобы запустить его. Вы можете открыть инструменты разработчика Chrome и проверить, что на этот раз он отправил только токен на предъявителя.

Если вам это удалось, поздравляем!

Вот полный код ниже:

from datetime import datetime, timedelta
from typing import Union
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel
from typing_extensions import Annotated

# openssl rand -hex 32
SECRET_KEY = "380e91edcddfe3c0733585523c7bf4b80c9e6d720787142ed9da5e6c37a29185"
ALGORITHM = "HS256"
TOKEN_EXPIRE_MINUTES = 30

test_db = {
    "testuser": {
        "username": "testuser",
        "full_name": "Test User",
        "email": "testuser@example.com",
        "hashed_password": "$2b$12$PQO42dMponRoPGWyt5co5OiZCQJcpKDFxet8n5MnkUHNNHAg/mioS",
    }
}

class Token(BaseModel):
    access_token: str
    token_type: str

class TokenData(BaseModel):
    username: Union[str, None] = None

class User(BaseModel):
    username: str
    email: Union[str, None] = None
    full_name: Union[str, None] = None

class UserInDB(User):
    hashed_password: str

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login")

app = FastAPI()

def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)

def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)

def authenticate_user(db, username: str, password: str):
    user = get_user(db, username)
    if not user or not verify_password(password, user.hashed_password):
        return False
    return user

def create_access_token(data: dict, expires_delta: timedelta):
    to_encode = data.copy()
    expire = datetime.utcnow() + expires_delta
    to_encode.update({"exp": expire})

    # use jwt to create a token   
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            return status.HTTP_401_UNAUTHORIZED
        token_data = TokenData(username=username)
    except JWTError:
        return JWTError
    user = get_user(test_db, username=token_data.username)
    return user

@app.post("/login", response_model=Token)
async def login(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]):
    """Login method to authenticate"""

    user = authenticate_user(test_db, form_data.username, form_data.password)

    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    access_token_expires = timedelta(minutes=TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )

    return {"access_token": access_token, "token_type": "bearer"}

@app.get("/user/current/", response_model=User)
async def current_user_profile(current_user: Annotated[User, Depends(get_current_user)]):
    """Fetch current user"""
    
    return current_user

Не стесняйтесь обращаться в комментариях с любыми вопросами или предложениями.

Автор истории Анкит Анчлия @aanchlia.

What do you think?

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *

GIPHY App Key not set. Please check settings