Запуск планирования с помощью скрипта
Вы можете запустить задачу календарного планирования через API с помощью скрипта на Python. Для запуска скрипта используйте вызов команды python3 script.py request.json
.
При работе с API календарного планирования вам потребуется API-ключ и секрет. Чтобы получить секрет, обратитесь к вашему клиентскому менеджеру или напишите в службу поддержки.
Подпись к запросу вычисляется автоматически на основе тела запроса.
Запрос на запуск задачи можно отправить с помощью библиотеки requests
. Подставьте значение API-ключа в поле APIKEY
, а секрет — в поле SECRET
.
#!/usr/bin/env python3
import hashlib
import hmac
import json
import requests
import sys
import time
from urllib.parse import urlencode
APIKEY = '<ваш API-ключ>'
SECRET = '<ваш secret>'
SOLVER_URL = 'https://courier.yandex.ru'
USER_AGENT = 'RouteQ Support Agent/1.0'
CALENDAR_PLANNING_URI = '/vrs/api/v1/calendar_planning/tasks'
def gen_signature(key, parts):
HMAC = hmac.new(bytes.fromhex(key), None, digestmod=hashlib.sha256)
for part in parts:
HMAC.update(part.encode('utf-8'))
return HMAC.hexdigest()
def make_request(method, uri, **kwargs):
body = json.dumps(kwargs['json']) if kwargs.get('json') else ''
signature = gen_signature(SECRET,[USER_AGENT, method, ' ', uri, body])
url = SOLVER_URL + uri
headers = {
'X-YaCourier-Signature': signature,
'User-Agent': USER_AGENT,
}
print(f"{method} {url}")
response = requests.request(method, url, verify=False, headers=headers, **kwargs)
return response
def add_task(task):
params = {'apikey': APIKEY}
uri = f"{CALENDAR_PLANNING_URI}?{urlencode(params)}"
response = make_request('POST', uri, json=task)
response.raise_for_status()
j = response.json()
return j['id']
def get_status(task_id):
params = {'apikey': APIKEY}
uri = f"{CALENDAR_PLANNING_URI}/{task_id}/status?{urlencode(params)}"
response = make_request('GET', uri)
response.raise_for_status()
j = response.json()
print(j)
return j['status']
def get_result(task_id):
params = {'apikey': APIKEY}
uri = f"{CALENDAR_PLANNING_URI}/{task_id}/result?{urlencode(params)}"
response = make_request('GET', uri)
response.raise_for_status()
j = response.json()
return j
def main():
with open(sys.argv[1]) as fd:
task = json.load(fd)
task_id = add_task(task)
while get_status(task_id) != 'completed':
time.sleep(2)
result = get_result(task_id)
print(json.dumps(result)[:100])
if __name__ == '__main__':
main()
Была ли статья полезна?
Предыдущая
Следующая