API Routes

On this page we go through all the routes we implemented with FastAPI.

Each route is a web URL that accepts data in JSON format, performs some operations, and then replies with a status code and a result.

The status code tells the client whether a request was executed successfully (2xx values), failed because of an error on the client side (4xx values), or failed because of an internal error on the server (5xx values).
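As a minimal sketch of how a client might use these ranges, the helper below maps a status code to one of the three classes. The function name classify_status and the returned labels are illustrative choices, not part of the API.

```python
def classify_status(code: int) -> str:
    """Map an HTTP status code to the classes described above."""
    if 200 <= code < 300:
        return "success"
    if 400 <= code < 500:
        return "client_error"
    if 500 <= code < 600:
        return "server_error"
    # 1xx (informational) and 3xx (redirection) are not discussed on this page.
    return "other"


print(classify_status(200))  # success
print(classify_status(404))  # client_error
print(classify_status(500))  # server_error
```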

Data exchange

Before getting into details, we need to introduce how data is exchanged between the user and the application. Following best practices, data is exchanged in a structured format (JSON, for example). FastAPI handles this neatly through a library called pydantic.

Pydantic defines the format of the data using Python classes. This shapes the information into a strict format that is easy to manage. We wrote one class for each data structure, and a brief explanation of what each class defines is given in its docstring.

An example of a data definition class is shown below; the full implementation of all classes can be found in api/requests.py.

from pydantic import BaseModel


class TaskStatus(BaseModel):
    """Class that defines the output data of an inference."""
    task_id: str
    status: str
    type: str

This class lets the API produce a JSON object containing three fields: task_id is a unique identifier for the task that was started; status is the current status of the task; type is an extra field that tells the user which kind of task was started.
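To make the shape of the resulting JSON concrete, here is a rough sketch that uses only the standard library. TaskStatusSketch is a stand-in that mirrors the three fields; it is not the real pydantic model, and the task_id value is a made-up placeholder.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class TaskStatusSketch:
    """Stdlib stand-in mirroring the fields of TaskStatus (not the real model)."""
    task_id: str
    status: str
    type: str


# The task_id below is a made-up placeholder, not a real Celery task id.
payload = TaskStatusSketch(task_id="example-task-id", status="PENDING", type="inference")
body = json.dumps(asdict(payload))
print(body)
# {"task_id": "example-task-id", "status": "PENDING", "type": "inference"}
```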

Routes

We now briefly explain each route implemented in our API.

/inference/start

 1@api.post('/inference/start')
 2async def schedule_inference(user_data: requests.UserData, db: Session = Depends(get_db)):
 3    """This is the endpoint used to schedule an inference."""
 4    crud.create_event(db, 'inference_start')
 5    ud = crud.create_user_data(db, user_data.dict())
 6    task: AsyncResult = inference.delay(ud.user_id)
 7
 8    db_inf = crud.create_inference(db, task.task_id, task.status)
 9    return requests.TaskStatus(
10        task_id=db_inf.task_id,
11        status=db_inf.status,
12        type='inference'
13    )

/inference/start is used to start an inference task with the current active model.

This route receives as parameters:

  • the user’s data, which contains an ID and some features,

  • an open session to the database, provided by FastAPI.

First of all, we log the event in the database at line 4, then we store the received data at line 5. At line 6 we start the asynchronous process by telling Celery to spawn an inference task. The user_id passed to the task is then used to retrieve the data stored in the database at line 5.

At line 8 we insert a new entry in the database that helps the user track the status of the task. Once the task is finished, the user can retrieve the predictions using another route.

Finally, the route returns the status of the task to the user.
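From the client side, calling this route could look like the sketch below. It only builds the request with the standard library and does not send it. The base URL and the payload field names (user_id, features) are assumptions made for illustration; the actual schema is the UserData class in api/requests.py.

```python
import json
import urllib.request


def build_start_request(base_url: str, user_data: dict) -> urllib.request.Request:
    """Build (without sending) a POST request for /inference/start."""
    body = json.dumps(user_data).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/inference/start",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Hypothetical host and payload: adapt both to the real deployment and schema.
req = build_start_request(
    "http://localhost:8000",
    {"user_id": 1, "features": [0.1, 0.2]},
)
print(req.get_method(), req.full_url)
```

Passing req to urllib.request.urlopen would send it; the response body would then carry the task_id to use with the status route.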

/inference/status/{task_id}

@api.get('/inference/status/{task_id}', response_model=requests.TaskStatus)
async def get_inference_status(task_id: str, db: Session = Depends(get_db)):
    """This is the endpoint to get the status of an inference."""
    crud.create_event(db, 'status')

    # Ask Celery for the current state of the task.
    task = AsyncResult(task_id)
    task_id, status = task.task_id, task.status

    db_inf = crud.get_inference(db, task_id=task_id)

    if db_inf is None:
        raise HTTPException(status_code=404, detail='Task not found')

    # Keep the database entry in sync with the state reported by Celery.
    db_inf.status = status
    db_inf = crud.update_inference(db, db_inf.task_id, db_inf.status)

    if task.failed():
        raise HTTPException(status_code=500, detail='Task failed')

    # Pending and completed tasks share the same response shape.
    return requests.TaskStatus(
        task_id=db_inf.task_id,
        status=db_inf.status,
        type='inference',
    )

/inference/status/{task_id} is used to check the status of the task running asynchronously.

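This route accepts as parameter the task_id associated with the process spawned by Celery.

The rest of the code is similar to the previous route: it logs the event and contains the logic to create the correct response. If the task is unknown the route replies with a 404 status code, and if the task failed it replies with a 500.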
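A client typically calls this route repeatedly until the task is done. The polling loop below is a sketch under stated assumptions: wait_for_task and its parameters are hypothetical names, and fetch_status stands for any function that calls the status route and returns the status field. The terminal states are Celery's built-in SUCCESS, FAILURE and REVOKED. Since the route replies with a 500 when the task failed, a real fetch_status would also need to handle that HTTP error.

```python
import time
from typing import Callable

# Celery's built-in states after which a task no longer changes.
TERMINAL_STATES = {"SUCCESS", "FAILURE", "REVOKED"}


def wait_for_task(
    fetch_status: Callable[[str], str],
    task_id: str,
    interval: float = 1.0,
    max_attempts: int = 30,
) -> str:
    """Poll until the task reaches a terminal state or attempts run out."""
    status = "PENDING"
    for attempt in range(max_attempts):
        status = fetch_status(task_id)
        if status in TERMINAL_STATES:
            return status
        if attempt < max_attempts - 1:
            time.sleep(interval)
    # Attempts exhausted: report the last state seen.
    return status


# Fake status source standing in for calls to /inference/status/{task_id}.
fake_states = iter(["PENDING", "STARTED", "SUCCESS"])
final = wait_for_task(lambda _task_id: next(fake_states), "example-task-id", interval=0.0)
print(final)  # SUCCESS
```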

/inference/results/{task_id}

 1@api.get('/inference/results/{task_id}')
 2async def get_inference_results(task_id: str, limit: int=10, db: Session = Depends(get_db)):
 3    """This is the endpoint to get the results with scores once the inference 
 4    task has been completed.
 5    
 6    Note: check the status of the task with the '/inference/status' endpoint.
 7    """
 8    crud.create_event(db, 'results')
 9
10    locations = crud.get_results_locations(db, task_id, limit)
11
12    crud.mark_locations_as_shown(db, task_id, locations)
13
14    return locations

/inference/results/{task_id} is used to retrieve the results of the inference once the response of /inference/status/{task_id} reports the successful completion of an inference.

Note how on line 10 we fetch from the database the locations together with the scores produced by the inference.

/inference/select/

@api.put('/inference/select/')
async def get_click(label: requests.LabelData, db: Session = Depends(get_db)):
    """This is the endpoint used to simulate a click on a choice.
    A click will be registered as a label on the data."""
    crud.create_event(db, 'selection')

    # A location_id of -1 means that no location was selected.
    if label.location_id == -1:
        crud.create_event(db, 'bad_inference')
        return

    else:
        crud.create_event(db, 'good_inference')
        db_result = crud.update_result_label(db, label.task_id, label.location_id)

        if db_result is None:
            # Raise (not return) the exception so that FastAPI replies with a 404.
            raise HTTPException(404, 'Result not found: invalid task_id or location_id')

        return db_result

Since we want to simulate the act of a user that selects the desired location from a list, we use the /inference/select route for this purpose.

The route expects to receive an id that indicates the selection of a user. This creates a history of labelled data that can be used to train a new model.

If no location has been selected, we register a bad_inference event in our log. Otherwise, we register a good_inference event and update the data in the database.
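The branching described above can be imitated in memory as follows. This is only an illustration: record_selection, the events list, the results dictionary and its label field are hypothetical stand-ins for the crud functions and the database, and a KeyError replaces the 404 response.

```python
NO_SELECTION = -1  # sentinel the route uses when the user picks nothing


def record_selection(events: list, results: dict, task_id: str, location_id: int):
    """In-memory imitation of the branching in /inference/select/."""
    events.append("selection")

    if location_id == NO_SELECTION:
        events.append("bad_inference")
        return None

    events.append("good_inference")
    key = (task_id, location_id)
    if key not in results:
        # The real route replies with a 404 here.
        raise KeyError("Result not found: invalid task_id or location_id")

    results[key]["label"] = True  # hypothetical label field
    return results[key]


events = []
results = {("example-task-id", 7): {"label": False}}

record_selection(events, results, "example-task-id", NO_SELECTION)
selected = record_selection(events, results, "example-task-id", 7)
print(events)
# ['selection', 'bad_inference', 'selection', 'good_inference']
```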

/train/start

@api.post('/train/start')
async def schedule_training(db: Session = Depends(get_db)):
    """This is the endpoint to start the training of a new model."""
    crud.create_event(db, 'training')

    task: AsyncResult = training.delay()

    db_model = crud.create_model(db, task.task_id, task.status)
    return requests.TaskStatus(
        task_id=db_model.task_id,
        status=db_model.status,
        type='training'
    )

The training of a new model can only be started manually. This route schedules the second type of task on Celery, the training task.

/content/…

The routes that begin with /content are used to inspect what is stored in the database while debugging.

These routes are reported here as a demonstrative example and can safely be ignored.

@api.get('/content/info')
async def get_content_info(db: Session = Depends(get_db)):
    n_locations = crud.count_locations(db)
    n_users = crud.count_users(db)

    return requests.ContentInfo(
        locations=n_locations,
        users=n_users,
    )


@api.get('/content/location/{location_id}')
async def get_content_location(location_id: int, db: Session = Depends(get_db)):
    return crud.get_location(db, location_id)


@api.get('/content/locations')
async def get_content_locations(db: Session = Depends(get_db)):
    return crud.get_locations(db)


@api.get('/content/user/{user_id}')
async def get_content_user(user_id: int, db: Session = Depends(get_db)):
    return crud.get_user(db, user_id)


@api.get('/content/users')
async def get_content_users(db: Session = Depends(get_db)):
    return crud.get_users(db)


@api.get('/content/result/{result_id}')
async def get_content_result(result_id: int, db: Session = Depends(get_db)):
    return crud.get_result(db, result_id)


# Renamed from get_content_result so it no longer shadows the function above.
@api.get('/content/results')
async def get_content_results(task_id: str, db: Session = Depends(get_db)):
    return crud.get_results(db, task_id)