Skip to content
65 changes: 40 additions & 25 deletions plexapi/myplex.py
Original file line number Diff line number Diff line change
Expand Up @@ -2146,23 +2146,45 @@ def _encodeClientJWT(self):
headers=headers
)

def _decodePlexJWT(self):
""" Returns the decoded and verified Plex JWT using the Plex public JWK. """
return jwt.decode(
self.jwtToken,
key=jwt.PyJWK.from_dict(self._getPlexPublicJWK()),
algorithms=['EdDSA'],
options={
'require': ['aud', 'iss', 'exp', 'iat', 'thumbprint']
},
audience=['plex.tv', self._clientIdentifier],
issuer='plex.tv',
)
def decodePlexJWT(self, verify_signature=True):
""" Returns the decoded Plex JWT with optional signature verification using the Plex public JWK.

Parameters:
verify_signature (bool): Whether to verify the JWT signature and required claims.
Defaults to True. Set to False to skip signature verification and required-claim enforcement.
"""
kwargs = {
'jwt': self.jwtToken,
'algorithms': ['EdDSA'],
'options': {'verify_signature': verify_signature},
'audience': ['plex.tv', self._clientIdentifier],
'issuer': 'plex.tv',
}

if not verify_signature:
return jwt.decode(**kwargs)

kwargs['options']['require'] = ['aud', 'iss', 'exp', 'iat', 'thumbprint']

for plexJWK in reversed(self._getPlexPublicJWK()):
try:
return jwt.decode(
key=jwt.PyJWK.from_dict(plexJWK),
**kwargs
)
except jwt.InvalidSignatureError:
continue
except jwt.InvalidTokenError as e:
log.warning('Invalid Plex JWT: %s', str(e))
raise

log.warning('Plex JWT signature could not be verified with any known Plex JWKs')
raise jwt.InvalidSignatureError

@property
def decodedJWT(self):
""" Returns the decoded Plex JWT. """
return self._decodePlexJWT()
""" Returns the decoded Plex JWT with signature verification and required-claim enforcement. """
return self.decodePlexJWT()

def _registerPlexDevice(self):
""" Registers the public JWK with Plex. """
Expand All @@ -2185,10 +2207,10 @@ def _exchangePlexJWT(self):
return data['auth_token']

def _getPlexPublicJWK(self):
""" Gets the Plex public JWK. """
""" Gets the Plex public JWKs. """
url = f'{self.AUTH}/keys'
data = self._query(url, method=self._session.get)
return data['keys'][0]
return data['keys']

def registerDevice(self):
""" Registers the device with Plex using the provided token and private/public keypair.
Expand Down Expand Up @@ -2234,14 +2256,7 @@ def verifyJWT(self, refreshWithinDays=1):
"""
try:
decodedJWT = self.decodedJWT
except jwt.ExpiredSignatureError:
log.warning('Existing JWT has expired')
return False
except jwt.InvalidSignatureError:
log.warning('Existing JWT has invalid signature')
return False
except jwt.InvalidTokenError as e:
log.warning(f'Existing JWT is invalid: {e}')
except jwt.InvalidTokenError:
return False
else:
if decodedJWT['thumbprint'] != self._keyID:
Expand Down Expand Up @@ -2429,7 +2444,7 @@ def _query(self, url, method=None, headers=None, **kwargs):
codename = codes.get(response.status_code)[0]
errtext = response.text.replace('\n', ' ')
raise BadRequest(f'({response.status_code}) {codename} {response.url}; {errtext}')
if 'application/json' in response.headers.get('Content-Type', ''):
if 'application/json' in response.headers.get('Content-Type', '') and len(response.content):
return response.json()
return utils.parseXMLString(response.text)

Expand Down
38 changes: 37 additions & 1 deletion tests/test_myplex.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
# -*- coding: utf-8 -*-
import jwt

import pytest
from plexapi.exceptions import BadRequest, NotFound, Unauthorized
from plexapi.myplex import MyPlexInvite
from plexapi.myplex import MyPlexAccount, MyPlexInvite, MyPlexJWTLogin

from . import conftest as utils
from .payloads import MYPLEX_INVITE
Expand Down Expand Up @@ -366,3 +368,37 @@ def test_myplex_geoip(account):

def test_myplex_ping(account):
assert account.ping()


def test_myplex_jwt_login(account, tmp_path, monkeypatch):
jwtlogin = MyPlexJWTLogin(
token=account.authToken,
scopes=['username', 'email', 'friendly_name']
)
jwtlogin.generateKeypair(keyfiles=(tmp_path / 'private.key', tmp_path / 'public.key'), overwrite=True)
with pytest.raises(FileExistsError):
jwtlogin.generateKeypair(keyfiles=(tmp_path / 'private.key', tmp_path / 'public.key'))
jwtlogin.registerDevice()
jwtToken = jwtlogin.refreshJWT()
assert jwtlogin.decodedJWT['user']['username'] == account.username
assert MyPlexAccount(token=jwtToken) == account

jwtlogin = MyPlexJWTLogin(
jwtToken=jwtToken,
keypair=(tmp_path / 'private.key', tmp_path / 'public.key'),
scopes=['username', 'email', 'friendly_name']
)
assert jwtlogin.verifyJWT()
newjwtToken = jwtlogin.refreshJWT()
assert newjwtToken != jwtToken
assert MyPlexAccount(token=newjwtToken) == account

plexPublicJWKs = jwtlogin._getPlexPublicJWK()
invalidJWK = plexPublicJWKs[0].copy()
invalidJWK['x'] += 'invalid'
monkeypatch.setattr(MyPlexJWTLogin, "_getPlexPublicJWK", lambda self: plexPublicJWKs + [invalidJWK])
assert jwtlogin.decodePlexJWT()

monkeypatch.setattr(MyPlexJWTLogin, "_getPlexPublicJWK", lambda self: [invalidJWK])
with pytest.raises(jwt.InvalidSignatureError):
jwtlogin.decodePlexJWT()
Loading