Программирование иногда не даёт нормально спать.

Придёт в голову какая-нибудь идея, присмотришься к ней — вроде простая и красивая. Потом начинаешь прикидывать подробности, и вроде тоже всё складывается. А потом садишься писать код... Тут-то и получается, что в реальности есть ещё и другие подробности, которые мозг при умозрительном разглядывании удобно опускал. Компилятор же, не обладающей такой моральной гибкостью, упрямо твердит, что нет, оно так не работает. И так не работает. И вот так тоже.

И вот теперь у вас Проблема. Вылезать из головы этот код отказывается напрочь. И вы, засыпая, с регулярностью раз в 20 минут осознаёте, что вместо того, чтобы засыпать, опять пытаетесь писать в уме код, и голова от этого болит всё больше и больше. И с утра за завтраком, в душе, по дороге на работу, на работе — всё то же самое. И так, пока либо оно не решится, либо вы не поймёте, что это принципиально невозможно.

У меня так было на прошедших выходных. Увидел комментарий от Yuppy про использование yield для асинхронных вызовов, вспомнил, что когда-то где-то уже видел такую штуку (может как раз в том же Twisted), и на свою голову решил прикинуть в подробностях, как оно работает. Понятно, что можно просто пойти посмотреть в код Twisted и в код Yuppy (что я, в итоге, тоже делал), но хотелось именно построить всё руками, чтобы лучше осознать.

В итоге, промучавшись несколько дней и три раза всё переписав, я это осилил. И теперь у меня два положительных результата: подробное описание принципа работы и небольшая библиотечка, которую можно использовать для таких вещей, независимо от каких-либо фреймворков (ссылка "скачать" в конце).

Чего хочется

В общем и целом, хочется писать асинхронный код без callback'ов. Потому что они плохо читаются, потому что выстраиваются в цепочки в неестественном порядке, потому что заставляют передавать параметры через глобальные значения и т.д. Хочется же, чтобы было как-то так:

def get_url(url):
    # ... асинхронный запрос по HTTP

def process_url(url):
    result = yield get_url(url) # здесь process_url теряет управление
    print result                # сюда управление возвращается после завершения
                                # асинхронного вызова

process_url('http://...')
print 'This line runs before get_url completes'

Есть функция get_url, которая делает асинхронный запрос по HTTP. Это значит, что при её вызове она не ждёт завершения запроса, а возвращает управление вызвавшему сразу. А функция process_url, которая вызывает get_url, хочет получить управление тогда, когда асинхронный вызов завершился, не раньше. Принципиальную возможность так сделать даёт yield, который для функции process_data работает как return — она возвращает значение куда-то во Внешний Эфир. А начиная с версии Питона 2.5 этот Внешний Эфир может вернуть какое-то значение обратно внутрь process_data. Соответственно наша задача состоит в том, чтобы этот Внешний Эфир реализовать так, чтобы:

Начал я как раз с последнего. Поскольку get_url возвращается быстрее, чем делает собственную работу, единственный способ для неё что-то сообщить внешнему миру — это принимать callback, который потом вызывать. Чтобы говорить конкретней, давайте эту функцию реально напишем, сделав асинхронность например с помощью треда (хотя это не принципиально):

def get_url(url, callback):
    def read():
        s = urllib2.urlopen(url).read()
        callback(s)
    threading.Thread(target=read).start()

Однако, когда мы зовём её из process_url, та не может (да и не хочет) передать ей никакого явного callback'а — у ней его нет. Следовательно нам нужно как-то задекорировать get_url. Я назвал этот декоратор "async":

def async(func):
    def wrapper(*args, **kwargs):
        def caller(callback):
            return func(callback=callback, *args, **kwargs)
        return caller
    return wrapper

Он подменяет клиентскую функцию на свой wrapper, который не вызывает функцию сразу. Вместо этого он делает её ленивой: отдаёт наружу caller, который сможет позже вызвать func, передав ей callback. Работать это должно так:

  1. Функция process_url вызывает yield get_url(url), где get_url задекорирована в async.
  2. Значение под yield — это caller, который знает, как вызвать get_url(url).
  3. Через yield этот объект попадает во Внешний Эфир, который тут же вызывает caller, передавая ему какой-то конкретный callback.

Здесь уже должно быть понятно, что callback Внешнего Эфира должен возвращать управление обратно в генератор process_data, пропихивая ему то значение, которое получит по завершению асинхронного вызова. Внешний эфир реализуется небольшим объектом и навешивается на функцию process_data опять же декоратором с названием "process":

class CallbackDispatcher(object):
    def __init__(self, generator):
        self.g = generator
        caller = self.g.next()
        caller(callback=self.callback)

    def callback(self, arg):
        try:
            caller = self.g.send(arg)
            caller(callback=self.callback)
        except StopIteration:
            pass

def process(func):
    def wrapper(*args, **kwargs):
        CallbackDispatcher(func(*args, **kwargs))
    return wrapper

Последовательность работы:

  1. Декоратор process нужен только для того, чтобы вызвать клиентскую функцию, и передать полученный генератор в CallbackDispatcher.
  2. CallbackDispatcher прямо в конструкторе запускает генератор через self.g.next()
  3. В process_data при этом выполняется кусок кода до первого yield, и через него возвращается значение — caller для get_url.
  4. Этот caller приезжает в CallbackDispatcher в качестве возвратного значения next'а.
  5. caller тут же вызывается с callback'ом — методом самого CallbackDispatcher.
  6. На это моменте заканчивается конструктор диспетчера, а следовательно и работа декоратора process — соответственно программа тут же продолжается дальше на print 'This line runs ...'.
  7. Тем временем где-то в get_url происходит асинхронный HTTP-вызов, заканчивается, и она вызывает CallbackDispatcher.callback с каким-то результатом своей работы.
  8. CallbackDispatcher.callback пропихивает результат обратно в генератор process_data, тем самым запуская его следующий кусок.

И вот, собственно, теперь у нас есть базовая функциональность:

Мне ещё не очень нравится название "process", но лучше я пока не придумал.

Бонус: мультивызовы

Конечно я слукавил, когда сказал, что это всё было чисто теоретическим упражнением. На самом деле, у нас тут в Яндексе есть вполне конкретный юзкейс: небольшой HTTP-прокси, написанный на Tornado, который асинхронно обрабатывает входящие запросы и асинхронно же делает свои. Код там очень трудно читается из-за явных callback'ов, и всё это придумывалось для его упрощения.

Так вот там есть задачка, в которой надо запустить несколько асинхронных функций одновременно, дождаться их всех и список всех результатов обработать целиком (map-reduce, ага). Сейчас оно написано примерно так:

for item in items:
    async_fetch(item, callback)
results = []

def callback(item, result):
    items.remove(item)
    results.append(result)
    if not items:
        process_results(results)

Вместо такого вороха обслуживающего кода хочется иметь что-то такое:

results = yield map(async_fetch, items)
# обработка results

Чтобы так было, надо всего лишь научить Внешний Эфир (CallbackDispatcher) таким вещам:

Выглядит это вот так (__init__ опущен для краткости):

class CallbackDispatcher(object):
    def call(self, callers):

        # приведение параметра к последовательности
        single = not hasattr(callers, '__iter__')
        if single:
            callers = [callers]

        # запуск перенумерованных caller'ов с прилеплением
        # к callback'у номера задачи и признака одинокого результата
        for count, caller in enumerate(callers):
            caller(callback=partial(self.callback, count, single))

        # создание места под результаты
        self.call_count = count + 1
        self.results = [None] * self.call_count

    def callback(self, index, single, arg):

        # сохранение результата
        self.call_count -= 1
        self.results[index] = arg

        if self.call_count > 0: # возврат, если есть ещё кого ждать
            return

        # отсылка результата в генератор в виде списка или одного значения
        try:
            result = self.results[0] if single else self.results
            self.call(self.g.send(result))
        except StopIteration:
            pass

В примерах работает :-)

Код

Комментарии: 37 (feed)

  1. не взлетел

    $ python example.py
    Trace/BPT trap
    $ python2.6 example.py
    process_url exited
    Trace/BPT trap
    
  2. Странно... Может в OS X Питон как-то так скомпилирован, что не ждёт выполнения тредов при завершении главного? Добавь time.sleep(5) в конце.

  3. А функция process_url, которая вызывает get_url, хочет получить управление тогда, когда асинхронный вызов завершился, не раньше.

    Это называется "асинхронный кусок в синхронном мире" :-) И приведенный тобой в тексте юскейс подтверждает это :-)

  4. sleep ничего не изменил

  5. Да, кстати, "Внешний Эфир" традиционно называется Реактор, так что вместо @process лучше было бы @reactor :-)

  6. Andrey Popp

    Очень настоятельно рекомендую посмотреть в сторону eventlet.

  7. Да, кстати, "Внешний Эфир" традиционно называется Реактор

    Нет, это не Реактор. Мой Внешний Эфир — это прослойка между юзерским кодом и каким-то внешним реактором. Она только делает возможным быстро вернуть управление в него.

  8. Андрей, про eventlet я знаю, да.

  9. Спасибо Иван, очень интересно. Я тоже мучился с примером Yuppy.
    У меня сработало. python 2.5 freebsd 6.2.

    В twisted можно использовать DeferredList для запуска паралельных операций,но общий калбэк надо писать. И вариант с yield красивее.

  10. Стромнов Андрей

    Сразу вспомнилась концепция из cogen (http://cogen.googlecode.com/svn/tags/0.2.1/docs/build/intro/introduction.html).

  11. @Иван, пример работает под Python 2.5.4 и Python 2.6.4 в Mac OS X 10.5.8. Мой коллега Дима Смолин, когда я рассказал ему про твою статью "Надо все переписать" ответил, что вместо того чтобы переходить на Closure или node.js, кто-нибудь напишет парочку декораторов и асинхронный код можно будет писать на питоне :))))

  12. вместо того чтобы переходить на Closure или node.js, кто-нибудь напишет парочку декораторов и асинхронный код можно будет писать на питоне :))))

    Проблема, к сожалению, не только в callback'ах. Питон императивен, и у него есть GIL. Это значит, что ничего, кроме IO, эффективно параллелить нельзя. Сейчас нам этого хватает, но завтра хватать перестанет.

  13. Стромнов Андрей

    Проверил на MacOSX 10.6.2:
    - падает на системном питоне 2.6.1
    - падает на макпортовом питоне 2.5.4
    - выполняется на макпортовом питоне 2.6.4

    Падает на PyThread_acquire_lock().

  14. GIL тормозит только треды, а twisted и иже с ним не предполагают тредов. Если вам надо будет занять 8 ядер вашего мегасервера вы просто запустите 8 процессов.
    Но вы, как я понимаю, сразу смотрите в будущее, в котором будет 128 ядер. Вот только я думаю и оперативной памяти там тоже будет в достатке, а потому метод все также продолжит работать.
    Поэтому мое мнение - не о чем беспокоиться, тем более, что не CPython-ом единым. Где то там на горизонте маячит Unladen Swallow, да и PyPy имеет некоторые успехи. И никто не утверждает, что не будет чего то еще.
    Так может быть если и имеет смысл что-то переписывать, то не надо обязательно искать новый язык?

  15. Принципиальную возможность так сделать даёт yield, который для функции process_data работает как return — она возвращает значение куда-то во Внешний Эфир. А начиная с версии Питона 2.5 этот Внешний Эфир может вернуть какое-то значение обратно внутрь process_data.

    Ваш Python кажется мне всё страньше и страньше O_o.

    Проблема "постоянного программирования" есть конечно. Решаю я её довольно просто - нужно отвлечься на что нибудь другое, но такое, что действительно интересно. У меня это комп. игры и фехтование. Тогда всё становится нормально.

  16. Вячеслав

    А я вот одного не понял, зачем вот здесь:
    caller = self.g.send(arg)
    caller(callback=self.callback)
    после caller = self.g.send(arg) опять вызывается caller(callback=self.callback)?

  17. Deepwalker:

    Если вам надо будет занять 8 ядер вашего мегасервера вы просто запустите 8 процессов.

    Это не выход. Я пытался обосновать это целой статьёй. Понимаю, что мог не выразить свои мысли достаточно понятно, но понятней у меня пока не получается.

    А я вот одного не понял, зачем вот здесь:
    caller = self.g.send(arg)
    caller(callback=self.callback)
    после caller = self.g.send(arg) опять вызывается caller(callback=self.callback)?

    В первой строчке caller не вызывается. В это время генератор висит на каком-то yield, ожидая просыпания. Первая строка будит его, посылая ему arg. Разбуженный генератор выполняет кусок кода до следующего yield и к нам из него возвращается новый caller, который мы должны вызвать дальше.

  18. Вячеслав

    Ну так здесь только 1 yield:

    def process_url(url):
        result = yield get_url(url)
        print result
    

    Функция get_url вызывается уже на этапе инициализации класса:

    caller = self.g.next()
    caller(callback=self.callback)
    

    Здесь мы уже имеем ответ от этой функции

    def callback(self, arg):
        try:
            self.g.send(arg)
    

    Вот этот самый self.g.send(arg) пропихивает результат в def process_url(url):
    Там он печатается и программа заканчивается. Где тут еще 1 yield? Не могу понять, объясните пожалуйста.
    Кстати, если убрать вызов после caller = self.g.send(arg)
    ничего не меняется (то есть, как я думаю, исполнение туда и не доходит).

  19. Ну так здесь только 1 yield:

    def process_url(url):
        result = yield get_url(url)
        print result
    

    А... Ну это пример только. Здесь всё вот так происходит:

    1. Мы получаем caller'а из первого же next в конструкторе, генератор остаётся висеть на yield
    2. callback вызывает send(arg)
    3. Отрабатывает кусок кода после yield
    4. А дальше генератор заканчивается и в вызвавшем его callback от этого возникает исключение StopIteration, и до нового caller'а дело не доходит.

    Этот send нормально работает в случаях, когда у генератора несколько yield'ов:

    def process_url(url):
        result = yield get_url(url)
        new_url = parse(result)
        new_result = yield get_url(url)
        print new_result
    

    Вот тут после первого send(arg) не будет исключения, а выполнится parse(result) и генератор остановится на следующем yield, вернёт новый caller, и его надо будет снова звать.

  20. Вячеслав

    А, все, теперь понятно, здесь как бы рекурсия, чтобы обработать любое количество yield в process.

    Спасибо!

  21. helloworld с торнадой:
    http://dumpz.org/15078/

    Пока не понятно что делать с ошибками (.

  22. Хо! Быстро ты :-). А в чём "# Broken here" заключается?

    Кстати, можно обойтись без своей do_get:

    --- hw.old.py   2009-12-11 23:01:01.965425578 +0300
    +++ hw.new.py   2009-12-11 23:00:33.101425824 +0300
    @@ -15,12 +15,8 @@
     class MainHandler(tornado.web.RequestHandler):
         @process
         def perform_response(self, url):
    -        @async
    -        def do_get(callback):
    -            http = tornado.httpclient.AsyncHTTPClient()
    -            http.fetch(url, callback=lambda response: callback(response))
    -
    -        response = yield do_get()
    +        http = tornado.httpclient.AsyncHTTPClient()
    +        response = yield async(http.fetch)(url)
    
             if response.error:
                 # Broken here
    

    По счастью, параметр в http.fetch тоже называется "callback".

  23. Я понял проблему с ошибкой: http://dumpz.org/15084/

    • Чтобы починить 500 ошибки пришлось таки написать свой fetch, который заворачивает callback в торнадовский async_callback
    • Зато я избавился от process_response, потому что сам get можно просто завернуть в @process
  24. Более менее разобрался.Обработать можно примерно так:

    if response.error:
        if self._headers_written:
            log.error("Exception after headers written")
        else:
            self._handle_request_exception(response.error)
    

    В самой торнаде для этого введен специальный метод:

    If you make calls to asynchronous library functions that require a callback (like the HTTP fetch function above), you should always wrap your callbacks with self.async_callback. This simple wrapper ensures that if your callback function raises an exception or has a programming error, a proper HTTP error response will be sent to the browser, and the connection will be properly closed.

    Сначала я про него забыл, потом стало понятно, что тут он не поможет. Отуда и взял это кусочек кода.

    helloworld v2: http://dumpz.org/15085/

  25. Упс, пока писал ответ, ты уже решил и лучше ).

  26. Мне ещё не очень нравится название "process", но лучше я пока не придумал.

    А что если назвать его @coroutine?

  27. А что если назвать его @coroutine?

    Это имя уже занял David Beazley под генераторы, которым сразу делается один next().

  28. генераторы, которым сразу делается один next().

    Ого, спасибо за наводку!
    David Beazley большой молодец, полвечера разбирался с его замечательными примерами coroutines.

  29. Согласен, что асинхронное программирование будут важнейшей темой наступающего десятилетия.

    Однако, в клиентском JavaScript проблемы асинхронного программирования являются критичными уже сейчас. Ведь большинство клиентского кода на JS — это и есть обработчики асинхронных Ajax-запросов.

    Одной из задач, действительно, является написание асинхронного кода без callback'ов. В JavaScript, как и в вашем примере на Питоне, это возможно с использованием генераторов и оператора yield.

    Генераторы доступны начиная с JavaScript 1.7.

    Скажем, ваш пример на Питоне, можно реализовать на JavaScript с использованием библиотеки Er.js таким образом:

    <script type = "application/javascript;version=1.7"/ src = "er.js"></script>
    <script type = "application/javascript;version=1.7"/>
    function process_url ()
      {
      //Делаем Ajax запрос и, до его завершения, отдаем управление наружу
      var result = yield Er.Ajax.get("http://download.alik.su");
      //После выполнения запроса, перехватываем управление и показываем окошко с результатом
      alert (result);
      }
    
    //Оборачиваем и асинхронно запускаем process_url
    Er.spawn (process_url);
    //Это сообщение будет показано до завершения process_url
    alert ("his line runs before get_url completes");
    </script>
    
  30. Ваня, погляди на fiber-ы в руби и попробуй лучше допилить питон, что бы он так же умел. Это будет в тысячу раз проще, честно =)

  31. Andrey Popp

    Max Lapshin, чем они отличаются от генераторов в Python, кроме того, что можно
    передавать управление другим fiber-ам? В python это делает немногим сложнее:

    for x in subcoroutine()
        yield x
    

    Ну ещё и greenlet'ы есть отдельным модулем.

    Вообщем разницы никакой...

  32. Александр Чепиков

    В Twisted после Python 2.5 рекомендуется использовать генераторы, идея такая:

    @defer.inlineCallbacks
    def someFunction():
    a = 1
    b = yield deferredReturningFunction(a)
    c = yield anotherDeferredReturningFunction(a, b)
    defer.returnValue(c)

  33. Алик Кириллович: В JS куда проще и нативнее сделать библиотечку для chain-обработки типа:

    perform(
        function() { some action #1 }
    ).then(
        function() { some action #2, depends on #1 },
        function() { some action #3, depends on #1 }
    ).then(
        function() { some action #4, depends on ##2,3 }
    )
    alert ("his line runs before chain completes");
    

    И императивно, и синтаксически привычно, и точно понятно, что за чем и в какой момент.

  34. Не ожидал что мой комментарий лишит тебя сна :) Хотя такое применение yield мне тоже когда-то взрывало мозг, и я смог разобраться как оно должно работать, только написав свой велосипед. Надеюсь, благодаря тебе и David'у Beazley, эта тема станет понятнее и программисты смогут проводить ночи более продуктивно :)

    Не могу не написать о таких недочетах в твоих примерах:
    - async-функция, являющаяся process'ом не может возвращать значения
    - нет обработки исключений, их желательно из callback через throw передавать в генераторы
    - режет глаз добавление параметров функции с помощью декоратора. Из-за этого при чтении кода сложно понять откуда берется callback. Работа с Deferred в twisted или Operation в cogen, как мне кажется, прозрачнее.

  35. async-функция, являющаяся process'ом не может возвращать значения

    Почему? Любая async-функция, она "возвращает" значение, вызывая callback, неважно завёрнута она при этом в process или нет. С точки зрения вызывающего это выглядит как возврат значения.

    Работа с Deferred в twisted или Operation в cogen, как мне кажется, прозрачнее

    Мне хотелось сделать код, не привязанный ни к какому новому типу объекта. Я посчитал соглашение о неявно передаваемом параметре меньшим злом.

  36. Роман

    Извините, если не в тему. Как Вы относитесь к подходу Flow.js и Step.js к упрощению работы с callbackами?

  37. Никогда на них не смотрел, честно говоря.

Добавить комментарий

Текст через пустую строку превращается в отдельные абзацы, цитата отделяется символами > слева, список состоит из пунктов с дефисом слева, курсив выделяется * с каждой стороны, жирный - двойными **, блоки кода отступают слева на 4 пробела