Jobs Usage

Use Jobs API (available as Client.jobs) for starting a job, killing it, getting list of running jobs etc. This chapter describes several common scenarios.

Here we describe the most common scenarios, see Jobs API Reference for the full list of job namespace methods.

Start a Job

To start a job use Jobs.start() method.

The method accepts image and required resources preset name as parameters and returns JobDescription with information about started job:

from neuro_sdk import get

async with get() as client:
    job = await client.jobs.start(
        image=client.parse.remote_image("ubuntu:latest"),
        preset_name="cpu-small",
        command="sleep 30",
    )

The example above starts a job using ubuntu:latest public image, cpu-small resources preset and executes sleep 30 command inside started container.

Note

After return from the call a new job is scheduled for execution but usually it’s status is pending. The Neuro Platform takes time for preparing resources for a job, pulling image from registry etc. Startup time can vary from seconds for hot start to minutes for cold one.

Check Job Status

After spawning a job we have JobDescription instance that describes job status (and other things like executed command line).

A job takes time for deployment, it can be terminated by different reasons, e.g. requested image name doesn’t exist.

The following snippet waits for job’s starting execution or failure:

# job is a JobDescription given by former client.job.run() call

while True:
    job = await client.jobs.status(job.id)
    if job.status in (JobStatus.RUNNING, JobStatus.SUCCEEDED):
        break
    elif job.status == JobStatus.FAILED:
        raise RuntimeError(f"Job {job.id} failed with {job.reason}:"
                           f"{job.history.description}")
    else:
        await asyncio.sleep(1)

Mount Neuro Storage folders

The Neuro Platform provides access to Neuro storage (storage://) by mounted folders inside a container (volumes in Docker glossary).

If you have a directory storage:folder and want to mount it inside a container under /var/data path please create a Volume and use it in Container definition:

from yarl import URL

volume = Volume(
    storage_uri=URL("storage:folder"),
    container_path="/mnt/data",
)

job = await client.jobs.run(
    Container(
        image=client.parse.remote_image("ubuntu:latest"),
        resources=Resources(memory_mb=100, cpu=0.5),
        command="sleep 30",
        volumes=[volume],
    )
)

There is a parsing helper that can construct a Volume instance from a string in format that is used in CLI:

volume = client.parse.volume("storage:folder:/var/data")

To specify read-only mount point please pass read_only=True to Volume constructor, e.g. the following code mounts public shared storage://neuro/public folder in read-only mode:

public_volume = Volume(
    storage_uri=URL("storage:neuro/public"),
    container_path="/mnt/public",
    read_only=True,
)

The same effect can be achieved by using a parser API:

public_volume = client.parse.volume(
    "storage:neuro/public:/mnt/public:ro")

Pass a list of volumes into container to support multiple mount points:

Container(
    image=...,
    resources=...,
    command=...,
    volumes=[volume, public_volume],
)

See also

Storage Usage for the storage manipulation API.

Kill a Job

Use Jobs.kill() for enforcing job to stop:

await client.jobs.kill(job.id)

Expose job’s TCP ports locally

Sometimes there is a need to access TCP endpoints exposed by a job executed on the Neuro Platform from local workstation.

For example, you’ve started a gRPC server inside a container on TCP port 12345 and want to access this service from your laptop.

You need to bridge this remote 12345 port into a local TCP namespace (e.g. 23456 local port by Jobs.port_forward() method:

from grpclib.client import Channel

# generated by protoc
from .helloworld_pb2 import HelloRequest, HelloReply
from .helloworld_grpc import GreeterStub

 async with client.jobs.port_forward(job.id, 23456, 12345):
     # open gRPC client and use it

     channel = Channel('127.0.0.1', 23456)
     greeter = GreeterStub(channel)

     reply: HelloReply = await greeter.SayHello(HelloRequest(name='Dr. Strange'))
     print(reply.message)

     channel.close()

The example uses grpclib library for make client gRPC requests.

Job preemption

Job preemption means that unlike normal jobs preemptible ones can be stopped by kernel when the system has lack of resources and restarted later. All memory and local disk changes are lost but data written to the Neuro Storage (see Mount Neuro Storage folders ) is persistent.

To support preemption job’s code should be organized in the following way: it dumps snapshots on disk periodically. On restart the code checks for last saved snapshot and continues the work from this point.

AI frameworks usually supports snapshots out of the box, see saving and loading models in pytorch or Keras ModelCheckpoint.

Preemptible job is not such convenient as regular job but it’s computational time is much cheaper (exact numbers varies on concrete computational cluster provides, e.g. Google Compute, AWS or Azure).

Jobs are non-preeptible by default, you can change this by passing preemptible_node=True flag to Jobs.run().