Core module for LeagueWizard, handling LCU connection and event processing.
This module establishes a connection to the League of Legends client (LCU) via
WebSocket, retrieves necessary authentication details, and dispatches incoming
game events to the on_message handler. It also manages the system tray icon.
find_client_full_path(exe='LeagueClient.exe')
Finds the full path of the specified executable.
Source code in src\leaguewizard\api\core.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43 | def find_client_full_path(exe: str = "LeagueClient.exe") -> Path:
"""Finds the full path of the specified executable."""
proc_path: Any = next(
(i.exe() for i in psutil.process_iter() if i.name() == exe),
None,
)
if proc_path is None:
raise LeWizardGenericError(
message=f"{exe} not found. Is client running?",
show=True,
title="Error.",
terminate=True,
)
return Path(proc_path)
|
start()
async
Initializes the application and starts listening for events.
This function sets up a system tray icon and establishes a connection to the League
Client. It retrieves the client path and lockfile, then connects to the websocket to
listen for events. The function handles incoming events and processes them using the
on_message function. It also manages exceptions related to websocket connections and
application termination.
Source code in src\leaguewizard\api\core.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103 | async def start() -> None:
"""Initializes the application and starts listening for events.
This function sets up a system tray icon and establishes a connection to the League
Client. It retrieves the client path and lockfile, then connects to the websocket to
listen for events. The function handles incoming events and processes them using the
`on_message` function. It also manages exceptions related to websocket connections and
application termination.
"""
with SysTrayIcon(
str(image_path),
"LeagueWizard",
on_quit=lambda e: os._exit(0),
) as tray:
logger.debug("Tray initialized")
try:
league_client = find_client_full_path()
logger.debug(f"Client found: {league_client}")
lockfile = LockFile(league_client)
logger.debug(f"Lockfile found: {lockfile.lockfile_path}")
context = ssl_context()
assert lockfile.wss_addr is not None # noqa: S101
async with websockets.connect(
uri=lockfile.wss_addr,
additional_headers=lockfile.auth_header,
ssl=context,
) as ws:
logger.debug("Joining websocket session.")
await ws.send('[5, "OnJsonApiEvent_lol-champ-select_v1_session"]')
logger.debug(
"Subscribed to OnJsonApiEvent_lol-champ-select_v1_session.",
)
aio_client = aiohttp.ClientSession(
base_url=lockfile.https_addr,
headers=lockfile.auth_header,
)
aio_client_id = random.randint(0, 1000) # noqa: S311
async for event in ws:
logger.debug("Event received.")
min_event_length = 3
if event is not None and len(event) >= min_event_length:
await on_message(
event,
aio_client,
aio_client_id,
)
except websockets.exceptions.ConnectionClosedError as e:
logger.exception(e.args)
except (KeyboardInterrupt, asyncio.exceptions.CancelledError) as e:
logger.exception(e.args)
raise LeWizardGenericError(show=False, terminate=True) from e
finally:
tray.shutdown()
sys.exit(0)
|