Workers
In this chapter we will explore the topic of concurrency and how to use Textual's Worker API to make it easier.
The Worker API was added in version 0.18.0.
Concurrency
There are many interesting uses for Textual which require reading data from an internet service. When an app requests data from the network it is important that it doesn't prevent the user interface from updating. In other words, the requests should be concurrent with the UI updates (that is, happen at the same time).
This is also true for anything that could take a significant time (more than a few milliseconds) to complete. For instance, reading from a subprocess or doing compute-heavy work.
Managing this concurrency is a tricky topic, in any language or framework. Even for experienced developers, there are gotchas which could make your app lock up or behave oddly. Textual's Worker API makes concurrency far less error-prone and easier to reason about.
Workers
Before we go into detail, let's see an example that demonstrates a common pitfall for apps that make network requests.
The following app uses httpx to get the current weather for any given city, by making a request to wttr.in.
import httpx
from rich.text import Text

from textual.app import App, ComposeResult
from textual.containers import VerticalScroll
from textual.widgets import Input, Static


class WeatherApp(App):
    """App to display the current weather."""

    CSS_PATH = "weather.tcss"

    def compose(self) -> ComposeResult:
        yield Input(placeholder="Enter a City")
        with VerticalScroll(id="weather-container"):
            yield Static(id="weather")

    async def on_input_changed(self, message: Input.Changed) -> None:
        """Called when the input changes"""
        await self.update_weather(message.value)

    async def update_weather(self, city: str) -> None:
        """Update the weather for the given city."""
        weather_widget = self.query_one("#weather", Static)
        if city:
            # Query the network API
            url = f"https://wttr.in/{city}"
            async with httpx.AsyncClient() as client:
                response = await client.get(url)
                weather = Text.from_ansi(response.text)
                weather_widget.update(weather)
        else:
            # No city, so just blank out the weather
            weather_widget.update("")


if __name__ == "__main__":
    app = WeatherApp()
    app.run()
If you were to run this app, you should see weather information update as you type. But you may find that the input is not as responsive as usual, with a noticeable delay between pressing a key and seeing it echoed on screen. This is because we are making a request to the weather API within a message handler, and the app will not be able to process other messages until the request has completed (which may be anything from a few hundred milliseconds to several seconds later).
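The delay described above can be reproduced without Textual. The following is a minimal sketch using only asyncio, not Textual's actual message loop: `slow_handler`, `process_keys`, and `demo_blocking` are hypothetical names, and the 50 ms sleep is an assumed stand-in for the network round trip. Because each key is handled only after the previous handler has returned, the waits add up.

```python
import asyncio
import time


async def slow_handler(key: str) -> None:
    """Stand-in for a message handler that awaits a network request."""
    await asyncio.sleep(0.05)  # assumed 50 ms round trip


async def process_keys(keys: str) -> list[float]:
    """Handle each key in turn, recording when its handler finished."""
    start = time.perf_counter()
    finished = []
    for key in keys:
        # The next key is not looked at until this await returns.
        await slow_handler(key)
        finished.append(time.perf_counter() - start)
    return finished


def demo_blocking() -> list[float]:
    """Run the simulation for the five key presses in "Paris"."""
    return asyncio.run(process_keys("Paris"))


if __name__ == "__main__":
    for key, elapsed in zip("Paris", demo_blocking()):
        print(f"{key!r} handled after {elapsed:.2f}s")
```

In this sketch the last key is handled roughly a quarter of a second after the first was pressed, even though no single request took longer than 50 ms.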
To resolve this we can use the run_worker method which runs the update_weather coroutine (async def function) in the background. Here's the code:
import httpx
from rich.text import Text

from textual.app import App, ComposeResult
from textual.containers import VerticalScroll
from textual.widgets import Input, Static


class WeatherApp(App):
    """App to display the current weather."""

    CSS_PATH = "weather.tcss"

    def compose(self) -> ComposeResult:
        yield Input(placeholder="Enter a City")
        with VerticalScroll(id="weather-container"):
            yield Static(id="weather")

    async def on_input_changed(self, message: Input.Changed) -> None:
        """Called when the input changes"""
        self.run_worker(self.update_weather(message.value), exclusive=True)

    async def update_weather(self, city: str) -> None:
        """Update the weather for the given city."""
        weather_widget = self.query_one("#weather", Static)
        if city:
            # Query the network API
            url = f"https://wttr.in/{city}"
            async with httpx.AsyncClient() as client:
                response = await client.get(url)
                weather = Text.from_ansi(response.text)
                weather_widget.update(weather)
        else:
            # No city, so just blank out the weather
            weather_widget.update("")


if __name__ == "__main__":
    app = WeatherApp()
    app.run()
This one-line change will make typing as responsive as you would expect from any app.
The run_worker method schedules a new worker to run update_weather, and returns a Worker object. This happens almost immediately, so it won't prevent other messages from being processed. The update_weather function is now running concurrently, and will finish a second or two later.
Tip
The Worker object has a few useful methods on it, but you can often ignore it as we did in weather02.py.
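The idea of scheduling work and returning straight away can be illustrated with plain asyncio. This is only an analogy under stated assumptions, not how Textual implements workers: `asyncio.create_task` plays the role of run_worker, the returned `Task` plays the role of the Worker object, and `fetch_weather`, `handle_input`, and `demo_background` are hypothetical names.

```python
import asyncio


async def fetch_weather(city: str, log: list[str]) -> None:
    """Hypothetical slow job standing in for update_weather."""
    await asyncio.sleep(0.05)  # pretend this is the network request
    log.append(f"weather for {city}")


async def handle_input(log: list[str]) -> asyncio.Task:
    """Schedule the job and return at once, like a handler calling run_worker."""
    task = asyncio.create_task(fetch_weather("Paris", log))
    log.append("handler returned")
    return task


async def main() -> list[str]:
    log: list[str] = []
    task = await handle_input(log)
    await task  # wait here only so the demo can show the final order
    return log


def demo_background() -> list[str]:
    """Return the order in which the two events were recorded."""
    return asyncio.run(main())


if __name__ == "__main__":
    print(demo_background())
```

The handler's entry is logged before the job's entry, which mirrors the point above: scheduling is quick, and the slow part finishes later without holding up the handler.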
The call to run_worker also sets exclusive=True which solves an additional problem with concurrent network requests: when pulling data from the network, there is no guarantee that you will receive the responses in the same order as the requests.
For instance, if you start typing "Paris", you may get the response for "Pari" after the response for "Paris", which could show the wrong weather information.
The exclusive flag tells Textual to cancel all previous workers before starting the new one.
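To see why cancelling earlier work avoids stale results, here is a small asyncio sketch of the same idea. It is an illustration only and makes no claim about Textual's internals: `ExclusiveRunner`, `lookup`, and `demo_exclusive` are hypothetical names, and the delays are chosen so that the "Pari" response would arrive after the "Paris" response if nothing were cancelled.

```python
import asyncio
from typing import Optional


class ExclusiveRunner:
    """Hypothetical helper that keeps at most one background task alive."""

    def __init__(self) -> None:
        self._current: Optional[asyncio.Task] = None

    def run(self, coro) -> asyncio.Task:
        # Cancel the previous job (if still running) before starting the new one.
        if self._current is not None and not self._current.done():
            self._current.cancel()
        self._current = asyncio.create_task(coro)
        return self._current


async def lookup(query: str, delay: float, shown: list[str]) -> None:
    """Pretend request that records its query once the response arrives."""
    await asyncio.sleep(delay)
    shown.append(query)


async def main() -> list[str]:
    shown: list[str] = []
    runner = ExclusiveRunner()
    # "Pari" is slow and "Paris" is fast, so without cancellation
    # the stale "Pari" response would be recorded last.
    runner.run(lookup("Pari", 0.10, shown))
    await asyncio.sleep(0.01)  # the user types one more character
    last = runner.run(lookup("Paris", 0.02, shown))
    await last
    await asyncio.sleep(0.15)  # give a stale job time to finish, if it survived
    return shown


def demo_exclusive() -> list[str]:
    """Return the queries whose responses were actually recorded."""
    return asyncio.run(main())


if __name__ == "__main__":
    print(demo_exclusive())
```

Only the most recent query is recorded, because the earlier job is cancelled while it is still waiting for its response.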
Work decorator
An alternative to calling run_worker manually is the work decorator, which automatically generates a worker from the decorated method.
Let's use this decorator in our weather app:
import httpx
from rich.text import Text

from textual import work
from textual.app import App, ComposeResult
from textual.containers import VerticalScroll
from textual.widgets import Input, Static


class WeatherApp(App):
    """App to display the current weather."""

    CSS_PATH = "weather.tcss"

    def compose(self) -> ComposeResult:
        yield Input(placeholder="Enter a City")
        with VerticalScroll(id="weather-container"):
            yield Static(id="weather")

    async def on_input_changed(self, message: Input.Changed) -> None:
        """Called when the input changes"""
        self.update_weather(message.value)

    @work(exclusive=True)
    async def update_weather(self, city: str) -> None:
        """Update the weather for the given city."""
        weather_widget = self.query_one("#weather", Static)
        if city:
            # Query the network API
            url = f"https://wttr.in/{city}"
            async with httpx.AsyncClient() as client:
                response = await client.get(url)