Skip to content

textual_shell.job

Job

Bases: ABC

Base Job class. Each command should have a corresponding Job it creates.

Parameters:

Name Type Description Default
cmd str

The name of the command that created the job.

required
shell baseshell

The shell widget for posting messages.

required
Source code in src/textual_shell/job.py
class Job(ABC):
    """
    Base Job class. Each command should have a corresponding Job it creates.

    Args:
        cmd (str): The name of the command that created the job.
        shell (textual_shell.widgets.baseshell): The shell widget for posting messages.
    """

    class Status(Enum):
        """Enumeration of the Statuses."""
        PENDING = 0
        RUNNING = 1
        CANCELLED = 2
        COMPLETED = 3
        ERROR = 4


    class StatusChange(Message):
        """
        Message to notify a change in status for a job.

        Args:
            job_id (str): The job's identifier.
            status (Job.Status): The new status.
        """
        def __init__(
            self,
            job_id: Annotated[str, 'The jobs identifier.'],
            status: Annotated['Job.Status', 'The new status.']
        ) -> None:
            super().__init__()
            self.job_id = job_id
            self.status = status


    class Start(Message):
        """
        Message to notify the app that a command has started.

        Args:
            job (Job): The job that has been created and started.
        """
        def __init__(
            self,
            job: Annotated['Job', 'The job created by the command.']
        ) -> None:
            super().__init__()
            self.job = job


    class Finish(Message):
        """
        Message to notify the app that the command has finished.

        Args:
            job_id (str): The id of the job.
        """
        def __init__(
            self,
            job_id: Annotated[str, 'The id of the job.']
        ) -> None:
            super().__init__()
            self.job_id = job_id


    class Log(Message):
        """
        Logging event for jobs.

        Args:
            sender (str): The id of the Job.
            msg (str): The log message.
            severity (int): The level of the severity.

        """
        def __init__(
            self,
            sender: Annotated[str, 'The id of the Job.'],
            msg: Annotated[str, 'The log message.'],
            severity: Annotated[int, 'The level of the severity']
        ) -> None:
            super().__init__()
            self.sender = sender
            self.msg = msg
            self.severity = severity


    def __init__(
        self,
        cmd: Annotated[str, 'The name of the command'],
        shell,
        screen: Screen=None
    ) -> None:
        self.id = f'{cmd}_{self._generate_id()}'
        self.shell = shell
        self.cmd = cmd
        self.screen = screen

    def _generate_id(self):
        """
        Generate a random 6 digit string.

        Returns:
            id (str): The id for the job.
        """
        return ''.join(random.choices(string.digits, k=6))

    def pending(self) -> None:
        """Signal the Job Manager that this job is pending."""
        self.status = self.Status.PENDING
        self.shell.post_message(
            self.StatusChange(
                self.id,
                self.Status.PENDING
            )
        )

    def running(self) -> None:
        """Signal the Job Manager that this job is running."""
        self.status = self.Status.RUNNING
        self.shell.post_message(
            self.StatusChange(
                self.id,
                self.Status.RUNNING
            )
        )

    def cancelled(self) -> None:
        """Signal the Job Manager that this job was cancelled."""
        self.status = self.Status.CANCELLED
        self.shell.post_message(
            self.StatusChange(
                self.id,
                self.Status.CANCELLED
            )
        )

    def completed(self) -> None:
        """Signal the Job Manager that this job has completed."""
        self.status = self.Status.COMPLETED
        self.shell.post_message(
            self.StatusChange(
                self.id,
                self.Status.COMPLETED
            )
        )

    def error(self) -> None:
        """Signal the Job Manager that this job has Errored"""
        self.status = self.Status.ERROR
        self.shell.post_message(
            self.StatusChange(
                self.id,
                self.Status.ERROR
            )
        )

    async def wait_for_cancel(
        self,
        sleep_interval: Annotated[int, 'How long the sleep interval should be.']=10
    ) -> None:
        """
        Wait for the jobs task to be cancelled.

        Args:
            sleep_interval (int): How long to sleep for in between checks. 
        """
        while not self.task.cancelled():
            await asyncio.sleep(sleep_interval)

    def send_log(
        self,
        msg: Annotated[str, 'log message'],
        severity: Annotated[str, 'The level of severity']
    ) -> None:
        """
        Send logs to the app.

        Args:
            msg (str): The log message.
            severity (str): The severity level of the log.
        """
        self.shell.post_message(self.Log(self.cmd, msg, severity))

    async def start(self):
        """Create a asyncio task for the job and 
        schedule it for execution."""
        self.shell.post_message(self.Start(self))
        self.pending()
        self.task = asyncio.create_task(
            self.execute(),
            name=self.id
        )
        self.task.add_done_callback(self.finish)

    def finish(self, task: asyncio.Task):
        """Send a finish message to clean up the job."""
        self.shell.post_message(self.Finish(self.id))

    @abstractmethod
    async def execute(self):
        """Execute the async task for the job.
        Subclasses must implement this."""
        pass

Finish

Bases: Message

Message to notify the app that the command has finished.

Parameters:

Name Type Description Default
job_id str

The id of the job.

required
Source code in src/textual_shell/job.py
class Finish(Message):
    """
    Message to notify the app that the command has finished.

    Args:
        job_id (str): The id of the job.
    """
    def __init__(
        self,
        job_id: Annotated[str, 'The id of the job.']
    ) -> None:
        super().__init__()
        self.job_id = job_id

Log

Bases: Message

Logging event for jobs.

Parameters:

Name Type Description Default
sender str

The id of the Job.

required
msg str

The log message.

required
severity int

The level of the severity.

required
Source code in src/textual_shell/job.py
class Log(Message):
    """
    Logging event for jobs.

    Args:
        sender (str): The id of the Job.
        msg (str): The log message.
        severity (int): The level of the severity.

    """
    def __init__(
        self,
        sender: Annotated[str, 'The id of the Job.'],
        msg: Annotated[str, 'The log message.'],
        severity: Annotated[int, 'The level of the severity']
    ) -> None:
        super().__init__()
        self.sender = sender
        self.msg = msg
        self.severity = severity

Start

Bases: Message

Message to notify the app that a command has started.

Parameters:

Name Type Description Default
job Job

The job that has been created and started.

required
Source code in src/textual_shell/job.py
class Start(Message):
    """
    Message to notify the app that a command has started.

    Args:
        job (Job): The job that has been created and started.
    """
    def __init__(
        self,
        job: Annotated['Job', 'The job created by the command.']
    ) -> None:
        super().__init__()
        self.job = job

Status

Bases: Enum

Enumeration of the Statuses.

Source code in src/textual_shell/job.py
class Status(Enum):
    """Enumeration of the Statuses."""
    PENDING = 0
    RUNNING = 1
    CANCELLED = 2
    COMPLETED = 3
    ERROR = 4

StatusChange

Bases: Message

Message to notify a change in status for a job.

Parameters:

Name Type Description Default
job_id str

The job's identifier.

required
status Status

The new status.

required
Source code in src/textual_shell/job.py
class StatusChange(Message):
    """
    Message to notify a change in status for a job.

    Args:
        job_id (str): The job's identifier.
        status (Job.Status): The new status.
    """
    def __init__(
        self,
        job_id: Annotated[str, 'The jobs identifier.'],
        status: Annotated['Job.Status', 'The new status.']
    ) -> None:
        super().__init__()
        self.job_id = job_id
        self.status = status

cancelled()

Signal the Job Manager that this job was cancelled.

Source code in src/textual_shell/job.py
def cancelled(self) -> None:
    """Signal the Job Manager that this job was cancelled."""
    self.status = self.Status.CANCELLED
    self.shell.post_message(
        self.StatusChange(
            self.id,
            self.Status.CANCELLED
        )
    )

completed()

Signal the Job Manager that this job has completed.

Source code in src/textual_shell/job.py
def completed(self) -> None:
    """Signal the Job Manager that this job has completed."""
    self.status = self.Status.COMPLETED
    self.shell.post_message(
        self.StatusChange(
            self.id,
            self.Status.COMPLETED
        )
    )

error()

Signal the Job Manager that this job has Errored

Source code in src/textual_shell/job.py
def error(self) -> None:
    """Signal the Job Manager that this job has Errored"""
    self.status = self.Status.ERROR
    self.shell.post_message(
        self.StatusChange(
            self.id,
            self.Status.ERROR
        )
    )

execute() abstractmethod async

Execute the async task for the job. Subclasses must implement this.

Source code in src/textual_shell/job.py
@abstractmethod
async def execute(self):
    """Execute the async task for the job.
    Subclasses must implement this."""
    pass

finish(task)

Send a finish message to clean up the job.

Source code in src/textual_shell/job.py
def finish(self, task: asyncio.Task):
    """Send a finish message to clean up the job."""
    self.shell.post_message(self.Finish(self.id))

pending()

Signal the Job Manager that this job is pending.

Source code in src/textual_shell/job.py
def pending(self) -> None:
    """Signal the Job Manager that this job is pending."""
    self.status = self.Status.PENDING
    self.shell.post_message(
        self.StatusChange(
            self.id,
            self.Status.PENDING
        )
    )

running()

Signal the Job Manager that this job is running.

Source code in src/textual_shell/job.py
def running(self) -> None:
    """Signal the Job Manager that this job is running."""
    self.status = self.Status.RUNNING
    self.shell.post_message(
        self.StatusChange(
            self.id,
            self.Status.RUNNING
        )
    )

send_log(msg, severity)

Send logs to the app.

Parameters:

Name Type Description Default
msg str

The log message.

required
severity str

The severity level of the log.

required
Source code in src/textual_shell/job.py
def send_log(
    self,
    msg: Annotated[str, 'log message'],
    severity: Annotated[str, 'The level of severity']
) -> None:
    """
    Send logs to the app.

    Args:
        msg (str): The log message.
        severity (str): The severity level of the log.
    """
    self.shell.post_message(self.Log(self.cmd, msg, severity))

start() async

Create a asyncio task for the job and schedule it for execution.

Source code in src/textual_shell/job.py
async def start(self):
    """Create a asyncio task for the job and 
    schedule it for execution."""
    self.shell.post_message(self.Start(self))
    self.pending()
    self.task = asyncio.create_task(
        self.execute(),
        name=self.id
    )
    self.task.add_done_callback(self.finish)

wait_for_cancel(sleep_interval=10) async

Wait for the jobs task to be cancelled.

Parameters:

Name Type Description Default
sleep_interval int

How long to sleep for in between checks.

10
Source code in src/textual_shell/job.py
async def wait_for_cancel(
    self,
    sleep_interval: Annotated[int, 'How long the sleep interval should be.']=10
) -> None:
    """
    Wait for the jobs task to be cancelled.

    Args:
        sleep_interval (int): How long to sleep for in between checks. 
    """
    while not self.task.cancelled():
        await asyncio.sleep(sleep_interval)