Python-скрипт для загрузки RSS-канала


Для работы со скриптом необходимы знания Python.

Python-скрипт позволяет загружать RSS-канал в Вебмастер. Скрипт последовательно отправляет запросы к API Яндекс.Турбо‑страниц и сообщает о результате загрузки RSS-канала.

Для работы со скриптом достаточно указать адрес сайта, ваш OAuth-токен и содержимое RSS-канала. Остальные данные (user-id, host-id и т. д.) скрипт получает автоматически.

Настройка сжатия

Чтобы отправить RSS-канал в сжатом виде, укажите заголовок Content-Encoding: gzip в функции upload_rss.

def upload_rss(upload_path, rss_data):
    headers = {
        'Content-Type': 'application/rss+xml',
        'Content-Encoding': 'gzip'

Настройка режима загрузки

Режим загрузки задается в функции get_rss_upload_path при объявлении переменной path. По умолчанию установлен режим отладки PRODUCTION.

def get_rss_upload_path(user_id, host_id):
    path = '/user/{user_id}/hosts/{host_id}/turbo/uploadAddress/?mode={mode}'.format(
        user_id=user_id, host_id=host_id, mode='PRODUCTION')

Для отладки Турбо‑страниц установите режим DEBUG.

def get_rss_upload_path(user_id, host_id):
    path = '/user/{user_id}/hosts/{host_id}/turbo/uploadAddress/?mode={mode}'.format(
        user_id=user_id, host_id=host_id, mode='DEBUG')

Использование скрипта

Чтобы загрузить RSS-канал, добавьте в скрипт собственные данные:

  • Адрес сайта, для которого необходимо загрузить RSS-канал.

  • OAuth-токен. Подробнее о получении токена в разделе Авторизация.

  • Содержимое RSS-канала.

    Для тестового запуска можно использовать пример RSS-канала.

    <?xml version="1.0" encoding="UTF-8"?>
    <rss version="2.0" xmlns:yandex="" xmlns:turbo="">
        <item turbo="true">
          <title>Заголовок страницы</title>
                <h1>Ресторан «Полезный завтрак»</h1>
                <h2>Вкусно и полезно</h2>
                  <img src="">
                  <a href="">Пункт меню 1</a>
                  <a href="">Пункт меню 2</a>
              <p>Как хорошо начать день? Вкусно и полезно позавтракать!</p>
              <p>Приходите к нам на завтрак. Фотографии наших блюд ищите <a href="#">на нашем сайте</a>.</p>
                <img src="">
                <figcaption>Омлет с травами</figcaption>
              <p>В нашем меню всегда есть свежие, вкусные и полезные блюда.</p>
              <p>Убедитесь в этом сами.</p>
              <button formaction="tel:+7(123)456-78-90"
                data-primary="true">Заказать столик</button>
              <div data-block="widget-feedback" data-stick="false">
                <div data-block="chat" data-type="whatsapp" data-url=""></div>
                <div data-block="chat" data-type="telegram" data-url=""></div>
                <div data-block="chat" data-type="vkontakte" data-url=""></div>
                <div data-block="chat" data-type="facebook" data-url=""></div>
                <div data-block="chat" data-type="viber" data-url=""></div>
              <p>Наш адрес: <a href="#">Nullam dolor massa, porta a nulla in, ultricies vehicula arcu.</a></p>
              <p>Фотографии —</p>
import json
import pprint
import time
from urlparse import urlparse

import requests
from requests import HTTPError

HOST_ADDRESS = 'Адрес вашего сайта. Например,'
OAUTH_TOKEN = 'Ваш OAuth-токен'
RSS_STRING = 'Содержимое RSS-канала'

    'Authorization': 'OAuth %s' % OAUTH_TOKEN

SESSION = requests.Session()


def validate_api_response(response, required_key_name=None):
    content_type = response.headers['Content-Type']
    content = json.loads(response.text) if 'application/json' in content_type else None

    if response.status_code == 200:
        if required_key_name and required_key_name not in content:
            raise HTTPError('Unexpected API response. Missing required key: %s' % required_key_name, response=response)
    elif content and 'error_message' in content:
        raise HTTPError('Error API response. Error message: %s' % content['error_message'], response=response)

    return content

def url_to_host_id(url):
    parsed_url = urlparse(url)

    scheme = parsed_url.scheme
    if not scheme:
        raise ValueError('No protocol (https or http) in url')

    if scheme != 'http' and scheme != 'https':
        raise ValueError('Illegal protocol: %s' % scheme)

    port = parsed_url.port
    if not port:
        port = 80 if scheme == 'http' else 443

    hostname = parsed_url.hostname
    hostname = hostname.encode('idna').rstrip('.').lower()

    return scheme + ':' + hostname + ':' + str(port)

def get_user_id():
    r = SESSION.get(API_URL + '/user/')
    c = validate_api_response(r, 'user_id')

    return c['user_id']

def get_user_host_ids(user_id):
    path = '/user/{user_id}/hosts'.format(user_id=user_id)
    r = SESSION.get(API_URL + path)
    c = validate_api_response(r, 'hosts')

    host_ids = [host_info['host_id'] for host_info in c['hosts']]

    return host_ids

def is_user_host_id(user_id, host_id):
    host_ids = get_user_host_ids(user_id)

    return host_id in host_ids

def get_rss_upload_path(user_id, host_id):
    path = '/user/{user_id}/hosts/{host_id}/turbo/uploadAddress/?mode={mode}'.format(
        user_id=user_id, host_id=host_id, mode='PRODUCTION')

    r = SESSION.get(API_URL + path)
    c = validate_api_response(r, 'upload_address')

    parsed_url = c['upload_address']

    return parsed_url

def upload_rss(upload_path, rss_data):
    headers = {
        'Content-Type': 'application/rss+xml'

    r =, data=rss_data, headers=headers)
    c = validate_api_response(r, 'task_id')

    return c['task_id']

def get_task_info(user_id, host_id, task_id):
    path = '/user/{user_id}/hosts/{host_id}/turbo/tasks/{task_id}'.format(
        user_id=user_id, host_id=host_id, task_id=task_id)

    r = SESSION.get(API_URL + path)
    c = validate_api_response(r)

    return c

def retry_call_until(func, predicate, max_tries=5, initial_delay=60, backoff=2):
    current_delay = initial_delay

    ret_val = None
    for n_try in xrange(0, max_tries + 1):
        ret_val = func()
        if predicate(ret_val):

        print 'Will retry. Sleeping for %ds' % current_delay
        current_delay *= backoff

    return ret_val

user_id = get_user_id()
host_id = url_to_host_id(HOST_ADDRESS)
upload_path = get_rss_upload_path(user_id, host_id)
task_id = upload_rss(upload_path, RSS_STRING)

print 'Waiting for the upload task to complete. This will take a while...'
task_info = retry_call_until(
    func=lambda: get_task_info(user_id, host_id, task_id),
    predicate=lambda task_info: task_info['load_status'] != 'PROCESSING')

print 'Task status: %s' % task_info['load_status']
task_info = get_task_info(user_id, host_id, task_id)
pp = pprint.PrettyPrinter(indent=4)
import json
import pprint
import time
from urllib.parse import urlparse as parse

import requests
from requests import HTTPError

HOST_ADDRESS = 'Адрес вашего сайта. Например,'
OAUTH_TOKEN = 'Ваш OAuth-токен'
RSS_STRING = 'Содержимое RSS-канала'

    'Authorization': 'OAuth %s' % OAUTH_TOKEN

SESSION = requests.Session()


def validate_api_response(response, required_key_name=None):
    content_type = response.headers['Content-Type']
    content = json.loads(response.text) if 'application/json' in content_type else None

    if response.status_code == 200:
        if required_key_name and required_key_name not in content:
            raise HTTPError('Unexpected API response. Missing required key: %s' % required_key_name, response=response)
    elif content and 'error_message' in content:
        raise HTTPError('Error API response. Error message: %s' % content['error_message'], response=response)

    return content

def url_to_host_id(url):
    parsed_url = parse(url)

    scheme = parsed_url.scheme
    if not scheme:
        raise ValueError('No protocol (https or http) in url')

    if scheme != 'http' and scheme != 'https':
        raise ValueError('Illegal protocol: %s' % scheme)

    port = parsed_url.port
    if not port:
        port = 80 if scheme == 'http' else 443

    hostname = parsed_url.hostname

    return scheme + ':' + hostname + ':' + str(port)

def get_user_id():
    r = SESSION.get(API_URL + '/user/')
    c = validate_api_response(r, 'user_id')

    return c['user_id']

def get_user_host_ids(user_id):
    path = '/user/{user_id}/hosts'.format(user_id=user_id)
    r = SESSION.get(API_URL + path)
    c = validate_api_response(r, 'hosts')

    host_ids = [host_info['host_id'] for host_info in c['hosts']]

    return host_ids

def is_user_host_id(user_id, host_id):
    host_ids = get_user_host_ids(user_id)

    return host_id in host_ids

def get_rss_upload_path(user_id, host_id):
    path = '/user/{user_id}/hosts/{host_id}/turbo/uploadAddress/?mode={mode}'.format(
        user_id=user_id, host_id=host_id, mode='DEBUG')

    r = SESSION.get(API_URL + path)
    c = validate_api_response(r, 'upload_address')

    parsed_url = c['upload_address']

    return parsed_url

def upload_rss(upload_path, rss_data):
    headers = {
        'Content-Type': 'application/rss+xml'

    c = {'task_id': None}

        r =, data=rss_data.encode('utf-8'), headers=headers)
        c = validate_api_response(r, 'task_id')

        # если ответ успешен, исключения задействованы не будут
    except HTTPError as http_err:
        print(f'HTTP error occurred: {http_err}')  # Python 3.6
    except Exception as err:
        print(f'Other error occurred: {err}')  # Python 3.6

    return c['task_id']

def get_task_info(user_id, host_id, task_id):
    path = '/user/{user_id}/hosts/{host_id}/turbo/tasks/{task_id}'.format(
        user_id=user_id, host_id=host_id, task_id=task_id)

    r = SESSION.get(API_URL + path)
    c = validate_api_response(r)

    return c

def retry_call_until(func, predicate, max_tries=5, initial_delay=60, backoff=2):
    current_delay = initial_delay

    ret_val = None
    for n_try in range(0, max_tries + 1):
        ret_val = func()
        if predicate(ret_val):

        print('Will retry. Sleeping for %ds', current_delay)
        current_delay *= backoff

    return ret_val

def main():
    user_id = get_user_id()
    host_id = url_to_host_id(HOST_ADDRESS)
    upload_path = get_rss_upload_path(user_id, host_id)
    task_id = upload_rss(upload_path, RSS_STRING)

    print('Waiting for the upload task to complete. This will take a while...')
    task_info = retry_call_until(
        func=lambda: get_task_info(user_id, host_id, task_id),
        predicate=lambda task_info: task_info['load_status'] != 'PROCESSING')

    print('Task status: %s', task_info['load_status'])
    task_info = get_task_info(user_id, host_id, task_id)
    pp = pprint.PrettyPrinter(indent=4)
