|
7 | 7 | from flask import Flask, jsonify, request |
8 | 8 | from flask_jwt_extended.blacklist import _get_token_ttl, get_stored_token |
9 | 9 | from flask_jwt_extended.utils import _encode_refresh_token, _decode_jwt, \ |
10 | | - fresh_jwt_required, get_jwt_identity |
| 10 | + fresh_jwt_required, get_jwt_identity, get_raw_jwt |
11 | 11 |
|
12 | 12 | from flask_jwt_extended import JWTManager, create_access_token, \ |
13 | 13 | get_all_stored_tokens, get_stored_tokens, revoke_token, unrevoke_token, \ |
@@ -71,6 +71,14 @@ def refresh(): |
71 | 71 | ret = {'access_token': create_access_token(username, fresh=False)} |
72 | 72 | return jsonify(ret), 200 |
73 | 73 |
|
| 74 | + @self.app.route('/auth/logout', methods=['POST']) |
| 75 | + @jwt_required |
| 76 | + def logout(): |
| 77 | + jti = get_raw_jwt()['jti'] |
| 78 | + revoke_token(jti) |
| 79 | + ret = {"msg": "Successfully logged out"} |
| 80 | + return jsonify(ret), 200 |
| 81 | + |
74 | 82 | @self.app.route('/protected', methods=['POST']) |
75 | 83 | @jwt_required |
76 | 84 | def protected(): |
@@ -284,6 +292,28 @@ def test_revoked_refresh_token(self): |
284 | 292 | self.assertEqual(status, 200) |
285 | 293 | self.assertIn('access_token', data) |
286 | 294 |
|
| 295 | + def test_login_logout(self): |
| 296 | + # Check access and refresh tokens |
| 297 | + self.app.config['JWT_BLACKLIST_TOKEN_CHECKS'] = 'all' |
| 298 | + |
| 299 | + # Login |
| 300 | + access_token, refresh_token = self._login('test12345') |
| 301 | + |
| 302 | + # Verify we can access the protected endpoint |
| 303 | + status, data = self._jwt_post('/protected', access_token) |
| 304 | + self.assertEqual(status, 200) |
| 305 | + self.assertEqual(data, {'hello': 'world'}) |
| 306 | + |
| 307 | + # Logout |
| 308 | + status, data = self._jwt_post('/auth/logout', access_token) |
| 309 | + self.assertEqual(status, 200) |
| 310 | + self.assertEqual(data, {'msg': 'Successfully logged out'}) |
| 311 | + |
| 312 | + # Verify that we cannot access the protected endpoint anymore |
| 313 | + status, data = self._jwt_post('/protected', access_token) |
| 314 | + self.assertEqual(status, 401) |
| 315 | + self.assertEqual(data, {'msg': 'Token has been revoked'}) |
| 316 | + |
287 | 317 | def test_bad_blacklist_settings(self): |
288 | 318 | app = Flask(__name__) |
289 | 319 | app.testing = True # Propagate exceptions |
|
0 commit comments