Explicitly allow requests from public websites via new header (#302)

- Chrome now enforces that servers on private networks explicitly
  grant access to public websites using a new header
  "Access-Control-Allow-Private-Network" that should be sent in
  responses to preflight OPTIONS requests.
- This change implements special handling for OPTIONS requests by
  sending all the existing CORS headers along with the new
  Access-Control-Allow-Private-Network header if private network
  access is being requested.
- See https://developer.chrome.com/blog/private-network-access-preflight/
  for more info.
This commit is contained in:
Raphael-Joel Lim 2022-02-18 23:08:44 -08:00 committed by GitHub
parent 7136a15ade
commit a5aecfceee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -24,7 +24,8 @@ from . import util
# #
class WebRequest: class WebRequest:
def __init__(self, headers, body): def __init__(self, method, headers, body):
self.method = method
self.headers = headers self.headers = headers
self.body = body self.body = body
@ -95,8 +96,15 @@ class WebClient:
if len(parts) == 1: if len(parts) == 1:
return None, 0 return None, 0
lines = parts[0].split('\r\n'.encode('utf-8'))
method = None
if len(lines) > 0:
request_line_parts = lines[0].split(' '.encode('utf-8'))
method = request_line_parts[0].upper() if len(request_line_parts) > 0 else None
headers = {} headers = {}
for line in parts[0].split('\r\n'.encode('utf-8')): for line in lines[1:]:
pair = line.split(': '.encode('utf-8')) pair = line.split(': '.encode('utf-8'))
headers[pair[0].lower()] = pair[1] if len(pair) > 1 else None headers[pair[0].lower()] = pair[1] if len(pair) > 1 else None
@ -108,8 +116,7 @@ class WebClient:
return None, 0 return None, 0
body = data[headerLength : totalLength] body = data[headerLength : totalLength]
return WebRequest(headers, body), totalLength return WebRequest(method, headers, body), totalLength
# #
# WebServer # WebServer
@ -154,7 +161,54 @@ class WebServer:
def handlerWrapper(self, req): def handlerWrapper(self, req):
allowed, corsOrigin = self.allowOrigin(req)
if req.method == b'OPTIONS':
body = ''.encode('utf-8')
headers = self.buildHeaders(corsOrigin, body)
if b'access-control-request-private-network' in req.headers and (
req.headers[b'access-control-request-private-network'] == b'true'):
# include this header so that if a public origin is included in the whitelist,
# then browsers won't fail requests due to the private network access check
headers.append(['Access-Control-Allow-Private-Network', 'true'])
return self.buildResponse(headers, body)
paramsError = False
try:
params = json.loads(req.body.decode('utf-8'))
except ValueError:
body = json.dumps(None).encode('utf-8')
paramsError = True
if allowed or not paramsError and params.get('action', '') == 'requestPermission':
if len(req.body) == 0:
body = 'AnkiConnect v.{}'.format(util.setting('apiVersion')).encode('utf-8')
else:
if params.get('action', '') == 'requestPermission':
params['params'] = params.get('params', {})
params['params']['allowed'] = allowed
params['params']['origin'] = b'origin' in req.headers and req.headers[b'origin'].decode() or ''
if not allowed :
corsOrigin = params['params']['origin']
body = json.dumps(self.handler(params)).encode('utf-8')
headers = self.buildHeaders(corsOrigin, body)
else :
headers = [
['HTTP/1.1 403 Forbidden', None],
['Access-Control-Allow-Origin', corsOrigin],
['Access-Control-Allow-Headers', '*']
]
body = ''.encode('utf-8')
return self.buildResponse(headers, body)
def allowOrigin(self, req):
# handle multiple cors origins by checking the 'origin'-header against the allowed origin list from the config # handle multiple cors origins by checking the 'origin'-header against the allowed origin list from the config
webCorsOriginList = util.setting('webCorsOriginList') webCorsOriginList = util.setting('webCorsOriginList')
@ -184,42 +238,21 @@ class WebServer:
else: else:
allowed = True allowed = True
resp = bytes() return allowed, corsOrigin
paramsError = False
try:
params = json.loads(req.body.decode('utf-8'))
except ValueError:
body = json.dumps(None).encode('utf-8')
paramsError = True
if allowed or not paramsError and params.get('action', '') == 'requestPermission':
if len(req.body) == 0:
body = 'AnkiConnect v.{}'.format(util.setting('apiVersion')).encode('utf-8')
else:
if params.get('action', '') == 'requestPermission':
params['params'] = params.get('params', {})
params['params']['allowed'] = allowed
params['params']['origin'] = b'origin' in req.headers and req.headers[b'origin'].decode() or ''
if not allowed :
corsOrigin = params['params']['origin']
body = json.dumps(self.handler(params)).encode('utf-8') def buildHeaders(self, corsOrigin, body):
return [
headers = [
['HTTP/1.1 200 OK', None], ['HTTP/1.1 200 OK', None],
['Content-Type', 'text/json'], ['Content-Type', 'text/json'],
['Access-Control-Allow-Origin', corsOrigin], ['Access-Control-Allow-Origin', corsOrigin],
['Access-Control-Allow-Headers', '*'], ['Access-Control-Allow-Headers', '*'],
['Content-Length', str(len(body))] ['Content-Length', str(len(body))]
] ]
else :
headers = [
['HTTP/1.1 403 Forbidden', None],
['Access-Control-Allow-Origin', corsOrigin],
['Access-Control-Allow-Headers', '*']
]
body = ''.encode('utf-8')
def buildResponse(self, headers, body):
resp = bytes()
for key, value in headers: for key, value in headers:
if value is None: if value is None:
resp += '{}\r\n'.format(key).encode('utf-8') resp += '{}\r\n'.format(key).encode('utf-8')
@ -228,7 +261,6 @@ class WebServer:
resp += '\r\n'.encode('utf-8') resp += '\r\n'.encode('utf-8')
resp += body resp += body
return resp return resp