現(xiàn)在讓我們接著上一章繼續(xù)開發(fā),并添加缺少的部分以實(shí)現(xiàn)一個(gè)完整的安全性流程。
我們將使用 FastAPI 的安全性實(shí)用工具來獲取 username 和 password。
OAuth2 規(guī)定在使用(我們打算用的)「password 流程」時(shí),客戶端/用戶必須將 username 和 password 字段作為表單數(shù)據(jù)發(fā)送。
而且規(guī)范明確了字段必須這樣命名。因此 user-name 或 email 是行不通的。
不過不用擔(dān)心,你可以在前端按照你的想法將它展示給最終用戶。
而且你的數(shù)據(jù)庫模型也可以使用你想用的任何其他名稱。
但是對(duì)于登錄路徑操作,我們需要使用這些名稱來與規(guī)范兼容(以具備例如使用集成的 API 文檔系統(tǒng)的能力)。
規(guī)范還寫明了 username 和 password 必須作為表單數(shù)據(jù)發(fā)送(因此,此處不能使用 JSON)。
規(guī)范還提到客戶端可以發(fā)送另一個(gè)表單字段「scope」。
這個(gè)表單字段的名稱為 scope(單數(shù)形式),但實(shí)際上它是一個(gè)由空格分隔的「作用域」組成的長字符串。
每個(gè)「作用域」只是一個(gè)字符串(中間沒有空格)。
它們通常用于聲明特定的安全權(quán)限,例如:
Info
在 OAuth2 中「作用域」只是一個(gè)聲明所需特定權(quán)限的字符串。
它有沒有 : 這樣的其他字符或者是不是 URL 都沒有關(guān)系。
這些細(xì)節(jié)是具體的實(shí)現(xiàn)。
對(duì) OAuth2 來說它們就只是字符串而已。
現(xiàn)在,讓我們使用 FastAPI 提供的實(shí)用工具來處理此問題。
首先,導(dǎo)入 OAuth2PasswordRequestForm,然后在 token 的路徑操作中通過 Depends 將其作為依賴項(xiàng)使用。
from typing import Optional
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"hashed_password": "fakehashedsecret",
"disabled": False,
},
"alice": {
"username": "alice",
"full_name": "Alice Wonderson",
"email": "alice@example.com",
"hashed_password": "fakehashedsecret2",
"disabled": True,
},
}
app = FastAPI()
def fake_hash_password(password: str):
return "fakehashed" + password
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
class User(BaseModel):
username: str
email: Optional[str] = None
full_name: Optional[str] = None
disabled: Optional[bool] = None
class UserInDB(User):
hashed_password: str
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
def fake_decode_token(token):
# This doesn't provide any security at all
# Check the next version
user = get_user(fake_users_db, token)
return user
async def get_current_user(token: str = Depends(oauth2_scheme)):
user = fake_decode_token(token)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
return user
async def get_current_active_user(current_user: User = Depends(get_current_user)):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
user_dict = fake_users_db.get(form_data.username)
if not user_dict:
raise HTTPException(status_code=400, detail="Incorrect username or password")
user = UserInDB(**user_dict)
hashed_password = fake_hash_password(form_data.password)
if not hashed_password == user.hashed_password:
raise HTTPException(status_code=400, detail="Incorrect username or password")
return {"access_token": user.username, "token_type": "bearer"}
@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user
OAuth2PasswordRequestForm 是一個(gè)類依賴項(xiàng),聲明了如下的請(qǐng)求表單:
Tip
OAuth2 規(guī)范實(shí)際上要求 grant_type 字段使用一個(gè)固定的值 password,但是 OAuth2PasswordRequestForm 沒有作強(qiáng)制約束。
如果你需要強(qiáng)制要求這一點(diǎn),請(qǐng)使用 OAuth2PasswordRequestFormStrict 而不是 OAuth2PasswordRequestForm。
Info
OAuth2PasswordRequestForm 并不像 OAuth2PasswordBearer 一樣是 FastAPI 的一個(gè)特殊的類。
OAuth2PasswordBearer 使得 FastAPI 明白它是一個(gè)安全方案。所以它得以通過這種方式添加到 OpenAPI 中。
但 OAuth2PasswordRequestForm 只是一個(gè)你可以自己編寫的類依賴項(xiàng),或者你也可以直接聲明 Form 參數(shù)。
但是由于這是一種常見的使用場(chǎng)景,因此 FastAPI 出于簡(jiǎn)便直接提供了它。
Tip
類依賴項(xiàng) OAuth2PasswordRequestForm 的實(shí)例不會(huì)有用空格分隔的長字符串屬性 scope,而是具有一個(gè) scopes 屬性,該屬性將包含實(shí)際被發(fā)送的每個(gè)作用域字符串組成的列表。
在此示例中我們沒有使用 scopes,但如果你需要的話可以使用該功能。
現(xiàn)在,使用表單字段中的 username 從(偽)數(shù)據(jù)庫中獲取用戶數(shù)據(jù)。
如果沒有這個(gè)用戶,我們將返回一個(gè)錯(cuò)誤消息,提示「用戶名或密碼錯(cuò)誤」。
對(duì)于這個(gè)錯(cuò)誤,我們使用 HTTPException 異常:
from typing import Optional
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"hashed_password": "fakehashedsecret",
"disabled": False,
},
"alice": {
"username": "alice",
"full_name": "Alice Wonderson",
"email": "alice@example.com",
"hashed_password": "fakehashedsecret2",
"disabled": True,
},
}
app = FastAPI()
def fake_hash_password(password: str):
return "fakehashed" + password
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
class User(BaseModel):
username: str
email: Optional[str] = None
full_name: Optional[str] = None
disabled: Optional[bool] = None
class UserInDB(User):
hashed_password: str
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
def fake_decode_token(token):
# This doesn't provide any security at all
# Check the next version
user = get_user(fake_users_db, token)
return user
async def get_current_user(token: str = Depends(oauth2_scheme)):
user = fake_decode_token(token)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
return user
async def get_current_active_user(current_user: User = Depends(get_current_user)):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
user_dict = fake_users_db.get(form_data.username)
if not user_dict:
raise HTTPException(status_code=400, detail="Incorrect username or password")
user = UserInDB(**user_dict)
hashed_password = fake_hash_password(form_data.password)
if not hashed_password == user.hashed_password:
raise HTTPException(status_code=400, detail="Incorrect username or password")
return {"access_token": user.username, "token_type": "bearer"}
@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user
目前我們已經(jīng)從數(shù)據(jù)庫中獲取了用戶數(shù)據(jù),但尚未校驗(yàn)密碼。
讓我們首先將這些數(shù)據(jù)放入 Pydantic UserInDB 模型中。
永遠(yuǎn)不要保存明文密碼,因此,我們將使用(偽)哈希密碼系統(tǒng)。
如果密碼不匹配,我們將返回同一個(gè)錯(cuò)誤。
「哈?!沟囊馑际牵簩⒛承﹥?nèi)容(在本例中為密碼)轉(zhuǎn)換為看起來像亂碼的字節(jié)序列(只是一個(gè)字符串)。
每次你傳入完全相同的內(nèi)容(完全相同的密碼)時(shí),你都會(huì)得到完全相同的亂碼。
但是你不能從亂碼轉(zhuǎn)換回密碼。
如果你的數(shù)據(jù)庫被盜,小偷將無法獲得用戶的明文密碼,只有哈希值。
因此,小偷將無法嘗試在另一個(gè)系統(tǒng)中使用這些相同的密碼(由于許多用戶在任何地方都使用相同的密碼,因此這很危險(xiǎn))。
from typing import Optional
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"hashed_password": "fakehashedsecret",
"disabled": False,
},
"alice": {
"username": "alice",
"full_name": "Alice Wonderson",
"email": "alice@example.com",
"hashed_password": "fakehashedsecret2",
"disabled": True,
},
}
app = FastAPI()
def fake_hash_password(password: str):
return "fakehashed" + password
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
class User(BaseModel):
username: str
email: Optional[str] = None
full_name: Optional[str] = None
disabled: Optional[bool] = None
class UserInDB(User):
hashed_password: str
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
def fake_decode_token(token):
# This doesn't provide any security at all
# Check the next version
user = get_user(fake_users_db, token)
return user
async def get_current_user(token: str = Depends(oauth2_scheme)):
user = fake_decode_token(token)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
return user
async def get_current_active_user(current_user: User = Depends(get_current_user)):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
user_dict = fake_users_db.get(form_data.username)
if not user_dict:
raise HTTPException(status_code=400, detail="Incorrect username or password")
user = UserInDB(**user_dict)
hashed_password = fake_hash_password(form_data.password)
if not hashed_password == user.hashed_password:
raise HTTPException(status_code=400, detail="Incorrect username or password")
return {"access_token": user.username, "token_type": "bearer"}
@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user
UserInDB(**user_dict) 表示:
直接將 user_dict 的鍵和值作為關(guān)鍵字參數(shù)傳遞,等同于:
UserInDB(
username = user_dict["username"],
email = user_dict["email"],
full_name = user_dict["full_name"],
disabled = user_dict["disabled"],
hashed_password = user_dict["hashed_password"],
)
Info
有關(guān) user_dict 的更完整說明,請(qǐng)參閱額外的模型文檔。
token 端點(diǎn)的響應(yīng)必須是一個(gè) JSON 對(duì)象。
它應(yīng)該有一個(gè) token_type。在我們的例子中,由于我們使用的是「Bearer」令牌,因此令牌類型應(yīng)為「bearer」。
并且還應(yīng)該有一個(gè) access_token 字段,它是一個(gè)包含我們的訪問令牌的字符串。
對(duì)于這個(gè)簡(jiǎn)單的示例,我們將極其不安全地返回相同的 username 作為令牌。
Tip
在下一章中,你將看到一個(gè)真實(shí)的安全實(shí)現(xiàn),使用了哈希密碼和 JWT 令牌。
但現(xiàn)在,讓我們僅關(guān)注我們需要的特定細(xì)節(jié)。
from typing import Optional
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"hashed_password": "fakehashedsecret",
"disabled": False,
},
"alice": {
"username": "alice",
"full_name": "Alice Wonderson",
"email": "alice@example.com",
"hashed_password": "fakehashedsecret2",
"disabled": True,
},
}
app = FastAPI()
def fake_hash_password(password: str):
return "fakehashed" + password
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
class User(BaseModel):
username: str
email: Optional[str] = None
full_name: Optional[str] = None
disabled: Optional[bool] = None
class UserInDB(User):
hashed_password: str
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
def fake_decode_token(token):
# This doesn't provide any security at all
# Check the next version
user = get_user(fake_users_db, token)
return user
async def get_current_user(token: str = Depends(oauth2_scheme)):
user = fake_decode_token(token)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
return user
async def get_current_active_user(current_user: User = Depends(get_current_user)):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
user_dict = fake_users_db.get(form_data.username)
if not user_dict:
raise HTTPException(status_code=400, detail="Incorrect username or password")
user = UserInDB(**user_dict)
hashed_password = fake_hash_password(form_data.password)
if not hashed_password == user.hashed_password:
raise HTTPException(status_code=400, detail="Incorrect username or password")
return {"access_token": user.username, "token_type": "bearer"}
@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user
Tip
根據(jù)規(guī)范,你應(yīng)該像本示例一樣,返回一個(gè)帶有 access_token 和 token_type 的 JSON。
這是你必須在代碼中自行完成的工作,并且要確保使用了這些 JSON 字段。
這幾乎是唯一的你需要自己記住并正確地執(zhí)行以符合規(guī)范的事情。
其余的,F(xiàn)astAPI 都會(huì)為你處理。
現(xiàn)在我們將更新我們的依賴項(xiàng)。
我們想要僅當(dāng)此用戶處于啟用狀態(tài)時(shí)才能獲取 current_user。
因此,我們創(chuàng)建了一個(gè)額外的依賴項(xiàng) get_current_active_user,而該依賴項(xiàng)又以 get_current_user 作為依賴項(xiàng)。
如果用戶不存在或處于未啟用狀態(tài),則這兩個(gè)依賴項(xiàng)都將僅返回 HTTP 錯(cuò)誤。
因此,在我們的端點(diǎn)中,只有當(dāng)用戶存在,身份認(rèn)證通過且處于啟用狀態(tài)時(shí),我們才能獲得該用戶:
from typing import Optional
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"hashed_password": "fakehashedsecret",
"disabled": False,
},
"alice": {
"username": "alice",
"full_name": "Alice Wonderson",
"email": "alice@example.com",
"hashed_password": "fakehashedsecret2",
"disabled": True,
},
}
app = FastAPI()
def fake_hash_password(password: str):
return "fakehashed" + password
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
class User(BaseModel):
username: str
email: Optional[str] = None
full_name: Optional[str] = None
disabled: Optional[bool] = None
class UserInDB(User):
hashed_password: str
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
def fake_decode_token(token):
# This doesn't provide any security at all
# Check the next version
user = get_user(fake_users_db, token)
return user
async def get_current_user(token: str = Depends(oauth2_scheme)):
user = fake_decode_token(token)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
return user
async def get_current_active_user(current_user: User = Depends(get_current_user)):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
user_dict = fake_users_db.get(form_data.username)
if not user_dict:
raise HTTPException(status_code=400, detail="Incorrect username or password")
user = UserInDB(**user_dict)
hashed_password = fake_hash_password(form_data.password)
if not hashed_password == user.hashed_password:
raise HTTPException(status_code=400, detail="Incorrect username or password")
return {"access_token": user.username, "token_type": "bearer"}
@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user
Info
我們?cè)诖颂幏祷氐闹禐?nbsp;Bearer 的額外響應(yīng)頭 WWW-Authenticate 也是規(guī)范的一部分。
任何的 401「未認(rèn)證」HTTP(錯(cuò)誤)狀態(tài)碼都應(yīng)該返回 WWW-Authenticate 響應(yīng)頭。
對(duì)于 bearer 令牌(我們的例子),該響應(yīng)頭的值應(yīng)為 Bearer。
實(shí)際上你可以忽略這個(gè)額外的響應(yīng)頭,不會(huì)有什么問題。
但此處提供了它以符合規(guī)范。
而且,(現(xiàn)在或?qū)恚┛赡軙?huì)有工具期望得到并使用它,然后對(duì)你或你的用戶有用處。
這就是遵循標(biāo)準(zhǔn)的好處...
打開交互式文檔:http://127.0.0.1:8000/docs。
點(diǎn)擊「Authorize」按鈕。
使用以下憑證:
用戶名:johndoe
密碼:secret
在系統(tǒng)中進(jìn)行身份認(rèn)證后,你將看到:
現(xiàn)在執(zhí)行 /users/me 路徑的 GET 操作。
你將獲得你的用戶數(shù)據(jù),如:
{
"username": "johndoe",
"email": "johndoe@example.com",
"full_name": "John Doe",
"disabled": false,
"hashed_password": "fakehashedsecret"
}
如果你點(diǎn)擊鎖定圖標(biāo)并注銷,然后再次嘗試同一操作,則會(huì)得到 HTTP 401 錯(cuò)誤:
{
"detail": "Not authenticated"
}
現(xiàn)在嘗試使用未啟用的用戶,并通過以下方式進(jìn)行身份認(rèn)證:
用戶名:alice
密碼:secret2
然后嘗試執(zhí)行 /users/me 路徑的 GET 操作。
你將得到一個(gè)「未啟用的用戶」錯(cuò)誤,如:
{
"detail": "Inactive user"
}
現(xiàn)在你掌握了為你的 API 實(shí)現(xiàn)一個(gè)基于 username 和 password 的完整安全系統(tǒng)的工具。
使用這些工具,你可以使安全系統(tǒng)與任何數(shù)據(jù)庫以及任何用戶或數(shù)據(jù)模型兼容。
唯一缺少的細(xì)節(jié)是它實(shí)際上還并不「安全」。
在下一章中,你將看到如何使用一個(gè)安全的哈希密碼庫和 JWT 令牌。
更多建議: