見出し画像

FastAPIチュートリアルの「OAuth2、JWTトークンによるBearer」がよくわからなかったので、調べてみた

FastAPI公式サイトにて、学習している際、「セキュリティ入門」で躓いたので、つまりどういうことだってばよ?といった内容をLLMに質問し回答結果をまとめた記事になります。前提として、以下URLのチュートリアルを実施済みという条件にて、記事を作成しているため、FastAPI使ったことない!FastAPIのセキュリティってなんだ?という人は以下のチュートリアルを先に実施しておくことをおすすめします。

今回、解説するコードは以下のとおりです。以下のコードに基づいて自分の疑問に思った点を列挙しQA形式で、疑問に回答することで理解を深めます。

from datetime import datetime, timedelta, timezone
from typing import Union

from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel

# JWTトークンの設定
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# ダミーのユーザーデータベース
fake_users_db = {
   "johndoe": {
       "username": "johndoe",
       "full_name": "John Doe",
       "email": "johndoe@example.com",
       "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
       "disabled": False,
   }
}

# トークンのデータモデル
class Token(BaseModel):
   access_token: str
   token_type: str

# トークンに含まれるデータモデル
class TokenData(BaseModel):
   username: Union[str, None] = None

# ユーザーのデータモデル
class User(BaseModel):
   username: str
   email: Union[str, None] = None
   full_name: Union[str, None] = None
   disabled: Union[bool, None] = None

# データベース内のユーザーデータモデル
class UserInDB(User):
   hashed_password: str

# パスワードのハッシュ化に使用するコンテキスト
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# OAuth2スキーマの定義
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# FastAPIアプリケーションの初期化
app = FastAPI()

# パスワードの検証関数
def verify_password(plain_password, hashed_password):
   return pwd_context.verify(plain_password, hashed_password)

# パスワードのハッシュ化関数
def get_password_hash(password):
   return pwd_context.hash(password)

# ユーザー名からユーザー情報を取得する関数
def get_user(db, username: str):
   if username in db:
       user_dict = db[username]
       return UserInDB(**user_dict)

# ユーザーの認証関数
def authenticate_user(fake_db, username: str, password: str):
   user = get_user(fake_db, username)
   if not user:
       return False
   if not verify_password(password, user.hashed_password):
       return False
   return user

# アクセストークンの生成関数
def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
   to_encode = data.copy()
   if expires_delta:
       expire = datetime.now(timezone.utc) + expires_delta
   else:
       expire = datetime.now(timezone.utc) + timedelta(minutes=15)
   to_encode.update({"exp": expire})
   encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
   return encoded_jwt

# 現在のユーザーを取得する関数
async def get_current_user(token: str = Depends(oauth2_scheme)):
   credentials_exception = HTTPException(
       status_code=status.HTTP_401_UNAUTHORIZED,
       detail="Could not validate credentials",
       headers={"WWW-Authenticate": "Bearer"},
   )
   try:
       payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
       username: str = payload.get("sub")
       if username is None:
           raise credentials_exception
       token_data = TokenData(username=username)
   except JWTError:
       raise credentials_exception
   user = get_user(fake_users_db, username=token_data.username)
   if user is None:
       raise credentials_exception
   return user

# 現在のアクティブなユーザーを取得する関数
async def get_current_active_user(current_user: User = Depends(get_current_user)):
   if current_user.disabled:
       raise HTTPException(status_code=400, detail="Inactive user")
   return current_user

# アクセストークンを取得するためのエンドポイント
@app.post("/token")
async def login_for_access_token(
   form_data: OAuth2PasswordRequestForm = Depends()
) -> Token:
   user = authenticate_user(fake_users_db, form_data.username, form_data.password)
   if not user:
       raise HTTPException(
           status_code=status.HTTP_401_UNAUTHORIZED,
           detail="Incorrect username or password",
           headers={"WWW-Authenticate": "Bearer"},
       )
   access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
   access_token = create_access_token(
       data={"sub": user.username}, expires_delta=access_token_expires
   )
   return Token(access_token=access_token, token_type="bearer")

# 現在のユーザー情報を取得するエンドポイント
@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
   return current_user

# 現在のユーザーのアイテムを取得するエンドポイント
@app.get("/users/me/items/")
async def read_own_items(current_user: User = Depends(get_current_active_user)):
   return [{"item_id": "Foo", "owner": current_user.username}]

# アプリケーションの起動
if __name__ == "__main__":
   import uvicorn
   uvicorn.run("main:app", host="0.0.0.0", port=8001,reload=True)

Q:そもそもTokenってなに?

A: ユーザー認証に使用されるJWT(JSON Web Token)のアクセストークンを指します。
JWTは、ユーザーの認証情報を安全に伝達するために使用される標準的なトークンフォーマットです。JWTは、署名されたJSONオブジェクトであり、トークンの発行者、対象ユーザー、有効期限などの情報を含みます。

このコードでは、ユーザーがユーザー名とパスワードを使用して認証を行うと、サーバーはcreate_access_token関数を使用してJWTアクセストークンを生成します。
このトークンには、ユーザーの識別情報(通常はユーザーID)が含まれます。

生成されたアクセストークンは、/tokenエンドポイントを通じてクライアントに返されます。クライアントは、このトークンを後続のリクエストのAuthorizationヘッダーに含めることで、サーバーに対して認証を行います。

サーバーは、受け取ったトークンを検証し、トークンに含まれるユーザー情報を抽出します。これにより、リクエストを行ったユーザーを識別し、適切な権限を付与することができます。

トークンの主な役割は以下のとおりです:

  1. ユーザーの認証: トークンは、ユーザーが正常に認証されたことを証明します。

  2. ステートレス認証: サーバーは、トークンを使用してユーザーを識別できるため、セッション情報をサーバー側で保持する必要がありません。

  3. セキュリティ: トークンは署名されているため、改ざんを防ぐことができます。また、トークンの有効期限を設定することで、不正アクセスのリスクを軽減できます。

  4. スケーラビリティ: トークンベースの認証は、サーバーのスケーラビリティを向上させます。サーバーはセッション情報を保持する必要がないため、水平スケーリングが容易になります。

上記のコードでは、ACCESS_TOKEN_EXPIRE_MINUTES定数を使用してトークンの有効期限を設定しています。これにより、一定時間が経過した後、トークンが無効になり、ユーザーは再度認証を行う必要があります。

トークンは、ユーザー認証とセキュリティを向上させるための重要な要素であり、モダンなWebアプリケーションやAPIで広く使用されています。

Q:このコードでは、どのようにパスワード認証を行っていますか?

A:このコードでは、パスワード認証は以下の手順で行われています:

  1. ユーザーがユーザー名とパスワードを /token エンドポイントに送信します。

  2. /token エンドポイントでは、OAuth2PasswordRequestForm を使用してユーザー名とパスワードを受け取ります。

  3. 受け取ったユーザー名とパスワードを authenticate_user 関数に渡します。

  4. authenticate_user 関数では、以下の処理が行われます:

    • get_user 関数を使用して、受け取ったユーザー名に対応するユーザー情報を fake_users_db から取得します。

    • ユーザーが存在しない場合は、False を返します。

    • verify_password 関数を使用して、受け取ったパスワードとデータベースに保存されているハッシュ化されたパスワードを比較します。

    • パスワードが一致しない場合は、False を返します。

    • ユーザーが存在し、パスワードが一致する場合は、ユーザー情報を返します。

  5. authenticate_user 関数から返されたユーザー情報が存在する場合、create_access_token 関数を使用してJWTアクセストークンを生成します。

  6. 生成されたアクセストークンをクライアントに返します。

パスワードの比較には、verify_password 関数が使用されます。この関数は、passlib ライブラリの CryptContext を使用してパスワードのハッシュ化と検証を行います。

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

def verify_password(plain_password, hashed_password):
   return pwd_context.verify(plain_password, hashed_password)

pwd_context は、bcrypt ハッシュアルゴリズムを使用するように設定されています。verify_password 関数は、平文のパスワードとハッシュ化されたパスワードを受け取り、それらが一致するかどうかを検証します。

パスワードのハッシュ化は、get_password_hash 関数を使用して行われます。

def get_password_hash(password):
   return pwd_context.hash(password)

この関数は、平文のパスワードを受け取り、pwd_context を使用してハッシュ化されたパスワードを返します。

パスワードのハッシュ化は、パスワードを平文で保存するのではなく、一方向の暗号化アルゴリズムを使用して変換することを意味します。これにより、データベースに保存されているパスワードが漏洩した場合でも、元のパスワードを復元することが困難になります。

上記のコードでは、ユーザー情報とハッシュ化されたパスワードがダミーのデータベース(fake_users_db)に保存されています。実際のアプリケーションでは、パスワードのハッシュ化を行い、安全にデータベースに保存する必要があります。

Q:'authenticate_user'にてユーザーの入力したパスワードとDBのパスワードをどのように照合していますか?

A: authenticate_user 関数は、ユーザーから入力されたユーザー名とパスワードを引数として受け取ります。
パスワードは、ユーザーが /token エンドポイントにログイン情報を送信したときに提供されます。
以下は、authenticate_user 関数の定義です:

def authenticate_user(fake_db, username: str, password: str):
   user = get_user(fake_db, username)
   if not user:
       return False
   if not verify_password(password, user.hashed_password):
       return False
   return user

この関数では、以下の手順が行われます:

  1. get_user 関数を使用して、指定されたユーザー名に対応するユーザー情報を取得します。

  2. ユーザーが存在しない場合は、False を返します。

  3. verify_password 関数を呼び出します。この関数には、以下の2つの引数が渡されます:

    • password: ユーザーが提供した平文のパスワード

    • user.hashed_password: データベースに保存されているハッシュ化されたパスワード

  4. verify_password 関数は、平文のパスワードとハッシュ化されたパスワードを比較し、一致する場合は True、一致しない場合は False を返します。

  5. パスワードが一致しない場合は、False を返します。

  6. ユーザーが存在し、パスワードが一致する場合は、ユーザー情報を返します。

ユーザーがログイン情報を送信する /token エンドポイントでは、OAuth2PasswordRequestForm を使用してユーザー名とパスワードを取得します。

Q:本コードにおいて、Webアプリケーションとして、ユーザーによるログインを実装するためには具体的にどのような使い方が適当か?

A:ユーザーによるログインを実装するための具体的な使い方とプロセスは以下のようになります:

  1. ユーザー登録:

    • ユーザー登録用のエンドポイントを追加します(例: /users/register/)。

    • ユーザーは、登録フォームに必要な情報(ユーザー名、メールアドレス、パスワードなど)を入力します。

    • サーバーは、ユーザーの入力を検証し、get_password_hash 関数を使用してパスワードをハッシュ化し、ユーザー情報をデータベースに保存します。

  2. ログインフォームの表示:

    • フロントエンドにログインフォームを作成します。

    • フォームには、ユーザー名(またはメールアドレス)とパスワードの入力欄を用意します。

  3. ログイン情報の送信:

    • ユーザーは、ログインフォームにユーザー名とパスワードを入力し、送信ボタンをクリックします。

    • フロントエンドは、ユーザーの入力を /token エンドポイントにPOSTリクエストとして送信します。

  4. アクセストークンの取得:

    • /token エンドポイントでは、OAuth2PasswordRequestForm を使用してユーザー名とパスワードを取得します。

    • authenticate_user 関数を使用して、提供されたユーザー名とパスワードを認証します。

    • 認証が成功した場合、create_access_token 関数を使用してアクセストークン(JWT)を生成します。

    • 生成されたアクセストークンをクライアントに返却します。

  5. アクセストークンの保存:

    • フロントエンドは、サーバーから返却されたアクセストークンを保存します(通常はブラウザのローカルストレージやCookieに保存)。

  6. 認証済みリクエスト:

    • フロントエンドは、以降のリクエストでアクセストークンを使用して、サーバーに認証済みのリクエストを送信します。

    • リクエストのヘッダーに、アクセストークンを Authorization ヘッダーに Bearer トークンとして含めます。

  7. トークンの検証:

    • サーバーは、get_current_user 関数を使用して、受け取ったリクエストのアクセストークンを検証します。

    • トークンの有効性を確認し、トークンに含まれるユーザー情報を取得します。

  8. 認証済みの処理:

    • トークンが有効で、ユーザーが認証済みである場合、サーバーは要求された処理を実行します。

    • 例えば、/users/me/ エンドポイントでは、現在のユーザー情報を返します。

    • /users/me/items/ エンドポイントでは、現在のユーザーのアイテム情報を返します。

  9. エラーハンドリング:

    • 認証エラーやその他のエラーが発生した場合、適切なHTTPステータスコードとエラーメッセージをクライアントに返します。

    • フロントエンドは、エラーメッセージを適切に処理し、ユーザーにわかりやすく表示します。

  10. ログアウト:

    • フロントエンドにログアウト機能を実装します。

    • ユーザーがログアウトをリクエストした場合、フロントエンドはアクセストークンを削除します。

    • 必要に応じて、サーバー側でもトークンを無効化するエンドポイントを用意します。

上記のプロセスを踏まえて、フロントエンドとバックエンドを連携させることで、ユーザーによるログイン機能を実装できます。フロントエンドは、ログインフォームの表示、ユーザー入力の取得、サーバーとの通信を担当し、バックエンドは、ユーザー認証、トークンの生成と検証、保護されたリソースへのアクセス制御を担当します。

Q:SECRET_KEYはどうやって生成すべき?

A: Pythonで暗号学的に安全な SECRET_KEY を生成する方法の一つは、secrets モジュールを使用することです。以下は、secrets モジュールを使用して SECRET_KEY を生成する例です:

import secrets

# 32バイトの秘密鍵を生成
SECRET_KEY = secrets.token_hex(32)

上記のコードでは、secrets.token_hex(32) を使用して、32バイト(256ビット)のランダムな16進数文字列を生成しています。この値を SECRET_KEY として使用できます。

この記事が気に入ったらサポートをしてみませんか?