Introduction
JSON Web Tokens (or JWTs) provide a means of transmitting information from the client to the server in a stateless, secure way.
On the server, a JWT is generated by signing user information with a secret key; the token is then stored securely on the client. This form of auth works well with modern, single-page applications. For more on this, along with the pros and cons of using JWTs vs. session and cookie-based auth, please review the following articles:
- Cookies vs Tokens: The Definitive Guide
- Token Authentication vs. Cookies
- How do sessions work in Flask?
NOTE: Keep in mind that since a JWT is signed rather than encrypted it should never contain sensitive information like a user’s password.
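To see why the payload must not hold secrets, here is a minimal sketch that reads the claims out of a token without knowing the signing key. The helper names and the sample token are assumptions made up for illustration; only the standard library is used.

```python
import base64
import json


def b64url_decode(segment: str) -> bytes:
    """Decode a base64url segment, restoring the padding that JWTs strip."""
    padding = "=" * (-len(segment) % 4)
    return base64.urlsafe_b64decode(segment + padding)


def read_claims_without_key(token: str) -> dict:
    """Return the payload of a JWT without verifying its signature."""
    _header, payload, _signature = token.split(".")
    return json.loads(b64url_decode(payload))


# Build a stand-in token by hand. The signature segment is a dummy value
# because reading the payload never touches it.
claims = {"sub": 1, "email": "joe@gmail.com"}
payload_segment = (
    base64.urlsafe_b64encode(json.dumps(claims).encode()).rstrip(b"=").decode()
)
example_token = "eyJhbGciOiJIUzI1NiJ9." + payload_segment + ".dummy-signature"

print(read_claims_without_key(example_token))
```

Anyone who intercepts a token can do the same, which is why the signature protects integrity only and not confidentiality.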
Getting Started
Enough theory, let’s start implementing some code!
Project Setup
Start by cloning the project boilerplate and then creating a new branch:
$ git clone https://github.com/realpython/flask-jwt-auth.git
$ cd flask-jwt-auth
$ git checkout tags/1.0.0 -b jwt-auth
Create and activate a virtualenv and install the dependencies:
$ python3.6 -m venv env
$ source env/bin/activate
(env)$ pip install -r requirements.txt
This is optional, but it’s a good idea to create a new GitHub repository and update the remote:
(env)$ git remote set-url origin <newurl>
Database Setup
Let’s set up Postgres.
NOTE: If you’re on a Mac, check out Postgres app.
Once the local Postgres server is running, create two new databases from psql that share the same name as your project name:
(env)$ psql
# create database flask_jwt_auth;
CREATE DATABASE
# create database flask_jwt_auth_test;
CREATE DATABASE
# \q
NOTE: The commands for creating a database may vary slightly depending on your version of Postgres. Check the Postgres documentation for the correct command.
Before applying the database migrations, we need to update the config file found in project/server/config.py. Simply update the database_name:
database_name = 'flask_jwt_auth'
Set the environment variables in the terminal:
(env)$ export APP_SETTINGS="project.server.config.DevelopmentConfig"
Update the following tests in project/tests/test__config.py:
class TestDevelopmentConfig(TestCase):
    def create_app(self):
        app.config.from_object('project.server.config.DevelopmentConfig')
        return app

    def test_app_is_development(self):
        self.assertTrue(app.config['DEBUG'] is True)
        self.assertFalse(current_app is None)
        self.assertTrue(
            app.config['SQLALCHEMY_DATABASE_URI'] == 'postgresql://postgres:@localhost/flask_jwt_auth'
        )


class TestTestingConfig(TestCase):
    def create_app(self):
        app.config.from_object('project.server.config.TestingConfig')
        return app

    def test_app_is_testing(self):
        self.assertTrue(app.config['DEBUG'])
        self.assertTrue(
            app.config['SQLALCHEMY_DATABASE_URI'] == 'postgresql://postgres:@localhost/flask_jwt_auth_test'
        )
Run them to ensure they still pass:
(env)$ python manage.py test
You should see:
test_app_is_development (test__config.TestDevelopmentConfig) ... ok
test_app_is_production (test__config.TestProductionConfig) ... ok
test_app_is_testing (test__config.TestTestingConfig) ... ok
----------------------------------------------------------------------
Ran 3 tests in 0.007s
OK
Migrations
Add a models.py file to the “server” directory:
# project/server/models.py
import datetime

from project.server import app, db, bcrypt


class User(db.Model):
    """ User Model for storing user related details """
    __tablename__ = "users"

    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    email = db.Column(db.String(255), unique=True, nullable=False)
    password = db.Column(db.String(255), nullable=False)
    registered_on = db.Column(db.DateTime, nullable=False)
    admin = db.Column(db.Boolean, nullable=False, default=False)

    def __init__(self, email, password, admin=False):
        self.email = email
        # store only the bcrypt hash, never the plain-text password
        self.password = bcrypt.generate_password_hash(
            password, app.config.get('BCRYPT_LOG_ROUNDS')
        ).decode()
        self.registered_on = datetime.datetime.now()
        self.admin = admin
In the above snippet, we define a basic user model, which uses the Flask-Bcrypt extension to hash the password.
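The idea behind storing a hash instead of the password can be sketched with the standard library alone. Note that this is not Flask-Bcrypt: it swaps in PBKDF2 from hashlib as a stand-in, and the function names and storage format are assumptions chosen for illustration.

```python
import hashlib
import hmac
import os


def hash_password(password: str, rounds: int = 100_000) -> str:
    """Hash a password with a fresh random salt (PBKDF2 stand-in for bcrypt)."""
    salt = os.urandom(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, rounds)
    # Keep the rounds and salt next to the digest so the hash can be re-checked.
    return "{}${}${}".format(rounds, salt.hex(), digest.hex())


def check_password(stored: str, candidate: str) -> bool:
    """Re-derive the digest from the candidate and compare in constant time."""
    rounds, salt_hex, digest_hex = stored.split("$")
    digest = hashlib.pbkdf2_hmac(
        "sha256", candidate.encode("utf-8"), bytes.fromhex(salt_hex), int(rounds)
    )
    return hmac.compare_digest(digest.hex(), digest_hex)


stored = hash_password("test")
print(check_password(stored, "test"))
print(check_password(stored, "wrong"))
```

As with the model above, the database only ever sees the derived value, so a leaked table does not directly reveal passwords.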
Install psycopg2 to connect to Postgres:
(env)$ pip install psycopg2==2.6.2
(env)$ pip freeze > requirements.txt
Within manage.py, change:
from project.server import app, db
To:
from project.server import app, db, models
Apply the migration:
(env)$ python manage.py create_db
(env)$ python manage.py db init
(env)$ python manage.py db migrate
Sanity Check
Did it work?
(env)$ psql
# \c flask_jwt_auth
You are now connected to database "flask_jwt_auth" as user "michael.herman".
# \d
               List of relations
 Schema |      Name       |   Type   |  Owner
--------+-----------------+----------+----------
 public | alembic_version | table    | postgres
 public | users           | table    | postgres
 public | users_id_seq    | sequence | postgres
(3 rows)
JWT Setup
The auth workflow works as follows:
- Client provides email and password, which is sent to the server
- Server then verifies that email and password are correct and responds with an auth token
- Client stores the token and sends it along with all subsequent requests to the API
- Server decodes the token and validates it
This cycle repeats until the token expires or is revoked. In the latter case, the server issues a new token.
The tokens themselves are divided into three parts:
- Header
- Payload
- Signature
We’ll dive a bit deeper into the payload, but if you’re curious, you can read more about each part from the Introduction to JSON Web Tokens article.
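To make the three parts concrete, here is a rough sketch that assembles an HS256 token by hand with the standard library. It is not how PyJWT is implemented internally; the function names and the demo secret are assumptions for illustration only.

```python
import base64
import hashlib
import hmac
import json


def b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWTs do."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def sign_hs256(payload: dict, secret: bytes) -> str:
    """Assemble header.payload.signature for an HS256-signed token."""
    header = {"typ": "JWT", "alg": "HS256"}
    signing_input = ".".join(
        b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    # The signature is an HMAC-SHA256 over the first two segments.
    signature = hmac.new(
        secret, signing_input.encode("ascii"), hashlib.sha256
    ).digest()
    return signing_input + "." + b64url(signature)


token = sign_hs256({"sub": 1}, b"demo-secret")  # demo key, not for real use
header_part, payload_part, signature_part = token.split(".")
print(header_part)
print(payload_part)
print(signature_part)
```

The header and payload are plain JSON in base64url form; only the last segment depends on the secret key.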
To work with JSON Web Tokens in our app, install the PyJWT package:
(env)$ pip install pyjwt==1.4.2
(env)$ pip freeze > requirements.txt
Encode Token
Add the following method to the User() class in project/server/models.py:
def encode_auth_token(self, user_id):
    """
    Generates the Auth Token
    :return: string
    """
    try:
        payload = {
            'exp': datetime.datetime.utcnow() + datetime.timedelta(days=0, seconds=5),
            'iat': datetime.datetime.utcnow(),
            'sub': user_id
        }
        return jwt.encode(
            payload,
            app.config.get('SECRET_KEY'),
            algorithm='HS256'
        )
    except Exception as e:
        return e
Don’t forget to add the import:
import jwt
So, given a user id, this method creates and returns a token from the payload and the secret key set in the config.py file. The payload is where we add metadata about the token and information about the user. This info is often referred to as JWT Claims. We utilize the following “claims”:
- exp: expiration date of the token
- iat: the time the token is generated
- sub: the subject of the token (the user whom it identifies)
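The three claims can be sketched as plain Unix timestamps plus a user id. The helper names below are assumptions for illustration, and the expiry rule shown (expired once the clock reaches exp) is a simplification of what a JWT library checks.

```python
import datetime


def build_claims(user_id: int, lifetime_seconds: int, now: datetime.datetime) -> dict:
    """Return the three claims used in this tutorial as Unix timestamps."""
    issued_at = int(now.timestamp())
    return {"exp": issued_at + lifetime_seconds, "iat": issued_at, "sub": user_id}


def is_expired(claims: dict, now: datetime.datetime) -> bool:
    """Treat the token as expired once the current time reaches its exp claim."""
    return int(now.timestamp()) >= claims["exp"]


issued = datetime.datetime(2017, 1, 23, 21, 51, 30, tzinfo=datetime.timezone.utc)
claims = build_claims(user_id=1, lifetime_seconds=5, now=issued)

print(claims)
print(is_expired(claims, issued + datetime.timedelta(seconds=4)))
print(is_expired(claims, issued + datetime.timedelta(seconds=6)))
```

With a five-second lifetime, as in encode_auth_token(), the token is usable for only a moment, which is handy for testing expiry but far too short for real use.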
The secret key must be random and only accessible server-side. Use the Python interpreter to generate a key:
>>> import os
>>> os.urandom(24)
b"\xf9'\xe4p(\xa9\x12\x1a!\x94\x8d\x1c\x99l\xc7\xb7e\xc7c\x86\x02MJ\xa0"
Set the key as an environment variable:
(env)$ export SECRET_KEY="\xf9'\xe4p(\xa9\x12\x1a!\x94\x8d\x1c\x99l\xc7\xb7e\xc7c\x86\x02MJ\xa0"
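One caveat: when the printed bytes literal is pasted into a shell, the backslash escapes are typically kept as literal text, so the exported value is the printed representation rather than the original random bytes. A possible alternative, offered here as a sketch rather than the tutorial's method, is to generate a key that is already a printable string:

```python
import secrets

# 32 random bytes rendered as 64 hexadecimal characters. The result contains
# no quotes or backslashes, so it can be pasted into `export SECRET_KEY=...`.
secret_key = secrets.token_hex(32)
print(secret_key)
```

Either way, the key should be generated once, kept out of version control, and supplied through the environment.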
Load the key into SECRET_KEY within the BaseConfig() class in project/server/config.py:
SECRET_KEY = os.getenv('SECRET_KEY', 'my_precious')
Update the tests within project/tests/test__config.py to ensure the variable is set correctly:
class TestDevelopmentConfig(TestCase):
    def create_app(self):
        app.config.from_object('project.server.config.DevelopmentConfig')
        return app

    def test_app_is_development(self):
        # compare by value; `is` checks object identity and is unreliable for strings
        self.assertFalse(app.config['SECRET_KEY'] == 'my_precious')
        self.assertTrue(app.config['DEBUG'] is True)
        self.assertFalse(current_app is None)
        self.assertTrue(
            app.config['SQLALCHEMY_DATABASE_URI'] == 'postgresql://postgres:@localhost/flask_jwt_auth'
        )


class TestTestingConfig(TestCase):
    def create_app(self):
        app.config.from_object('project.server.config.TestingConfig')
        return app

    def test_app_is_testing(self):
        self.assertFalse(app.config['SECRET_KEY'] == 'my_precious')
        self.assertTrue(app.config['DEBUG'])
        self.assertTrue(
            app.config['SQLALCHEMY_DATABASE_URI'] == 'postgresql://postgres:@localhost/flask_jwt_auth_test'
        )
Before moving on, let’s write a quick unit test for the user model. Add the following code to a new file called test_user_model.py in “project/tests”:
# project/tests/test_user_model.py
import unittest

from project.server import db
from project.server.models import User
from project.tests.base import BaseTestCase


class TestUserModel(BaseTestCase):

    def test_encode_auth_token(self):
        user = User(
            email='test@test.com',
            password='test'
        )
        db.session.add(user)
        db.session.commit()
        auth_token = user.encode_auth_token(user.id)
        self.assertTrue(isinstance(auth_token, bytes))


if __name__ == '__main__':
    unittest.main()
Run the tests. They all should pass.
Decode Token
Similarly, to decode a token, add the following method to the User() class:
@staticmethod
def decode_auth_token(auth_token):
    """
    Decodes the auth token
    :param auth_token:
    :return: integer|string
    """
    try:
        # name the expected algorithm explicitly so tokens signed any other way are rejected
        payload = jwt.decode(
            auth_token, app.config.get('SECRET_KEY'), algorithms=['HS256']
        )
        return payload['sub']
    except jwt.ExpiredSignatureError:
        return 'Signature expired. Please log in again.'
    except jwt.InvalidTokenError:
        return 'Invalid token. Please log in again.'
We need to decode the auth token with every API request and verify its signature to be sure of the user’s authenticity. To verify the auth_token, we use the same SECRET_KEY that was used to encode it.
If the auth_token is valid, we get the user id from the sub index of the payload. If invalid, there could be two exceptions:
- Expired Signature: When the token is used after it has expired, an ExpiredSignatureError exception is raised. This means the time specified in the payload’s exp field has passed.
- Invalid Token: When the token supplied is incorrect or malformed, an InvalidTokenError exception is raised.
NOTE: We have used a static method since it does not relate to the class’s instance.
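To show what the decode step checks, here is a standard-library sketch of the same contract: return the sub claim on success, or an error string otherwise. It is an approximation for illustration and not PyJWT's implementation; make_token, decode_token, and the demo secret are all hypothetical names.

```python
import base64
import hashlib
import hmac
import json


def _b64url(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(segment: str) -> bytes:
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def _sign(signing_input: str, secret: bytes) -> str:
    mac = hmac.new(secret, signing_input.encode("ascii"), hashlib.sha256)
    return _b64url(mac.digest())


def make_token(payload: dict, secret: bytes) -> str:
    """Build an HS256 token so the decoder below has something to check."""
    header = _b64url(b'{"typ":"JWT","alg":"HS256"}')
    body = _b64url(json.dumps(payload).encode("utf-8"))
    signing_input = header + "." + body
    return signing_input + "." + _sign(signing_input, secret)


def decode_token(token: str, secret: bytes, now: float):
    """Return the sub claim, or an error message string on failure."""
    try:
        header, body, signature = token.split(".")
    except ValueError:  # not exactly three segments
        return "Invalid token. Please log in again."
    expected = _sign(header + "." + body, secret)
    # Verify the signature first; the payload is only trusted afterwards.
    if not hmac.compare_digest(expected.encode(), signature.encode()):
        return "Invalid token. Please log in again."
    payload = json.loads(_b64url_decode(body))
    if now >= payload["exp"]:
        return "Signature expired. Please log in again."
    return payload["sub"]


secret = b"demo-secret"  # demo key, not for real use
token = make_token({"sub": 1, "exp": 1000}, secret)
print(decode_token(token, secret, now=999))
print(decode_token(token, secret, now=1001))
print(decode_token(token, b"other-secret", now=999))
```

Returning either an integer or a string keeps the method simple, but it means callers have to check the type of the result, which is what the route handlers do with isinstance().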
Add a test to test_user_model.py:
def test_decode_auth_token(self):
    user = User(
        email='test@test.com',
        password='test'
    )
    db.session.add(user)
    db.session.commit()
    auth_token = user.encode_auth_token(user.id)
    self.assertTrue(isinstance(auth_token, bytes))
    self.assertTrue(User.decode_auth_token(auth_token) == 1)
Make sure the tests pass before moving on.
NOTE: We will handle invalid tokens by blacklisting them later.
Route Setup
Now we can configure the auth routes using a test-first approach:
- /auth/register
- /auth/login
- /auth/logout
- /auth/status
Start by creating a new folder called “auth” in “project/server”. Then, within “auth” add two files, __init__.py and views.py. Finally, add the following code to views.py:
# project/server/auth/views.py
from flask import Blueprint, request, make_response, jsonify
from flask.views import MethodView
from project.server import bcrypt, db
from project.server.models import User
auth_blueprint = Blueprint('auth', __name__)
To register the new Blueprint with the app, add the following to the bottom of project/server/__init__.py:
from project.server.auth.views import auth_blueprint
app.register_blueprint(auth_blueprint)
Now, add a new file called test_auth.py within “project/tests” to hold all our tests for this Blueprint:
# project/tests/test_auth.py
import unittest

from project.server import db
from project.server.models import User
from project.tests.base import BaseTestCase


class TestAuthBlueprint(BaseTestCase):
    pass


if __name__ == '__main__':
    unittest.main()
Register Route
Start with a test:
def test_registration(self):
    """ Test for user registration """
    with self.client:
        response = self.client.post(
            '/auth/register',
            data=json.dumps(dict(
                email='joe@gmail.com',
                password='123456'
            )),
            content_type='application/json'
        )
        data = json.loads(response.data.decode())
        self.assertTrue(data['status'] == 'success')
        self.assertTrue(data['message'] == 'Successfully registered.')
        self.assertTrue(data['auth_token'])
        self.assertTrue(response.content_type == 'application/json')
        self.assertEqual(response.status_code, 201)
Make sure to add the import:
import json
Run the tests. You should see the following error:
raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
Now, let’s write the code to get the test to pass. Add the following to project/server/auth/views.py:
class RegisterAPI(MethodView):
    """
    User Registration Resource
    """

    def post(self):
        # get the post data
        post_data = request.get_json()
        # check if user already exists
        user = User.query.filter_by(email=post_data.get('email')).first()
        if not user:
            try:
                user = User(
                    email=post_data.get('email'),
                    password=post_data.get('password')
                )
                # insert the user
                db.session.add(user)
                db.session.commit()
                # generate the auth token
                auth_token = user.encode_auth_token(user.id)
                responseObject = {
                    'status': 'success',
                    'message': 'Successfully registered.',
                    'auth_token': auth_token.decode()
                }
                return make_response(jsonify(responseObject)), 201
            except Exception as e:
                responseObject = {
                    'status': 'fail',
                    'message': 'Some error occurred. Please try again.'
                }
                return make_response(jsonify(responseObject)), 401
        else:
            responseObject = {
                'status': 'fail',
                'message': 'User already exists. Please Log in.',
            }
            return make_response(jsonify(responseObject)), 202
# define the API resources
registration_view = RegisterAPI.as_view('register_api')
# add Rules for API Endpoints
auth_blueprint.add_url_rule(
'/auth/register',
view_func=registration_view,
methods=['POST']
)
Here, we register a new user and generate a new auth token for further requests, which we send back to the client.
Run the tests to ensure they all pass:
Ran 6 tests in 0.132s
OK
Next, let’s add one more test to ensure the registration fails if the user already exists:
def test_registered_with_already_registered_user(self):
    """ Test registration with already registered email"""
    user = User(
        email='joe@gmail.com',
        password='test'
    )
    db.session.add(user)
    db.session.commit()
    with self.client:
        response = self.client.post(
            '/auth/register',
            data=json.dumps(dict(
                email='joe@gmail.com',
                password='123456'
            )),
            content_type='application/json'
        )
        data = json.loads(response.data.decode())
        self.assertTrue(data['status'] == 'fail')
        self.assertTrue(
            data['message'] == 'User already exists. Please Log in.')
        self.assertTrue(response.content_type == 'application/json')
        self.assertEqual(response.status_code, 202)
Run the tests again before moving on to the next route. All should pass.
Login Route
Again, start with a test. To verify the login API, let’s test for two cases:
- Registered user login
- Non-registered user login
Registered user login
def test_registered_user_login(self):
    """ Test for login of registered-user login """
    with self.client:
        # user registration
        resp_register = self.client.post(
            '/auth/register',
            data=json.dumps(dict(
                email='joe@gmail.com',
                password='123456'
            )),
            content_type='application/json',
        )
        data_register = json.loads(resp_register.data.decode())
        self.assertTrue(data_register['status'] == 'success')
        self.assertTrue(
            data_register['message'] == 'Successfully registered.'
        )
        self.assertTrue(data_register['auth_token'])
        self.assertTrue(resp_register.content_type == 'application/json')
        self.assertEqual(resp_register.status_code, 201)
        # registered user login
        response = self.client.post(
            '/auth/login',
            data=json.dumps(dict(
                email='joe@gmail.com',
                password='123456'
            )),
            content_type='application/json'
        )
        data = json.loads(response.data.decode())
        self.assertTrue(data['status'] == 'success')
        self.assertTrue(data['message'] == 'Successfully logged in.')
        self.assertTrue(data['auth_token'])
        self.assertTrue(response.content_type == 'application/json')
        self.assertEqual(response.status_code, 200)
In this test case, the registered user tries to log in and, as expected, our application should allow this.
Run the tests. They should fail. Now write the code:
class LoginAPI(MethodView):
    """
    User Login Resource
    """

    def post(self):
        # get the post data
        post_data = request.get_json()
        try:
            # fetch the user data
            user = User.query.filter_by(
                email=post_data.get('email')
            ).first()
            auth_token = user.encode_auth_token(user.id)
            if auth_token:
                responseObject = {
                    'status': 'success',
                    'message': 'Successfully logged in.',
                    'auth_token': auth_token.decode()
                }
                return make_response(jsonify(responseObject)), 200
        except Exception as e:
            print(e)
            responseObject = {
                'status': 'fail',
                'message': 'Try again'
            }
            return make_response(jsonify(responseObject)), 500
Don’t forget to convert the class to a view function:
# define the API resources
registration_view = RegisterAPI.as_view('register_api')
login_view = LoginAPI.as_view('login_api')
# add Rules for API Endpoints
auth_blueprint.add_url_rule(
'/auth/register',
view_func=registration_view,
methods=['POST']
)
auth_blueprint.add_url_rule(
'/auth/login',
view_func=login_view,
methods=['POST']
)
Run the tests again. Do they pass? They should. Don’t move on until all tests pass.
Non-Registered user login
Add the test:
def test_non_registered_user_login(self):
    """ Test for login of non-registered user """
    with self.client:
        response = self.client.post(
            '/auth/login',
            data=json.dumps(dict(
                email='joe@gmail.com',
                password='123456'
            )),
            content_type='application/json'
        )
        data = json.loads(response.data.decode())
        self.assertTrue(data['status'] == 'fail')
        self.assertTrue(data['message'] == 'User does not exist.')
        self.assertTrue(response.content_type == 'application/json')
        self.assertEqual(response.status_code, 404)
In this case, a non-registered user attempts to log in and, as expected, our application should not allow this.
Run the tests, and then update the code:
class LoginAPI(MethodView):
    """
    User Login Resource
    """

    def post(self):
        # get the post data
        post_data = request.get_json()
        try:
            # fetch the user data
            user = User.query.filter_by(
                email=post_data.get('email')
            ).first()
            # the user must exist and the password must match the stored hash
            if user and bcrypt.check_password_hash(
                user.password, post_data.get('password')
            ):
                auth_token = user.encode_auth_token(user.id)
                if auth_token:
                    responseObject = {
                        'status': 'success',
                        'message': 'Successfully logged in.',
                        'auth_token': auth_token.decode()
                    }
                    return make_response(jsonify(responseObject)), 200
            else:
                responseObject = {
                    'status': 'fail',
                    'message': 'User does not exist.'
                }
                return make_response(jsonify(responseObject)), 404
        except Exception as e:
            print(e)
            responseObject = {
                'status': 'fail',
                'message': 'Try again'
            }
            return make_response(jsonify(responseObject)), 500
What did we change? Do the tests pass? What if the email is correct but the password is incorrect? What happens? Write a test for this!
User Status Route
In order to get the user details of the currently logged in user, the auth token must be sent with the request within the header.
Start with a test:
def test_user_status(self):
    """ Test for user status """
    with self.client:
        resp_register = self.client.post(
            '/auth/register',
            data=json.dumps(dict(
                email='joe@gmail.com',
                password='123456'
            )),
            content_type='application/json'
        )
        response = self.client.get(
            '/auth/status',
            headers=dict(
                Authorization='Bearer ' + json.loads(
                    resp_register.data.decode()
                )['auth_token']
            )
        )
        data = json.loads(response.data.decode())
        self.assertTrue(data['status'] == 'success')
        self.assertTrue(data['data'] is not None)
        self.assertTrue(data['data']['email'] == 'joe@gmail.com')
        # the JSON true/false value is parsed into a Python bool
        self.assertIsInstance(data['data']['admin'], bool)
        self.assertEqual(response.status_code, 200)
The test should fail. Now, in the handler class, we should:
- extract the auth token and check its validity
- grab the user id from the payload and get the user details (if the token is valid, of course)
class UserAPI(MethodView):
    """
    User Resource
    """

    def get(self):
        # get the auth token
        auth_header = request.headers.get('Authorization')
        if auth_header:
            auth_token = auth_header.split(" ")[1]
        else:
            auth_token = ''
        if auth_token:
            resp = User.decode_auth_token(auth_token)
            # an integer is a user id; a string is an error message
            if not isinstance(resp, str):
                user = User.query.filter_by(id=resp).first()
                responseObject = {
                    'status': 'success',
                    'data': {
                        'user_id': user.id,
                        'email': user.email,
                        'admin': user.admin,
                        'registered_on': user.registered_on
                    }
                }
                return make_response(jsonify(responseObject)), 200
            responseObject = {
                'status': 'fail',
                'message': resp
            }
            return make_response(jsonify(responseObject)), 401
        else:
            responseObject = {
                'status': 'fail',
                'message': 'Provide a valid auth token.'
            }
            return make_response(jsonify(responseObject)), 401
So, if the token is valid and not expired, we get the user id from the token’s payload, which is then used to get the user data from the database.
NOTE: We still need to check if a token is blacklisted. We’ll get to this shortly.
Make sure to add:
user_view = UserAPI.as_view('user_api')
And:
auth_blueprint.add_url_rule(
'/auth/status',
view_func=user_view,
methods=['GET']
)
The tests should pass:
Ran 10 tests in 0.240s
OK
One more route to go!
Logout Route Tests
Test valid logout:
def test_valid_logout(self):
    """ Test for logout before token expires """
    with self.client:
        # user registration
        resp_register = self.client.post(
            '/auth/register',
            data=json.dumps(dict(
                email='joe@gmail.com',
                password='123456'
            )),
            content_type='application/json',
        )
        data_register = json.loads(resp_register.data.decode())
        self.assertTrue(data_register['status'] == 'success')
        self.assertTrue(
            data_register['message'] == 'Successfully registered.')
        self.assertTrue(data_register['auth_token'])
        self.assertTrue(resp_register.content_type == 'application/json')
        self.assertEqual(resp_register.status_code, 201)
        # user login
        resp_login = self.client.post(
            '/auth/login',
            data=json.dumps(dict(
                email='joe@gmail.com',
                password='123456'
            )),
            content_type='application/json'
        )
        data_login = json.loads(resp_login.data.decode())
        self.assertTrue(data_login['status'] == 'success')
        self.assertTrue(data_login['message'] == 'Successfully logged in.')
        self.assertTrue(data_login['auth_token'])
        self.assertTrue(resp_login.content_type == 'application/json')
        self.assertEqual(resp_login.status_code, 200)
        # valid token logout
        response = self.client.post(
            '/auth/logout',
            headers=dict(
                Authorization='Bearer ' + json.loads(
                    resp_login.data.decode()
                )['auth_token']
            )
        )
        data = json.loads(response.data.decode())
        self.assertTrue(data['status'] == 'success')
        self.assertTrue(data['message'] == 'Successfully logged out.')
        self.assertEqual(response.status_code, 200)
In this first test, we register a new user, log them in, and then attempt to log them out before the token expires.
Test invalid logout:
def test_invalid_logout(self):
    """ Testing logout after the token expires """
    with self.client:
        # user registration
        resp_register = self.client.post(
            '/auth/register',
            data=json.dumps(dict(
                email='joe@gmail.com',
                password='123456'
            )),
            content_type='application/json',
        )
        data_register = json.loads(resp_register.data.decode())
        self.assertTrue(data_register['status'] == 'success')
        self.assertTrue(
            data_register['message'] == 'Successfully registered.')
        self.assertTrue(data_register['auth_token'])
        self.assertTrue(resp_register.content_type == 'application/json')
        self.assertEqual(resp_register.status_code, 201)
        # user login
        resp_login = self.client.post(
            '/auth/login',
            data=json.dumps(dict(
                email='joe@gmail.com',
                password='123456'
            )),
            content_type='application/json'
        )
        data_login = json.loads(resp_login.data.decode())
        self.assertTrue(data_login['status'] == 'success')
        self.assertTrue(data_login['message'] == 'Successfully logged in.')
        self.assertTrue(data_login['auth_token'])
        self.assertTrue(resp_login.content_type == 'application/json')
        self.assertEqual(resp_login.status_code, 200)
        # invalid token logout: wait until the 5-second token has expired
        time.sleep(6)
        response = self.client.post(
            '/auth/logout',
            headers=dict(
                Authorization='Bearer ' + json.loads(
                    resp_login.data.decode()
                )['auth_token']
            )
        )
        data = json.loads(response.data.decode())
        self.assertTrue(data['status'] == 'fail')
        self.assertTrue(
            data['message'] == 'Signature expired. Please log in again.')
        self.assertEqual(response.status_code, 401)
Like the last test, we register a user, log them in, and then attempt to log them out. In this case, the token is invalid since it has expired.
Add the import:
import time
Now, the code must:
- validate the auth token
- blacklist the token (if valid, of course)
Before writing the route handler, let’s create a new model for blacklisting tokens…
Blacklist
Add the following code to project/server/models.py:
class BlacklistToken(db.Model):
    """
    Token Model for storing JWT tokens
    """
    __tablename__ = 'blacklist_tokens'

    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    token = db.Column(db.String(500), unique=True, nullable=False)
    blacklisted_on = db.Column(db.DateTime, nullable=False)

    def __init__(self, token):
        self.token = token
        self.blacklisted_on = datetime.datetime.now()

    def __repr__(self):
        return '<id: {} token: {}>'.format(self.id, self.token)
Then create and apply the migrations. Once done, your database should have the following tables:
 Schema |          Name           |   Type   |  Owner
--------+-------------------------+----------+----------
 public | alembic_version         | table    | postgres
 public | blacklist_tokens        | table    | postgres
 public | blacklist_tokens_id_seq | sequence | postgres
 public | users                   | table    | postgres
 public | users_id_seq            | sequence | postgres
(5 rows)
With that, we can add the logout handler…
Logout Route Handler
Update the views:
class LogoutAPI(MethodView):
    """
    Logout Resource
    """

    def post(self):
        # get auth token
        auth_header = request.headers.get('Authorization')
        if auth_header:
            auth_token = auth_header.split(" ")[1]
        else:
            auth_token = ''
        if auth_token:
            resp = User.decode_auth_token(auth_token)
            if not isinstance(resp, str):
                # mark the token as blacklisted
                blacklist_token = BlacklistToken(token=auth_token)
                try:
                    # insert the token
                    db.session.add(blacklist_token)
                    db.session.commit()
                    responseObject = {
                        'status': 'success',
                        'message': 'Successfully logged out.'
                    }
                    return make_response(jsonify(responseObject)), 200
                except Exception as e:
                    responseObject = {
                        'status': 'fail',
                        # an exception object is not JSON serializable, so send its text
                        'message': str(e)
                    }
                    return make_response(jsonify(responseObject)), 200
            else:
                responseObject = {
                    'status': 'fail',
                    'message': resp
                }
                return make_response(jsonify(responseObject)), 401
        else:
            responseObject = {
                'status': 'fail',
                'message': 'Provide a valid auth token.'
            }
            return make_response(jsonify(responseObject)), 403
# define the API resources
registration_view = RegisterAPI.as_view('register_api')
login_view = LoginAPI.as_view('login_api')
user_view = UserAPI.as_view('user_api')
logout_view = LogoutAPI.as_view('logout_api')
# add Rules for API Endpoints
auth_blueprint.add_url_rule(
'/auth/register',
view_func=registration_view,
methods=['POST']
)
auth_blueprint.add_url_rule(
'/auth/login',
view_func=login_view,
methods=['POST']
)
auth_blueprint.add_url_rule(
'/auth/status',
view_func=user_view,
methods=['GET']
)
auth_blueprint.add_url_rule(
'/auth/logout',
view_func=logout_view,
methods=['POST']
)
Update the imports:
from project.server.models import User, BlacklistToken
When a user logs out, the token is no longer valid, so we add it to the blacklist.
NOTE: Larger applications often clean up the blacklist every now and then, removing tokens that have already expired, so that the table does not grow indefinitely.
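One housekeeping task a blacklist invites is dropping entries for tokens that have already expired, since the exp check rejects those anyway. The sketch below uses an in-memory dict instead of the BlacklistToken table, and it assumes each entry records the token's exp claim, which the model above does not store. Treat it as an illustration of the idea only.

```python
def prune_blacklist(blacklist: dict, now: int) -> dict:
    """Keep only entries whose token has not yet expired.

    `blacklist` maps a token string to that token's exp claim (Unix time).
    """
    return {token: exp for token, exp in blacklist.items() if exp > now}


blacklist = {"token-a": 100, "token-b": 200}
print(prune_blacklist(blacklist, now=150))
```

In a database-backed version, the same filter would become a periodic DELETE on rows whose expiry has passed.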
Run the tests:
Ran 12 tests in 6.418s
OK
Refactoring
Finally, we need to ensure that a token has not been blacklisted right after it has been decoded by decode_auth_token() within the logout and user status routes.
First, let’s write a test for the logout route:
def test_valid_blacklisted_token_logout(self):
    """ Test for logout after a valid token gets blacklisted """
    with self.client:
        # user registration
        resp_register = self.client.post(
            '/auth/register',
            data=json.dumps(dict(
                email='joe@gmail.com',
                password='123456'
            )),
            content_type='application/json',
        )
        data_register = json.loads(resp_register.data.decode())
        self.assertTrue(data_register['status'] == 'success')
        self.assertTrue(
            data_register['message'] == 'Successfully registered.')
        self.assertTrue(data_register['auth_token'])
        self.assertTrue(resp_register.content_type == 'application/json')
        self.assertEqual(resp_register.status_code, 201)
        # user login
        resp_login = self.client.post(
            '/auth/login',
            data=json.dumps(dict(
                email='joe@gmail.com',
                password='123456'
            )),
            content_type='application/json'
        )
        data_login = json.loads(resp_login.data.decode())
        self.assertTrue(data_login['status'] == 'success')
        self.assertTrue(data_login['message'] == 'Successfully logged in.')
        self.assertTrue(data_login['auth_token'])
        self.assertTrue(resp_login.content_type == 'application/json')
        self.assertEqual(resp_login.status_code, 200)
        # blacklist a valid token
        blacklist_token = BlacklistToken(
            token=json.loads(resp_login.data.decode())['auth_token'])
        db.session.add(blacklist_token)
        db.session.commit()
        # blacklisted valid token logout
        response = self.client.post(
            '/auth/logout',
            headers=dict(
                Authorization='Bearer ' + json.loads(
                    resp_login.data.decode()
                )['auth_token']
            )
        )
        data = json.loads(response.data.decode())
        self.assertTrue(data['status'] == 'fail')
        self.assertTrue(data['message'] == 'Token blacklisted. Please log in again.')
        self.assertEqual(response.status_code, 401)
In this test, we blacklist the token just before the logout route gets hit, which makes our valid token unusable.
Update the imports:
from project.server.models import User, BlacklistToken
The test should fail with the following exception:
psycopg2.IntegrityError: duplicate key value violates unique constraint "blacklist_tokens_token_key"
DETAIL: Key (token)=(eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE0ODUyMDgyOTUsImlhdCI6MTQ4NTIwODI5MCwic3ViIjoxfQ.D9annoyh-VwpI5RY3blaSBX4pzK5UJi1H9dmKg2DeLQ) already exists.
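The exception comes from the unique constraint on the token column: the test has already stored the token, and the logout handler then tries to insert the same value again. The behavior can be reproduced in isolation with a small sketch. It uses an in-memory SQLite database in place of Postgres, and the blacklist() helper is a hypothetical name.

```python
import sqlite3

connection = sqlite3.connect(":memory:")
connection.execute(
    "CREATE TABLE blacklist_tokens ("
    "id INTEGER PRIMARY KEY AUTOINCREMENT, "
    "token TEXT UNIQUE NOT NULL)"
)


def blacklist(token: str) -> bool:
    """Insert the token; return False if it was already blacklisted."""
    try:
        with connection:  # commits on success, rolls back on error
            connection.execute(
                "INSERT INTO blacklist_tokens (token) VALUES (?)", (token,)
            )
        return True
    except sqlite3.IntegrityError:
        # the UNIQUE constraint rejected a duplicate token
        return False


print(blacklist("example-token"))
print(blacklist("example-token"))
```

Checking the blacklist before inserting, as the next step does inside decode_auth_token, avoids hitting the constraint in the first place.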
Now update the decode_auth_token function to handle already blacklisted tokens right after the decode and respond with an appropriate message.
@staticmethod
def decode_auth_token(auth_token):
    """
    Validates the auth token
    :param auth_token:
    :return: integer|string
    """
    try:
        # name the expected algorithm explicitly so tokens signed any other way are rejected
        payload = jwt.decode(
            auth_token, app.config.get('SECRET_KEY'), algorithms=['HS256']
        )
        is_blacklisted_token = BlacklistToken.check_blacklist(auth_token)
        if is_blacklisted_token:
            return 'Token blacklisted. Please log in again.'
        else:
            return payload['sub']
    except jwt.ExpiredSignatureError:
        return 'Signature expired. Please log in again.'
    except jwt.InvalidTokenError:
        return 'Invalid token. Please log in again.'
Finally, add the check_blacklist() function to project/server/models.py in the BlacklistToken class:
@staticmethod
def check_blacklist(auth_token):
    # check whether auth token has been blacklisted
    res = BlacklistToken.query.filter_by(token=str(auth_token)).first()
    if res:
        return True
    else:
        return False
Before you run the test, update test_decode_auth_token to convert the bytes object to a string:
def test_decode_auth_token(self):
    user = User(
        email='test@test.com',
        password='test'
    )
    db.session.add(user)
    db.session.commit()
    auth_token = user.encode_auth_token(user.id)
    self.assertTrue(isinstance(auth_token, bytes))
    self.assertTrue(User.decode_auth_token(
        auth_token.decode("utf-8")) == 1)
Run the tests:
Ran 13 tests in 9.557s
OK
In a similar fashion, add one more test for the user status route.
def test_valid_blacklisted_token_user(self):
    """ Test for user status with a blacklisted valid token """
    with self.client:
        resp_register = self.client.post(
            '/auth/register',
            data=json.dumps(dict(
                email='joe@gmail.com',
                password='123456'
            )),
            content_type='application/json'
        )
        # blacklist a valid token
        blacklist_token = BlacklistToken(
            token=json.loads(resp_register.data.decode())['auth_token'])
        db.session.add(blacklist_token)
        db.session.commit()
        response = self.client.get(
            '/auth/status',
            headers=dict(
                Authorization='Bearer ' + json.loads(
                    resp_register.data.decode()
                )['auth_token']
            )
        )
        data = json.loads(response.data.decode())
        self.assertTrue(data['status'] == 'fail')
        self.assertTrue(data['message'] == 'Token blacklisted. Please log in again.')
        self.assertEqual(response.status_code, 401)
Similar to the last test, we blacklist the token before the user status route gets hit.
Run the tests once more:
Ran 14 tests in 10.206s
OK
Code Smell
Finally, take a look at test_auth.py. Notice the duplicate code? For example:
self.client.post(
'/auth/register',
data=json.dumps(dict(
email='joe@gmail.com',
password='123456'
)),
content_type='application/json',
)
There are eight occurrences of this. To fix, add the following helper at the top of the file:
def register_user(self, email, password):
    return self.client.post(
        '/auth/register',
        data=json.dumps(dict(
            email=email,
            password=password
        )),
        content_type='application/json',
    )
Now, anywhere you need to register a user, you can call the helper:
register_user(self, 'joe@gmail.com', '123456')
How about logging in a user? Refactor it on your own. What else can you refactor? Comment below.
Refactor
For the PyBites Challenge, let’s refactor some code to correct an issue added to the GitHub repo. Start by adding the following test to test_auth.py:
def test_user_status_malformed_bearer_token(self):
    """ Test for user status with malformed bearer token"""
    with self.client:
        resp_register = register_user(self, 'joe@gmail.com', '123456')
        response = self.client.get(
            '/auth/status',
            headers=dict(
                # no space after 'Bearer', so the header is malformed on purpose
                Authorization='Bearer' + json.loads(
                    resp_register.data.decode()
                )['auth_token']
            )
        )
        data = json.loads(response.data.decode())
        self.assertTrue(data['status'] == 'fail')
        self.assertTrue(data['message'] == 'Bearer token malformed.')
        self.assertEqual(response.status_code, 401)
Essentially, an error is thrown if the Authorization header is formatted incorrectly (e.g., no space between Bearer and the token value). Run the tests to ensure they fail, and then update the UserAPI class in project/server/auth/views.py:
class UserAPI(MethodView):
    """
    User Resource
    """

    def get(self):
        # get the auth token
        auth_header = request.headers.get('Authorization')
        if auth_header:
            try:
                auth_token = auth_header.split(" ")[1]
            except IndexError:
                # no space in the header, so there is no second element
                responseObject = {
                    'status': 'fail',
                    'message': 'Bearer token malformed.'
                }
                return make_response(jsonify(responseObject)), 401
        else:
            auth_token = ''
        if auth_token:
            resp = User.decode_auth_token(auth_token)
            if not isinstance(resp, str):
                user = User.query.filter_by(id=resp).first()
                responseObject = {
                    'status': 'success',
                    'data': {
                        'user_id': user.id,
                        'email': user.email,
                        'admin': user.admin,
                        'registered_on': user.registered_on
                    }
                }
                return make_response(jsonify(responseObject)), 200
            responseObject = {
                'status': 'fail',
                'message': resp
            }
            return make_response(jsonify(responseObject)), 401
        else:
            responseObject = {
                'status': 'fail',
                'message': 'Provide a valid auth token.'
            }
            return make_response(jsonify(responseObject)), 401
Run the tests one final time.
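Since both the user status and logout handlers split the Authorization header the same way, the parsing could be pulled into a small helper. The sketch below is one possible shape for it. parse_bearer_token is a hypothetical name, and unlike the handler above it reports a missing header and a malformed one the same way (as None), so the caller would still need to pick the error message.

```python
from typing import Optional


def parse_bearer_token(auth_header: Optional[str]) -> Optional[str]:
    """Return the token from 'Bearer <token>', or None if the header is unusable."""
    if not auth_header:
        return None
    parts = auth_header.split()
    # Expect exactly two parts: the scheme and the token itself.
    if len(parts) != 2 or parts[0] != "Bearer":
        return None
    return parts[1]


print(parse_bearer_token("Bearer abc.def.ghi"))
print(parse_bearer_token("Bearerabc.def.ghi"))
```

Checking the number of parts up front avoids relying on an IndexError to detect the missing space.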
Conclusion
In this tutorial, we went through the process of adding authentication to a Flask app with JSON Web Tokens. Turn back to the objectives from the beginning of this tutorial. Can you put each one into action? What did you learn?
What’s next? How about the client side? Check out Token-Based Authentication With Angular for adding Angular into the mix.