Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions ably/http/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,9 @@ def dump_body(self, body):
else:
return json.dumps(body, separators=(',', ':'))

def get_rest_hosts(self):
hosts = self.options.get_rest_hosts()
host = self.__host or self.options.fallback_realtime_host
def get_hosts(self):
hosts = self.options.get_hosts()
host = self.__host or self.options.fallback_host
if host is None:
return hosts

Expand Down Expand Up @@ -186,7 +186,7 @@ async def make_request(self, method, path, version=None, headers=None, body=None
http_max_retry_duration = self.http_max_retry_duration
requested_at = time.time()

hosts = self.get_rest_hosts()
hosts = self.get_hosts()
for retry_count, host in enumerate(hosts):
def should_stop_retrying(retry_count=retry_count):
time_passed = time.time() - requested_at
Expand Down Expand Up @@ -229,7 +229,7 @@ def should_stop_retrying(retry_count=retry_count):
continue

# Keep fallback host for later (RSC15f)
if retry_count > 0 and host != self.options.get_rest_host():
if retry_count > 0 and host != self.options.get_host():
self.__host = host
self.__host_expires = time.time() + (self.options.fallback_retry_timeout / 1000.0)

Expand Down Expand Up @@ -277,7 +277,7 @@ def options(self):

@property
def preferred_host(self):
return self.options.get_rest_host()
return self.options.get_host()

@property
def preferred_port(self):
Expand Down
4 changes: 2 additions & 2 deletions ably/realtime/connectionmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ def __init__(self, realtime: AblyRealtime, initial_state):
self.retry_timer: Timer | None = None
self.connect_base_task: asyncio.Task | None = None
self.disconnect_transport_task: asyncio.Task | None = None
self.__fallback_hosts: list[str] = self.options.get_fallback_realtime_hosts()
self.__fallback_hosts: list[str] = self.options.get_fallback_hosts()
self.queued_messages: deque[PendingMessage] = deque()
self.__error_reason: AblyException | None = None
self.msg_serial: int = 0
Expand Down Expand Up @@ -532,7 +532,7 @@ async def connect_with_fallback_hosts(self, fallback_hosts: list) -> Exception |

async def connect_base(self) -> None:
fallback_hosts = self.__fallback_hosts
primary_host = self.options.get_realtime_host()
primary_host = self.options.get_host()
try:
await self.try_host(primary_host)
return
Expand Down
4 changes: 4 additions & 0 deletions ably/realtime/realtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,14 @@ def __init__(self, key: Optional[str] = None, loop: Optional[asyncio.AbstractEve
You can set this to false and explicitly connect to Ably using the
connect() method. The default is true.
**kwargs: client options
endpoint: str
Endpoint specifies either a routing policy name or fully qualified domain name to connect to Ably.
realtime_host: str
Deprecated: this property is deprecated and will be removed in a future version.
Enables a non-default Ably host to be specified for realtime connections.
For development environments only. The default value is realtime.ably.io.
environment: str
Deprecated: this property is deprecated and will be removed in a future version.
Enables a custom environment to be used with the Ably service. Defaults to `production`
realtime_request_timeout: float
Timeout (in milliseconds) for the wait of acknowledgement for operations performed via a realtime
Expand Down
10 changes: 8 additions & 2 deletions ably/rest/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,14 @@ def __init__(self, key: Optional[str] = None, token: Optional[str] = None,

**Optional Parameters**
- `client_id`: Undocumented
- `rest_host`: The host to connect to. Defaults to rest.ably.io
- `environment`: The environment to use. Defaults to 'production'
- `endpoint`: Endpoint specifies either a routing policy name or
fully qualified domain name to connect to Ably.
- `rest_host`: Deprecated: this property is deprecated and will
be removed in a future version. The host to connect to.
Defaults to rest.ably.io
- `environment`: Deprecated: this property is deprecated and
will be removed in a future version. The environment to use.
Defaults to 'production'
- `port`: The port to connect to. Defaults to 80
- `tls_port`: The tls_port to connect to. Defaults to 443
- `tls`: Specifies whether the client should use TLS. Defaults
Expand Down
48 changes: 31 additions & 17 deletions ably/transport/defaults.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,8 @@
class Defaults:
protocol_version = "2"
fallback_hosts = [
"a.ably-realtime.com",
"b.ably-realtime.com",
"c.ably-realtime.com",
"d.ably-realtime.com",
"e.ably-realtime.com",
]

rest_host = "rest.ably.io"
realtime_host = "realtime.ably.io" # RTN2

connectivity_check_url = "https://internet-up.ably-realtime.com/is-the-internet-up.txt"
environment = 'production'
endpoint = 'main'

port = 80
tls_port = 443
Expand Down Expand Up @@ -53,11 +44,34 @@ def get_scheme(options):
return "http"

@staticmethod
def get_environment_fallback_hosts(environment):
def get_hostname(endpoint):
if "." in endpoint or "::" in endpoint or "localhost" in endpoint:
return endpoint

if endpoint.startswith("nonprod:"):
return endpoint[len("nonprod:"):] + ".realtime.ably-nonprod.net"

return endpoint + ".realtime.ably.net"

@staticmethod
def get_fallback_hosts(endpoint="main"):
if "." in endpoint or "::" in endpoint or "localhost" in endpoint:
return []

if endpoint.startswith("nonprod:"):
root = endpoint.replace("nonprod:", "")
return [
root + ".a.fallback.ably-realtime-nonprod.com",
root + ".b.fallback.ably-realtime-nonprod.com",
root + ".c.fallback.ably-realtime-nonprod.com",
root + ".d.fallback.ably-realtime-nonprod.com",
root + ".e.fallback.ably-realtime-nonprod.com",
]

return [
environment + "-a-fallback.ably-realtime.com",
environment + "-b-fallback.ably-realtime.com",
environment + "-c-fallback.ably-realtime.com",
environment + "-d-fallback.ably-realtime.com",
environment + "-e-fallback.ably-realtime.com",
endpoint + ".a.fallback.ably-realtime.com",
endpoint + ".b.fallback.ably-realtime.com",
endpoint + ".c.fallback.ably-realtime.com",
endpoint + ".d.fallback.ably-realtime.com",
endpoint + ".e.fallback.ably-realtime.com",
]
4 changes: 2 additions & 2 deletions ably/transport/websockettransport.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ async def on_protocol_message(self, msg):
self.max_idle_interval = max_idle_interval + self.options.realtime_request_timeout
self.on_activity()
self.is_connected = True
if self.host != self.options.get_realtime_host(): # RTN17e
self.options.fallback_realtime_host = self.host
if self.host != self.options.get_host(): # RTN17e
self.options.fallback_host = self.host
self.connection_manager.on_connected(connection_details, connection_id, reason=exception)
elif action == ProtocolMessageAction.DISCONNECTED:
error = msg.get('error')
Expand Down
145 changes: 61 additions & 84 deletions ably/types/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,22 @@ def decode(self, delta: bytes, base: bytes) -> bytes:

class Options(AuthOptions):
def __init__(self, client_id=None, log_level=0, tls=True, rest_host=None, realtime_host=None, port=0,
tls_port=0, use_binary_protocol=True, queue_messages=True, recover=False, environment=None,
http_open_timeout=None, http_request_timeout=None, realtime_request_timeout=None,
http_max_retry_count=None, http_max_retry_duration=None, fallback_hosts=None,
fallback_retry_timeout=None, disconnected_retry_timeout=None, idempotent_rest_publishing=None,
loop=None, auto_connect=True, suspended_retry_timeout=None, connectivity_check_url=None,
tls_port=0, use_binary_protocol=True, queue_messages=True, recover=False, endpoint=None,
environment=None, http_open_timeout=None, http_request_timeout=None,
realtime_request_timeout=None, http_max_retry_count=None, http_max_retry_duration=None,
fallback_hosts=None, fallback_retry_timeout=None,
disconnected_retry_timeout=None, idempotent_rest_publishing=None, loop=None, auto_connect=True,
suspended_retry_timeout=None, connectivity_check_url=None,
channel_retry_timeout=Defaults.channel_retry_timeout, add_request_ids=False,
vcdiff_decoder: VCDiffDecoder = None, transport_params=None, **kwargs):

super().__init__(**kwargs)

# REC1b1: endpoint is incompatible with deprecated options
if endpoint is not None:
if environment is not None or rest_host is not None or realtime_host is not None:
raise ValueError('endpoint is incompatible with any of environment, rest_host or realtime_host')

# TODO check these defaults
if fallback_retry_timeout is None:
fallback_retry_timeout = Defaults.fallback_retry_timeout
Expand Down Expand Up @@ -64,25 +70,39 @@ def __init__(self, client_id=None, log_level=0, tls=True, rest_host=None, realti
from ably import api_version
idempotent_rest_publishing = api_version >= '1.2'

if environment is None:
environment = Defaults.environment
if environment is not None and endpoint is None:
log.warning("environment client option is deprecated, please use endpoint instead")
endpoint = environment

# REC1d: restHost or realtimeHost option
# REC1d1: restHost takes precedence over realtimeHost
if rest_host is not None and endpoint is None:
log.warning("rest_host client option is deprecated, please use endpoint instead")
endpoint = rest_host
elif realtime_host is not None and endpoint is None:
# REC1d2: realtimeHost if restHost not specified
log.warning("realtime_host client option is deprecated, please use endpoint instead")
endpoint = realtime_host

if endpoint is None:
endpoint = Defaults.endpoint

self.__client_id = client_id
self.__log_level = log_level
self.__tls = tls
self.__rest_host = rest_host
self.__realtime_host = realtime_host
self.__port = port
self.__tls_port = tls_port
self.__use_binary_protocol = use_binary_protocol
self.__queue_messages = queue_messages
self.__recover = recover
self.__environment = environment
self.__endpoint = endpoint
self.__http_open_timeout = http_open_timeout
self.__http_request_timeout = http_request_timeout
self.__realtime_request_timeout = realtime_request_timeout
self.__http_max_retry_count = http_max_retry_count
self.__http_max_retry_duration = http_max_retry_duration
# Field for internal use only
self.__fallback_host = None
self.__fallback_hosts = fallback_hosts
self.__fallback_retry_timeout = fallback_retry_timeout
self.__disconnected_retry_timeout = disconnected_retry_timeout
Expand All @@ -93,13 +113,10 @@ def __init__(self, client_id=None, log_level=0, tls=True, rest_host=None, realti
self.__connection_state_ttl = connection_state_ttl
self.__suspended_retry_timeout = suspended_retry_timeout
self.__connectivity_check_url = connectivity_check_url
self.__fallback_realtime_host = None
self.__add_request_ids = add_request_ids
self.__vcdiff_decoder = vcdiff_decoder
self.__transport_params = transport_params or {}

self.__rest_hosts = self.__get_rest_hosts()
self.__realtime_hosts = self.__get_realtime_hosts()
self.__hosts = self.__get_hosts()

@property
def client_id(self):
Expand All @@ -125,23 +142,6 @@ def tls(self):
def tls(self, value):
self.__tls = value

@property
def rest_host(self):
return self.__rest_host

@rest_host.setter
def rest_host(self, value):
self.__rest_host = value

# RTC1d
@property
def realtime_host(self):
return self.__realtime_host

@realtime_host.setter
def realtime_host(self, value):
self.__realtime_host = value

@property
def port(self):
return self.__port
Expand Down Expand Up @@ -183,8 +183,8 @@ def recover(self, value):
self.__recover = value

@property
def environment(self):
return self.__environment
def endpoint(self):
return self.__endpoint

@property
def http_open_timeout(self):
Expand Down Expand Up @@ -268,12 +268,18 @@ def connectivity_check_url(self):
return self.__connectivity_check_url

@property
def fallback_realtime_host(self):
return self.__fallback_realtime_host
def fallback_host(self):
"""
For internal use only, can be deleted in future
"""
return self.__fallback_host

@fallback_realtime_host.setter
def fallback_realtime_host(self, value):
self.__fallback_realtime_host = value
@fallback_host.setter
def fallback_host(self, value):
"""
For internal use only, can be deleted in future
"""
self.__fallback_host = value

@property
def add_request_ids(self):
Expand All @@ -287,37 +293,20 @@ def vcdiff_decoder(self):
def transport_params(self):
return self.__transport_params

def __get_rest_hosts(self):
def __get_hosts(self):
"""
Return the list of hosts as they should be tried. First comes the main
host. Then the fallback hosts in random order.
The returned list will have a length of up to http_max_retry_count.
"""
# Defaults
host = self.rest_host
if host is None:
host = Defaults.rest_host

environment = self.environment
host = Defaults.get_hostname(self.endpoint)
# REC2: Determine fallback hosts
fallback_hosts = self.get_fallback_hosts()

http_max_retry_count = self.http_max_retry_count
if http_max_retry_count is None:
http_max_retry_count = Defaults.http_max_retry_count

# Prepend environment
if environment != 'production':
host = f'{environment}-{host}'

# Fallback hosts
fallback_hosts = self.fallback_hosts
if fallback_hosts is None:
if host == Defaults.rest_host:
fallback_hosts = Defaults.fallback_hosts
elif environment != 'production':
fallback_hosts = Defaults.get_environment_fallback_hosts(environment)
else:
fallback_hosts = []

# Shuffle
fallback_hosts = list(fallback_hosts)
random.shuffle(fallback_hosts)
Expand All @@ -328,31 +317,19 @@ def __get_rest_hosts(self):
hosts = hosts[:http_max_retry_count]
return hosts

def __get_realtime_hosts(self):
if self.realtime_host is not None:
host = self.realtime_host
return [host]
elif self.environment != "production":
host = f'{self.environment}-{Defaults.realtime_host}'
else:
host = Defaults.realtime_host

return [host] + self.__fallback_hosts

def get_rest_hosts(self):
return self.__rest_hosts

def get_rest_host(self):
return self.__rest_hosts[0]

def get_realtime_hosts(self):
return self.__realtime_hosts
def get_hosts(self):
return self.__hosts

def get_realtime_host(self):
return self.__realtime_hosts[0]
def get_host(self):
return self.__hosts[0]

def get_fallback_rest_hosts(self):
return self.__rest_hosts[1:]
# REC2: Various client options collectively determine a set of fallback domains
def get_fallback_hosts(self):
# REC2a: If the fallbackHosts client option is specified
if self.__fallback_hosts is not None:
# REC2a2: the set of fallback domains is given by the value of the fallbackHosts option
return self.__fallback_hosts

def get_fallback_realtime_hosts(self):
return self.__realtime_hosts[1:]
# REC2c: Otherwise, the set of fallback domains is defined implicitly by the options
# used to define the primary domain as specified in (REC1)
return Defaults.get_fallback_hosts(self.endpoint)
Loading
Loading