Source code for pybricks.jobs

import json
import os

import requests


[docs]class JobsApi(object): """The Jobs API""" def __init__(self, hostname, token): self.hostname = "%s/api/" % hostname self.__headers = {'authorization': "Bearer %s" % token, "content-type": "application/scim+json", "accept": "application/scim+json"}
[docs] def run_now(self, job_id, notebook_params): """ Runs the job now, and returns the run_id of the triggered run. :param job_id: The job id INT number :param notebook_params: A map from keys to values for jobs with notebook task, e.g. "notebook_params": {"name": "john doe", "age": "35"} :return: """ endpoint = "2.0/jobs/run-now" data = json.dumps({'job_id': job_id, 'notebook_params': notebook_params}) url = "%s%s" % (self.hostname, endpoint) req = requests.post(url, headers=self.__headers, data=data) objects = req.json() return objects
[docs] def get(self, job_id): """Retrieves information about a single job.""" endpoint = "2.0/jobs/get" params = {'job_id': job_id} url = "%s%s" % (self.hostname, endpoint) req = requests.get(url, headers=self.__headers, params=params) objects = req.json() return objects
[docs] def list(self): """Lists all jobs.""" endpoint = "2.0/jobs/list" url = "%s%s" % (self.hostname, endpoint) req = requests.get(url, headers=self.__headers) objects = req.json()['jobs'] return objects
[docs] def get_by_name(self, job_name): """Retrieves information about a single job by name.""" for job in self.list(): if job['settings']['name'] == job_name: return self.get(int(job['job_id']))
[docs] def runs_list(self, job_id, active_only=True): """Lists runs from most recently started to least.""" endpoint = "2.0/jobs/runs/list" url = "%s%s" % (self.hostname, endpoint) params = {'job_id': job_id, 'active_only': active_only} req = requests.get(url, headers=self.__headers, params=params) objects = req.json() return objects