本文介绍JWT的组成、请求过程、优缺点以及django通过模块实现JWT认证的源码
组成
参考:https://jwt.io/introduction
调试:https://jwt.io/#debugger-io
JSON Web Token由.
间隔的3个部分组成,形式为xxxxx.yyyyy.zzzzz
,分别是:
- Header 头部
- Playload 负载
- Signature 签名
一个实际的JWT样例:
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaXNzIjoiUXQ2MmFoMFZOWHJ6RHNIZzNUTGllSnVHZkpJQkJWcHUiLCJleHAiOjE2NjIwMDI0MTd9.koVrJVzyV-bBopaj5USS9kcI2w5lyNDeoenAqfUPsuQ
Header 头部
第一部分Header信息通常由两部分组成,分别是令牌类型和签名算法类型:
{
"alg": "HS256",
"typ": "JWT"
}
这个JSON信息被Base64Url编码,组成第一部分。
Payload 负载
第二部分Playload信息通常用于存放一些非敏感的信息(不做加密,只编码),claims(声明)分为三种类型:
- 注册声明(建议使用的预定义声明)
- iss:发行人
- exp:到期时间
- sub:主题
- aud:受众(哪些服务可以使用该令牌)
- nbf:在此之前不可用
- iat:发布时间
- jti:JWT ID(用于标识该JWT)
- 公开声明(避免冲突,官方已经定义好的一些字段)
参考:https://www.iana.org/assignments/jwt/jwt.xhtml - 私人声明(用户自定义的字段)
实际例子:
{
"sub": "test",
"name": "John Doe",
"iss": "Qt62ah0VNXrzDsHg3TLieJuGfJIBBVpu",
"exp": 1662002417
}
这个JSON信息被Base64Url编码,组成第二部分。
Signature 签名
第三部分Signature是对第一、第二部分的信息与一串私钥进行签名后的结果。
如果第一部分选择的签名算法为HS256
,则第三部分生成方式如下:
HMACSHA256(
base64UrlEncode(header) + "." +
base64UrlEncode(payload),
secret
)
最终产生的签名,拼接第一、第二部分,组成JWT。
rest_framework_jwt/utils
源码:
使用JWT发送请求
每当用户想要访问受保护的路由或资源时,用户代理应该发送 JWT,通常在Authorization标头中使用Bearer类型(其它类型)。标头的内容应如下所示:
Authorization: Bearer <token>
使用curl请求:
curl https://xxxxx.com -H "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaXNzIjoiUXQ2MmFoMFZOWHJ6RHNIZzNUTGllSnVHZkpJQkJWcHUiLCJleHAiOjE2NjIwMjM0MTd9.B4m23TAZ3eMVg3TUuKtmYi-jaWGXg0IZS0X3UhOIijE"
请求流程
- 应用使用正确的认证凭据(用户名密码)请求认证服务器。
- 认证通过,认证服务器返回一串访问令牌。
- 应用程序使用访问令牌访问受保护的资源(如 API)。
优点
- json格式的通用性,所以JWT可以跨语言支持,比如Java、JavaScript、PHP、Node等等。
- 可以利用Payload存储一些非敏感的信息用于交换信息,有效地使用 JWT,可以降低服务器查询数据库的次数。
- 便于传输,JWT结构简单,字节占用小。
- 不需要在服务端保存会话信息,易于应用的扩展。
缺陷
- 签发了JWT后,无法修改里面的内容,想要修改必须重新签发。
- 无法废弃,一旦签发了JWT就是有效的,除非到达过期时间。(可以通过额外的逻辑做黑名单来阻止JWT通过)
- 续签比较麻烦,到达过期时间需要重新签发新的JWT。
- JWT如果携带了太多信息,会使请求体变大,甚至比请求的实际内容(body)还大。
源码解析
模块:
djangorestframework-jwt==1.11.0
获取Playload
def jwt_payload_handler(user):
username_field = get_username_field()
username = get_username(user)
warnings.warn(
'The following fields will be removed in the future: '
'`email` and `user_id`. ',
DeprecationWarning
)
payload = {
'user_id': user.pk,
'username': username,
'exp': datetime.utcnow() + api_settings.JWT_EXPIRATION_DELTA
}
if hasattr(user, 'email'):
payload['email'] = user.email
if isinstance(user.pk, uuid.UUID):
payload['user_id'] = str(user.pk)
payload[username_field] = username
# Include original issued at time for a brand new token,
# to allow token refresh
if api_settings.JWT_ALLOW_REFRESH:
payload['orig_iat'] = timegm(
datetime.utcnow().utctimetuple()
)
if api_settings.JWT_AUDIENCE is not None:
payload['aud'] = api_settings.JWT_AUDIENCE
if api_settings.JWT_ISSUER is not None:
payload['iss'] = api_settings.JWT_ISSUER
return payload
获取私钥
def jwt_get_secret_key(payload=None):
if api_settings.JWT_GET_USER_SECRET_KEY:
User = get_user_model() # noqa: N806
user = User.objects.get(pk=payload.get('user_id'))
key = str(api_settings.JWT_GET_USER_SECRET_KEY(user))
return key
return api_settings.JWT_SECRET_KEY
生成签名
def jwt_encode_handler(payload):
key = api_settings.JWT_PRIVATE_KEY or jwt_get_secret_key(payload)
return jwt.encode(
payload,
key,
api_settings.JWT_ALGORITHM
).decode('utf-8')
这里可以看出,私钥可以通过配置JWT_PRIVATE_KEY或JWT_SECRET_KEY还可以使用用户的SECRET_KEY来获取,而
JWT_SECRET_KEY
默认值就是settings
里面的SECRET_KEY
解码
def jwt_decode_handler(token):
options = {
'verify_exp': api_settings.JWT_VERIFY_EXPIRATION,
}
# get user from token, BEFORE verification, to get user secret key
unverified_payload = jwt.decode(token, None, False)
secret_key = jwt_get_secret_key(unverified_payload)
return jwt.decode(
token,
api_settings.JWT_PUBLIC_KEY or secret_key,
api_settings.JWT_VERIFY,
options=options,
leeway=api_settings.JWT_LEEWAY,
audience=api_settings.JWT_AUDIENCE,
issuer=api_settings.JWT_ISSUER,
algorithms=[api_settings.JWT_ALGORITHM]
)
编码与解码
模块:
pyjwt==1.7.1
编码步骤
def encode(self,
payload, # type: Union[Dict, bytes]
key, # type: str
algorithm='HS256', # type: str
headers=None, # type: Optional[Dict]
json_encoder=None # type: Optional[Callable]
):
# Check that we get a mapping
if not isinstance(payload, Mapping):
raise TypeError('Expecting a mapping object, as JWT only supports '
'JSON objects as payloads.')
# Payload
for time_claim in ['exp', 'iat', 'nbf']:
# Convert datetime to a intDate value in known time-format claims
if isinstance(payload.get(time_claim), datetime):
payload[time_claim] = timegm(payload[time_claim].utctimetuple()) # type: ignore
json_payload = json.dumps(
payload,
separators=(',', ':'),
cls=json_encoder
).encode('utf-8')
return super(PyJWT, self).encode(
json_payload, key, algorithm, headers, json_encoder
)
解码步骤
def decode(self,
jwt, # type: str
key='', # type: str
verify=True, # type: bool
algorithms=None, # type: List[str]
options=None, # type: Dict
**kwargs):
if verify and not algorithms:
warnings.warn(
'It is strongly recommended that you pass in a ' +
'value for the "algorithms" argument when calling decode(). ' +
'This argument will be mandatory in a future version.',
DeprecationWarning
)
payload, _, _, _ = self._load(jwt)
if options is None:
options = {'verify_signature': verify}
else:
options.setdefault('verify_signature', verify)
decoded = super(PyJWT, self).decode(
jwt, key=key, algorithms=algorithms, options=options, **kwargs
)
try:
payload = json.loads(decoded.decode('utf-8'))
except ValueError as e:
raise DecodeError('Invalid payload string: %s' % e)
if not isinstance(payload, Mapping):
raise DecodeError('Invalid payload string: must be a json object')
if verify:
merged_options = merge_dict(self.options, options)
self._validate_claims(payload, merged_options, **kwargs)
return payload
签名算法
代码在/Lib/site-packages/jwt/algorithms.py
,这里不展开