Skip to content

Commit

Permalink
Implement hotfix from python/cpython#118960, refactor URL, restructur…
Browse files Browse the repository at this point in the history
…e connection and cancellation and handle exceptions more gracefully. This addresses the 1006 issue.
  • Loading branch information
cjavad committed May 18, 2024
1 parent 93b4793 commit 6f0efcc
Show file tree
Hide file tree
Showing 8 changed files with 238 additions and 116 deletions.
18 changes: 10 additions & 8 deletions simplyprint_ws_client/client/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import functools
import logging
import threading
from contextlib import suppress
from typing import Optional, Generic, Dict

from .config import Config, ConfigManager
Expand All @@ -12,7 +13,7 @@
from .provider import ClientProvider, BasicClientProvider, TClientProviderFactory
from ..const import APP_DIRS
from ..helpers.sentry import Sentry
from ..helpers.url_builder import SimplyPrintUrl
from ..helpers.url_builder import SimplyPrintURL
from ..utils import traceability
from ..utils.event_loop_runner import EventLoopRunner

Expand All @@ -36,7 +37,7 @@ def __init__(self, options: ClientOptions, provider_factory: Optional[TClientPro

# Set correct simplyprint version
if options.backend:
SimplyPrintUrl.set_current(options.backend)
SimplyPrintURL.set_backend(options.backend)

config_manager_class = options.config_manager_type.get_class()
instance_class = options.mode.get_class()
Expand Down Expand Up @@ -105,12 +106,13 @@ async def _register_configs():
await asyncio.gather(*[self.load(config) for config in configs],
return_exceptions=True)

async with self.instance:
# Register all clients this has to be non-blocking
# so that instance run can start polling events as we wait
# to get the connected message before we can start sending events.
_ = self.instance.event_loop.create_task(_register_configs())
await self.instance.run()
with suppress(asyncio.CancelledError):
async with self.instance:
# Register all clients this has to be non-blocking
# so that instance run can start polling events as we wait
# to get the connected message before we can start sending events.
_ = self.instance.event_loop.create_task(_register_configs())
await self.instance.run()

self.logger.debug("Client instance has stopped")

Expand Down
4 changes: 2 additions & 2 deletions simplyprint_ws_client/client/instance/multi_printer.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from ...connection.connection import ConnectionConnectedEvent, ConnectionPollEvent
from ...events.client_events import ClientEvent
from ...events.server_events import MultiPrinterAddedEvent, MultiPrinterRemovedEvent
from ...helpers.url_builder import SimplyPrintUrl
from ...helpers.url_builder import SimplyPrintURL


class MultiPrinterException(InstanceException):
Expand Down Expand Up @@ -57,7 +57,7 @@ def __init__(self, config_manager: ConfigManager[TConfig], **kwargs) -> None:
self.event_bus.on(MultiPrinterAddedEvent, self.on_printer_added_response, priority=10)
self.event_bus.on(MultiPrinterRemovedEvent, self.on_printer_removed_response, priority=10)

self.set_url(str(SimplyPrintUrl.current().ws_url / "mp" / "0" / "0"))
self.set_url(str(SimplyPrintURL().ws_url / "mp" / "0" / "0"))

self.clients = dict()
self.pending_add_waiters = dict()
Expand Down
4 changes: 2 additions & 2 deletions simplyprint_ws_client/client/instance/single_printer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@
from ..instance.instance import Instance, TClient, TConfig
from ...connection.connection import ConnectionConnectedEvent
from ...events.client_events import ClientEvent
from ...helpers.url_builder import SimplyPrintUrl
from ...helpers.url_builder import SimplyPrintURL


class SinglePrinter(Instance[TClient, TConfig]):
client: Optional[Client[TConfig]] = None

async def add_client(self, client: TClient) -> None:
self.set_url(
str(SimplyPrintUrl.current().ws_url / "p" / str(client.config.id) / str(client.config.token)))
str(SimplyPrintURL().ws_url / "p" / str(client.config.id) / str(client.config.token)))

self.client = client

Expand Down
116 changes: 62 additions & 54 deletions simplyprint_ws_client/connection/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,20 @@
import json
import logging
from asyncio import CancelledError
from contextlib import suppress
from typing import Any, Dict, Optional, Union

from aiohttp import (ClientConnectorError, ClientSession,
from aiohttp import (ClientSession,
ClientWebSocketResponse, WSMsgType,
WSServerHandshakeError, ClientOSError)
ClientResponseError, ClientError)

from ..client.client import Client
from ..events import DemandEvent, ServerEvent, EventFactory
from ..events.client_events import ClientEvent, ClientEventMode
from ..events.event import Event
from ..events.event_bus import EventBus
from ..utils import issue_118950_patch # noqa
from ..utils.cancelable_lock import CancelableLock
# from ..utils import issue_118950_patch # noqa
from ..utils.event_loop_provider import EventLoopProvider
from ..utils.traceability import traceable

Expand Down Expand Up @@ -51,81 +53,87 @@ class Connection(EventLoopProvider[asyncio.AbstractEventLoop]):

event_bus: ConnectionEventBus

socket: Optional[ClientWebSocketResponse] = None
ws: Optional[ClientWebSocketResponse] = None
session: Optional[ClientSession] = None

# Ensure only a single thread can connect at a time
connection_lock: asyncio.Lock
# And we can cancel any connection attempts to enforce
# the reconnection timeout.
connection_lock: CancelableLock

url: Optional[str] = None
timeout: float = 5.0

def __init__(self, event_loop_provider: Optional[EventLoopProvider] = None) -> None:
super().__init__(provider=event_loop_provider)
self.event_bus = ConnectionEventBus(event_loop_provider=self)
self.connection_lock = asyncio.Lock()
self.connection_lock = CancelableLock()

def is_connected(self) -> bool:
return self.socket is not None and not self.socket.closed
return self.ws is not None and not self.ws.closed

async def connect(self, url: Optional[str] = None, timeout: Optional[float] = None, allow_reconnects=False) -> None:
async with self.connection_lock:
self.use_running_loop()
with suppress(asyncio.CancelledError):
async with self.connection_lock:
self.use_running_loop()

if self.is_connected() and not allow_reconnects:
return
if self.is_connected() and not allow_reconnects:
return

reconnected = self.is_connected()
reconnected = self.is_connected()

if self.socket or self.session:
await self.close_internal()
self.socket = self.session = None
if self.ws or self.session:
await self.close_internal()
self.ws = self.session = None

self.url = url or self.url
self.timeout = timeout or self.timeout
self.session = ClientSession()
self.url = url or self.url
self.timeout = timeout or self.timeout
self.session = ClientSession()

self.logger.debug(
f"{'Connecting' if not reconnected else 'Reconnecting'} to {url or self.url}")
self.logger.debug(
f"{'Connecting' if not reconnected else 'Reconnecting'} to {url or self.url}")

if not self.url:
raise ValueError("No url specified")
if not self.url:
raise ValueError("No url specified")

socket = None
ws = None

try:
socket = await self.session.ws_connect(
self.url,
timeout=timeout,
autoclose=True,
autoping=True,
heartbeat=10,
max_msg_size=0,
compress=False,
)

except WSServerHandshakeError as e:
self.logger.info(
f"Failed to connect to {self.url} with status code {e.status}: {e.message}")
except (ClientConnectorError, ClientOSError) as e:
self.logger.error(f"Failed to connect to {self.url}", exc_info=e)
except Exception as e:
self.logger.exception(e)

# Handle disconnect in a new task.
if socket is None or socket.closed:
_ = self.event_bus.emit_task(ConnectionDisconnectEvent())
return
try:
ws = await self.session.ws_connect(
self.url,
timeout=timeout,
autoclose=True,
autoping=True,
heartbeat=10,
max_msg_size=0,
compress=False,
)

except ClientResponseError as e:
self.logger.info(f"Failed to connect to {self.url} with status code {repr(e)}")
except (ConnectionRefusedError, ClientError) as e:
self.logger.warning(f"Failed to connect to {self.url} due to a network/client error {e}.")
except Exception as e:
self.logger.error(f"Failed to connect to {self.url} due to an exception {e}.", exc_info=e)

# Handle disconnect in a new task.
if ws is None or ws.closed:
# Kick out any other connection attempts until this point
# and retry the connection via the disconnect event.
self.connection_lock.cancel()
_ = self.event_bus.emit_task(ConnectionDisconnectEvent())
return

self.ws = ws

self.socket = socket
_ = self.event_bus.emit_task(ConnectionConnectedEvent(reconnect=reconnected))

_ = self.event_bus.emit_task(ConnectionConnectedEvent(reconnect=reconnected))
self.logger.debug(f"Connected to {self.url} {reconnected=}")
self.logger.debug(f"Connected to {self.url} {reconnected=}")

async def close_internal(self):
try:
if self.socket:
await self.socket.close()
if self.ws:
await self.ws.close()
except Exception as e:
self.logger.error("An exception occurred while closing to handle a disconnect condition", exc_info=e)

Expand Down Expand Up @@ -174,7 +182,7 @@ async def send_event(self, client: Client, event: ClientEvent) -> None:

message = event.as_dict()

await self.socket.send_json(message)
await self.ws.send_json(message)

event.on_sent()

Expand All @@ -193,11 +201,11 @@ async def poll_event(self, timeout=None) -> None:
return

try:
message = await self.socket.receive(timeout=timeout)
message = await self.ws.receive(timeout=timeout)

if message.type in (WSMsgType.CLOSED, WSMsgType.CLOSING, WSMsgType.CLOSE):
self.logger.debug(
f"Websocket closed by server with code: {self.socket.close_code} and reason: {message.extra}")
f"Websocket closed by server with code: {self.ws.close_code} and reason: {message.extra}")

# An exception can be passed via the message.data
if message.data:
Expand Down
6 changes: 3 additions & 3 deletions simplyprint_ws_client/helpers/simplyprint_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@
import aiohttp

from ..const import VERSION
from ..helpers.url_builder import SimplyPrintUrl
from ..helpers.url_builder import SimplyPrintURL


class SimplyPrintApi:
@staticmethod
async def post_snapshot(snapshot_id: str, image_data: bytes):

endpoint = SimplyPrintUrl.current().api_url / "jobs" / "ReceiveSnapshot"
endpoint = SimplyPrintURL().api_url / "jobs" / "ReceiveSnapshot"

data = {
"id": snapshot_id,
Expand All @@ -36,7 +36,7 @@ async def post_logs(
# Request /printers/ReceiveLogs with the token as post data
# And each of the files as multipart/form-data

endpoint = SimplyPrintUrl.current().api_url / "printers" / "ReceiveLogs" % {"pid": printer_id}
endpoint = SimplyPrintURL().api_url / "printers" / "ReceiveLogs" % {"pid": printer_id}

data = {
"token": token,
Expand Down
Loading

0 comments on commit 6f0efcc

Please sign in to comment.