TasksΒΆ


@worker.task(
    ignore_result=False,
    bind=True,
    base=InferenceTask,
)
def inference(self, user_id: int):
    try:
        db = SessionLocal()

        # check if there is a new model to use
        db_model = crud.get_active_model(db)

        if self.model is None or self.path != db_model.path:
            logging.info(f'Reloading model from path {db_model.path}')
            self.path = db_model.path
            self.model = Model(self.path)

        # get data to process
        user = crud.get_user(db, user_id)
        locs = crud.get_locations(db)

        locs_id = [loc.location_id for loc in locs]

        df = pd.DataFrame([user.__dict__ | loc.__dict__ for loc in locs], columns=self.model.metadata['features'])

        # apply model to data
        score = self.model(df.values)

        df['score'] = score
        df['user_id'] = user_id
        df['location_id'] = locs_id
        df['task_id'] = str(self.request.id)

        # save task id, user_id, and scores to database
        crud.create_results(db, df)

    finally:
        db.close()