Программирование иногда не даёт нормально спать.

Придёт в голову какая-нибудь идея, присмотришься к ней — вроде простая и красивая. Потом начинаешь прикидывать подробности, и вроде тоже всё складывается. А потом садишься писать код... Тут-то и получается, что в реальности есть ещё и другие подробности, которые мозг при умозрительном разглядывании удобно опускал. Компилятор же, не обладающей такой моральной гибкостью, упрямо твердит, что нет, оно так не работает. И так не работает. И вот так тоже.

И вот теперь у вас Проблема. Вылезать из головы этот код отказывается напрочь. И вы, засыпая, с регулярностью раз в 20 минут осознаёте, что вместо того, чтобы засыпать, опять пытаетесь писать в уме код, и голова от этого болит всё больше и больше. И с утра за завтраком, в душе, по дороге на работу, на работе — всё то же самое. И так, пока либо оно не решится, либо вы не поймёте, что это принципиально невозможно.

У меня так было на прошедших выходных. Увидел комментарий от Yuppy про использование yield для асинхронных вызовов, вспомнил, что когда-то где-то уже видел такую штуку (может как раз в том же Twisted), и на свою голову решил прикинуть в подробностях, как оно работает. Понятно, что можно просто пойти посмотреть в код Twisted и в код Yuppy (что я, в итоге, тоже делал), но хотелось именно построить всё руками, чтобы лучше осознать.

В итоге, промучавшись несколько дней и три раза всё переписав, я это осилил. И теперь у меня два положительных результата: подробное описание принципа работы и небольшая библиотечка, которую можно использовать для таких вещей, независимо от каких-либо фреймворков (ссылка "скачать" в конце).

Чего хочется

В общем и целом, хочется писать асинхронный код без callback'ов. Потому что они плохо читаются, потому что выстраиваются в цепочки в неестественном порядке, потому что заставляют передавать параметры через глобальные значения и т.д. Хочется же, чтобы было как-то так:

def get_url(url):
    # ... асинхронный запрос по HTTP

def process_url(url):
    result = yield get_url(url) # здесь process_url теряет управление
    print result                # сюда управление возвращается после завершения
                                # асинхронного вызова

process_url('http://...')
print 'This line runs before get_url completes'

Есть функция get_url, которая делает асинхронный запрос по HTTP. Это значит, что при её вызове она не ждёт завершения запроса, а возвращает управление вызвавшему сразу. А функция process_url, которая вызывает get_url, хочет получить управление тогда, когда асинхронный вызов завершился, не раньше. Принципиальную возможность так сделать даёт yield, который для функции process_data работает как return — она возвращает значение куда-то во Внешний Эфир. А начиная с версии Питона 2.5 этот Внешний Эфир может вернуть какое-то значение обратно внутрь process_data. Соответственно наша задача состоит в том, чтобы этот Внешний Эфир реализовать так, чтобы:

Начал я как раз с последнего. Поскольку get_url возвращается быстрее, чем делает собственную работу, единственный способ для неё что-то сообщить внешнему миру — это принимать callback, который потом вызывать. Чтобы говорить конкретней, давайте эту функцию реально напишем, сделав асинхронность например с помощью треда (хотя это не принципиально):

def get_url(url, callback):
    def read():
        s = urllib2.urlopen(url).read()
        callback(s)
    threading.Thread(target=read).start()

Однако, когда мы зовём её из process_url, та не может (да и не хочет) передать ей никакого явного callback'а — у ней его нет. Следовательно нам нужно как-то задекорировать get_url. Я назвал этот декоратор "async":

def async(func):
    def wrapper(*args, **kwargs):
        def caller(callback):
            return func(callback=callback, *args, **kwargs)
        return caller
    return wrapper

Он подменяет клиентскую функцию на свой wrapper, который не вызывает функцию сразу. Вместо этого он делает её ленивой: отдаёт наружу caller, который сможет позже вызвать func, передав ей callback. Работать это должно так:

  1. Функция process_url вызывает yield get_url(url), где get_url задекорирована в async.
  2. Значение под yield — это caller, который знает, как вызвать get_url(url).
  3. Через yield этот объект попадает во Внешний Эфир, который тут же вызывает caller, передавая ему какой-то конкретный callback.

Здесь уже должно быть понятно, что callback Внешнего Эфира должен возвращать управление обратно в генератор process_data, пропихивая ему то значение, которое получит по завершению асинхронного вызова. Внешний эфир реализуется небольшим объектом и навешивается на функцию process_data опять же декоратором с названием "process":

class CallbackDispatcher(object):
    def __init__(self, generator):
        self.g = generator
        caller = self.g.next()
        caller(callback=self.callback)

    def callback(self, arg):
        try:
            caller = self.g.send(arg)
            caller(callback=self.callback)
        except StopIteration:
            pass

def process(func):
    def wrapper(*args, **kwargs):
        CallbackDispatcher(func(*args, **kwargs))
    return wrapper

Последовательность работы:

  1. Декоратор process нужен только для того, чтобы вызвать клиентскую функцию, и передать полученный генератор в CallbackDispatcher.
  2. CallbackDispatcher прямо в конструкторе запускает генератор через self.g.next()
  3. В process_data при этом выполняется кусок кода до первого yield, и через него возвращается значение — caller для get_url.
  4. Этот caller приезжает в CallbackDispatcher в качестве возвратного значения next'а.
  5. caller тут же вызывается с callback'ом — методом самого CallbackDispatcher.
  6. На это моменте заканчивается конструктор диспетчера, а следовательно и работа декоратора process — соответственно программа тут же продолжается дальше на print 'This line runs ...'.
  7. Тем временем где-то в get_url происходит асинхронный HTTP-вызов, заканчивается, и она вызывает CallbackDispatcher.callback с каким-то результатом своей работы.
  8. CallbackDispatcher.callback пропихивает результат обратно в генератор process_data, тем самым запуская его следующий кусок.

И вот, собственно, теперь у нас есть базовая функциональность:

Мне ещё не очень нравится название "process", но лучше я пока не придумал.

Бонус: мультивызовы

Конечно я слукавил, когда сказал, что это всё было чисто теоретическим упражнением. На самом деле, у нас тут в Яндексе есть вполне конкретный юзкейс: небольшой HTTP-прокси, написанный на Tornado, который асинхронно обрабатывает входящие запросы и асинхронно же делает свои. Код там очень трудно читается из-за явных callback'ов, и всё это придумывалось для его упрощения.

Так вот там есть задачка, в которой надо запустить несколько асинхронных функций одновременно, дождаться их всех и список всех результатов обработать целиком (map-reduce, ага). Сейчас оно написано примерно так:

for item in items:
    async_fetch(item, callback)
results = []

def callback(item, result):
    items.remove(item)
    results.append(result)
    if not items:
        process_results(results)

Вместо такого вороха обслуживающего кода хочется иметь что-то такое:

results = yield map(async_fetch, items)
# обработка results

Чтобы так было, надо всего лишь научить Внешний Эфир (CallbackDispatcher) таким вещам:

Выглядит это вот так (__init__ опущен для краткости):

class CallbackDispatcher(object):
    def call(self, callers):

        # приведение параметра к последовательности
        single = not hasattr(callers, '__iter__')
        if single:
            callers = [callers]

        # запуск перенумерованных caller'ов с прилеплением
        # к callback'у номера задачи и признака одинокого результата
        for count, caller in enumerate(callers):
            caller(callback=partial(self.callback, count, single))

        # создание места под результаты
        self.call_count = count + 1
        self.results = [None] * self.call_count

    def callback(self, index, single, arg):

        # сохранение результата
        self.call_count -= 1
        self.results[index] = arg

        if self.call_count > 0: # возврат, если есть ещё кого ждать
            return

        # отсылка результата в генератор в виде списка или одного значения
        try:
            result = self.results[0] if single else self.results
            self.call(self.g.send(result))
        except StopIteration:
            pass

В примерах работает :-)

Код

Комментарии: 39

  1. Виктор Коцеруба

    не взлетел

    $ python example.py
    Trace/BPT trap
    $ python2.6 example.py
    process_url exited
    Trace/BPT trap
    
  2. Ivan Sagalaev

    Странно... Может в OS X Питон как-то так скомпилирован, что не ждёт выполнения тредов при завершении главного? Добавь time.sleep(5) в конце.

  3. pyobject.ru/blog

    А функция process_url, которая вызывает get_url, хочет получить управление тогда, когда асинхронный вызов завершился, не раньше.

    Это называется "асинхронный кусок в синхронном мире" :-) И приведенный тобой в тексте юскейс подтверждает это :-)

  4. Виктор Коцеруба

    sleep ничего не изменил

  5. pyobject.ru/blog

    Да, кстати, "Внешний Эфир" традиционно называется Реактор, так что вместо @process лучше было бы @reactor :-)

  6. Andrey Popp

    Очень настоятельно рекомендую посмотреть в сторону eventlet.

  7. Ivan Sagalaev

    Да, кстати, "Внешний Эфир" традиционно называется Реактор

    Нет, это не Реактор. Мой Внешний Эфир — это прослойка между юзерским кодом и каким-то внешним реактором. Она только делает возможным быстро вернуть управление в него.

  8. Ivan Sagalaev

    Андрей, про eventlet я знаю, да.

  9. xcat

    Спасибо Иван, очень интересно. Я тоже мучился с примером Yuppy.
    У меня сработало. python 2.5 freebsd 6.2.

    В twisted можно использовать DeferredList для запуска паралельных операций,но общий калбэк надо писать. И вариант с yield красивее.

  10. Стромнов Андрей

    Сразу вспомнилась концепция из cogen (http://cogen.googlecode.com/svn/tags/0.2.1/docs/build/intro/introduction.html).

  11. Роман Ворушин

    @Иван, пример работает под Python 2.5.4 и Python 2.6.4 в Mac OS X 10.5.8. Мой коллега Дима Смолин, когда я рассказал ему про твою статью "Надо все переписать" ответил, что вместо того чтобы переходить на Closure или node.js, кто-нибудь напишет парочку декораторов и асинхронный код можно будет писать на питоне :))))

  12. Ivan Sagalaev

    вместо того чтобы переходить на Closure или node.js, кто-нибудь напишет парочку декораторов и асинхронный код можно будет писать на питоне :))))

    Проблема, к сожалению, не только в callback'ах. Питон императивен, и у него есть GIL. Это значит, что ничего, кроме IO, эффективно параллелить нельзя. Сейчас нам этого хватает, но завтра хватать перестанет.

  13. Стромнов Андрей

    Проверил на MacOSX 10.6.2:
    - падает на системном питоне 2.6.1
    - падает на макпортовом питоне 2.5.4
    - выполняется на макпортовом питоне 2.6.4

    Падает на PyThread_acquire_lock().

  14. Deepwalker

    GIL тормозит только треды, а twisted и иже с ним не предполагают тредов. Если вам надо будет занять 8 ядер вашего мегасервера вы просто запустите 8 процессов.
    Но вы, как я понимаю, сразу смотрите в будущее, в котором будет 128 ядер. Вот только я думаю и оперативной памяти там тоже будет в достатке, а потому метод все также продолжит работать.
    Поэтому мое мнение - не о чем беспокоиться, тем более, что не CPython-ом единым. Где то там на горизонте маячит Unladen Swallow, да и PyPy имеет некоторые успехи. И никто не утверждает, что не будет чего то еще.
    Так может быть если и имеет смысл что-то переписывать, то не надо обязательно искать новый язык?

  15. Kuroki Kaze

    Принципиальную возможность так сделать даёт yield, который для функции process_data работает как return — она возвращает значение куда-то во Внешний Эфир. А начиная с версии Питона 2.5 этот Внешний Эфир может вернуть какое-то значение обратно внутрь process_data.

    Ваш Python кажется мне всё страньше и страньше O_o.

    Проблема "постоянного программирования" есть конечно. Решаю я её довольно просто - нужно отвлечься на что нибудь другое, но такое, что действительно интересно. У меня это комп. игры и фехтование. Тогда всё становится нормально.

  16. Вячеслав

    А я вот одного не понял, зачем вот здесь:
    caller = self.g.send(arg)
    caller(callback=self.callback)
    после caller = self.g.send(arg) опять вызывается caller(callback=self.callback)?

  17. Ivan Sagalaev

    Deepwalker:

    Если вам надо будет занять 8 ядер вашего мегасервера вы просто запустите 8 процессов.

    Это не выход. Я пытался обосновать это целой статьёй. Понимаю, что мог не выразить свои мысли достаточно понятно, но понятней у меня пока не получается.

    А я вот одного не понял, зачем вот здесь:
    caller = self.g.send(arg)
    caller(callback=self.callback)
    после caller = self.g.send(arg) опять вызывается caller(callback=self.callback)?

    В первой строчке caller не вызывается. В это время генератор висит на каком-то yield, ожидая просыпания. Первая строка будит его, посылая ему arg. Разбуженный генератор выполняет кусок кода до следующего yield и к нам из него возвращается новый caller, который мы должны вызвать дальше.

  18. Вячеслав

    Ну так здесь только 1 yield:

    def process_url(url):
        result = yield get_url(url)
        print result
    

    Функция get_url вызывается уже на этапе инициализации класса:

    caller = self.g.next()
    caller(callback=self.callback)
    

    Здесь мы уже имеем ответ от этой функции

    def callback(self, arg):
        try:
            self.g.send(arg)
    

    Вот этот самый self.g.send(arg) пропихивает результат в def process_url(url):
    Там он печатается и программа заканчивается. Где тут еще 1 yield? Не могу понять, объясните пожалуйста.
    Кстати, если убрать вызов после caller = self.g.send(arg)
    ничего не меняется (то есть, как я думаю, исполнение туда и не доходит).

  19. Ivan Sagalaev

    Ну так здесь только 1 yield:

    def process_url(url):
        result = yield get_url(url)
        print result
    

    А... Ну это пример только. Здесь всё вот так происходит:

    1. Мы получаем caller'а из первого же next в конструкторе, генератор остаётся висеть на yield
    2. callback вызывает send(arg)
    3. Отрабатывает кусок кода после yield
    4. А дальше генератор заканчивается и в вызвавшем его callback от этого возникает исключение StopIteration, и до нового caller'а дело не доходит.

    Этот send нормально работает в случаях, когда у генератора несколько yield'ов:

    def process_url(url):
        result = yield get_url(url)
        new_url = parse(result)
        new_result = yield get_url(url)
        print new_result
    

    Вот тут после первого send(arg) не будет исключения, а выполнится parse(result) и генератор остановится на следующем yield, вернёт новый caller, и его надо будет снова звать.

  20. Вячеслав

    А, все, теперь понятно, здесь как бы рекурсия, чтобы обработать любое количество yield в process.

    Спасибо!

  21. evilkost

    helloworld с торнадой:
    http://dumpz.org/15078/

    Пока не понятно что делать с ошибками (.

  22. Ivan Sagalaev

    Хо! Быстро ты :-). А в чём "# Broken here" заключается?

    Кстати, можно обойтись без своей do_get:

    --- hw.old.py   2009-12-11 23:01:01.965425578 +0300
    +++ hw.new.py   2009-12-11 23:00:33.101425824 +0300
    @@ -15,12 +15,8 @@
     class MainHandler(tornado.web.RequestHandler):
         @process
         def perform_response(self, url):
    -        @async
    -        def do_get(callback):
    -            http = tornado.httpclient.AsyncHTTPClient()
    -            http.fetch(url, callback=lambda response: callback(response))
    -
    -        response = yield do_get()
    +        http = tornado.httpclient.AsyncHTTPClient()
    +        response = yield async(http.fetch)(url)
    
             if response.error:
                 # Broken here
    

    По счастью, параметр в http.fetch тоже называется "callback".

  23. Ivan Sagalaev

    Я понял проблему с ошибкой: http://dumpz.org/15084/

    • Чтобы починить 500 ошибки пришлось таки написать свой fetch, который заворачивает callback в торнадовский async_callback
    • Зато я избавился от process_response, потому что сам get можно просто завернуть в @process
  24. evilkost

    Более менее разобрался.Обработать можно примерно так:

    if response.error:
        if self._headers_written:
            log.error("Exception after headers written")
        else:
            self._handle_request_exception(response.error)
    

    В самой торнаде для этого введен специальный метод:

    If you make calls to asynchronous library functions that require a callback (like the HTTP fetch function above), you should always wrap your callbacks with self.async_callback. This simple wrapper ensures that if your callback function raises an exception or has a programming error, a proper HTTP error response will be sent to the browser, and the connection will be properly closed.

    Сначала я про него забыл, потом стало понятно, что тут он не поможет. Отуда и взял это кусочек кода.

    helloworld v2: http://dumpz.org/15085/

  25. evilkost

    Упс, пока писал ответ, ты уже решил и лучше ).

  26. Роман Ворушин

    Мне ещё не очень нравится название "process", но лучше я пока не придумал.

    А что если назвать его @coroutine?

  27. Ivan Sagalaev

    А что если назвать его @coroutine?

    Это имя уже занял David Beazley под генераторы, которым сразу делается один next().

  28. Роман Ворушин

    генераторы, которым сразу делается один next().

    Ого, спасибо за наводку!
    David Beazley большой молодец, полвечера разбирался с его замечательными примерами coroutines.

  29. Alik Kirillovich

    Согласен, что асинхронное программирование будут важнейшей темой наступающего десятилетия.

    Однако, в клиентском JavaScript проблемы асинхронного программирования являются критичными уже сейчас. Ведь большинство клиентского кода на JS — это и есть обработчики асинхронных Ajax-запросов.

    Одной из задач, действительно, является написание асинхронного кода без callback'ов. В JavaScript, как и в вашем примере на Питоне, это возможно с использованием генераторов и оператора yield.

    Генераторы доступны начиная с JavaScript 1.7.

    Скажем, ваш пример на Питоне, можно реализовать на JavaScript с использованием библиотеки Er.js таким образом:

    <script type="application/javascript;version=1.7"/ src="er.js"></script>
    <script type="application/javascript;version=1.7"/>
    function process_url ()
      {
      //Делаем Ajax запрос и, до его завершения, отдаем управление наружу
      var result = yield Er.Ajax.get("http://download.alik.su");
      //После выполнения запроса, перехватываем управление и показываем окошко с результатом
      alert (result);
      }
    
    //Оборачиваем и асинхронно запускаем process_url
    Er.spawn (process_url);
    //Это сообщение будет показано до завершения process_url
    alert ("his line runs before get_url completes");
    </script>
    
  30. http://maxidoors.ru/

    Ваня, погляди на fiber-ы в руби и попробуй лучше допилить питон, что бы он так же умел. Это будет в тысячу раз проще, честно =)

  31. Andrey Popp

    Max Lapshin, чем они отличаются от генераторов в Python, кроме того, что можно
    передавать управление другим fiber-ам? В python это делает немногим сложнее:

    for x in subcoroutine()
        yield x
    

    Ну ещё и greenlet'ы есть отдельным модулем.

    Вообщем разницы никакой...

  32. Александр Чепиков

    В Twisted после Python 2.5 рекомендуется использовать генераторы, идея такая:

    @defer.inlineCallbacks
    def someFunction():
    a = 1
    b = yield deferredReturningFunction(a)
    c = yield anotherDeferredReturningFunction(a, b)
    defer.returnValue(c)

  33. david-m.livejournal.com

    Алик Кириллович: В JS куда проще и нативнее сделать библиотечку для chain-обработки типа:

    perform(
        function() { some action #1 }
    ).then(
        function() { some action #2, depends on #1 },
        function() { some action #3, depends on #1 }
    ).then(
        function() { some action #4, depends on ##2,3 }
    )
    alert ("his line runs before chain completes");
    

    И императивно, и синтаксически привычно, и точно понятно, что за чем и в какой момент.

  34. yuppy99.ya.ru

    Не ожидал что мой комментарий лишит тебя сна :) Хотя такое применение yield мне тоже когда-то взрывало мозг, и я смог разобраться как оно должно работать, только написав свой велосипед. Надеюсь, благодаря тебе и David'у Beazley, эта тема станет понятнее и программисты смогут проводить ночи более продуктивно :)

    Не могу не написать о таких недочетах в твоих примерах:
    - async-функция, являющаяся process'ом не может возвращать значения
    - нет обработки исключений, их желательно из callback через throw передавать в генераторы
    - режет глаз добавление параметров функции с помощью декоратора. Из-за этого при чтении кода сложно понять откуда берется callback. Работа с Deferred в twisted или Operation в cogen, как мне кажется, прозрачнее.

  35. Ivan Sagalaev

    async-функция, являющаяся process'ом не может возвращать значения

    Почему? Любая async-функция, она "возвращает" значение, вызывая callback, неважно завёрнута она при этом в process или нет. С точки зрения вызывающего это выглядит как возврат значения.

    Работа с Deferred в twisted или Operation в cogen, как мне кажется, прозрачнее

    Мне хотелось сделать код, не привязанный ни к какому новому типу объекта. Я посчитал соглашение о неявно передаваемом параметре меньшим злом.

  36. Роман

    Извините, если не в тему. Как Вы относитесь к подходу Flow.js и Step.js к упрощению работы с callbackами?

  37. Ivan Sagalaev

    Никогда на них не смотрел, честно говоря.

  38. vs@llc.ac

    ЧЯДНТ! Callback не принимает логические значения (True и False). В этом случае все валится:

    File "/home/satchitananda/venv/openite/local/lib/python2.7/site-packages/tornado/web.py", line 1074, in wrapper
        return method(self, *args, **kwargs)
      File "adisp_test.py", line 65, in post
        self.perform_response(url)
      File "/home/satchitananda/Ubuntu One/Atomic Creative/Projects/openite_cms/core/adisp.py", line 130, in wrapper
        CallbackDispatcher(func(*args, **kwargs))
      File "/home/satchitananda/Ubuntu One/Atomic Creative/Projects/openite_cms/core/adisp.py", line 98, in __init__
        self.call(self.g.next())
      File "/home/satchitananda/Ubuntu One/Atomic Creative/Projects/openite_cms/core/adisp.py", line 119, in call
        caller(callback=partial(self.callback, results, count, single))
      File "/home/satchitananda/Ubuntu One/Atomic Creative/Projects/openite_cms/core/adisp.py", line 137, in caller
        return func(*args, **kwargs)
      File "adisp_test.py", line 19, in do_get
        callback(True)
      File "/home/satchitananda/Ubuntu One/Atomic Creative/Projects/openite_cms/core/adisp.py", line 126, in callback
        self._send_result(results, single)
      File "/home/satchitananda/Ubuntu One/Atomic Creative/Projects/openite_cms/core/adisp.py", line 105, in _send_result
        self.call(self.g.send(result))
      File "adisp_test.py", line 33, in perform_response
        self.write(response)
      File "/home/satchitananda/venv/openite/local/lib/python2.7/site-packages/tornado/web.py", line 469, in write
        chunk = utf8(chunk)
      File "/home/satchitananda/venv/openite/local/lib/python2.7/site-packages/tornado/escape.py", line 161, in utf8
        assert isinstance(value, unicode)
    AssertionError
    
  39. Ivan Sagalaev

    Очень трудно без собственно кода примера. По одному трейсбеку непонятно, что там происходит.

Добавить комментарий