Программирование иногда не даёт нормально спать.
Придёт в голову какая-нибудь идея, присмотришься к ней — вроде простая и красивая. Потом начинаешь прикидывать подробности, и вроде тоже всё складывается. А потом садишься писать код... Тут-то и получается, что в реальности есть ещё и другие подробности, которые мозг при умозрительном разглядывании удобно опускал. Компилятор же, не обладающей такой моральной гибкостью, упрямо твердит, что нет, оно так не работает. И так не работает. И вот так тоже.
И вот теперь у вас Проблема. Вылезать из головы этот код отказывается напрочь. И вы, засыпая, с регулярностью раз в 20 минут осознаёте, что вместо того, чтобы засыпать, опять пытаетесь писать в уме код, и голова от этого болит всё больше и больше. И с утра за завтраком, в душе, по дороге на работу, на работе — всё то же самое. И так, пока либо оно не решится, либо вы не поймёте, что это принципиально невозможно.
У меня так было на прошедших выходных. Увидел комментарий от Yuppy про использование yield для асинхронных вызовов, вспомнил, что когда-то где-то уже видел такую штуку (может как раз в том же Twisted), и на свою голову решил прикинуть в подробностях, как оно работает. Понятно, что можно просто пойти посмотреть в код Twisted и в код Yuppy (что я, в итоге, тоже делал), но хотелось именно построить всё руками, чтобы лучше осознать.
В итоге, промучавшись несколько дней и три раза всё переписав, я это осилил. И теперь у меня два положительных результата: подробное описание принципа работы и небольшая библиотечка, которую можно использовать для таких вещей, независимо от каких-либо фреймворков (ссылка "скачать" в конце).
Чего хочется
В общем и целом, хочется писать асинхронный код без callback'ов. Потому что они плохо читаются, потому что выстраиваются в цепочки в неестественном порядке, потому что заставляют передавать параметры через глобальные значения и т.д. Хочется же, чтобы было как-то так:
def get_url(url):
# ... асинхронный запрос по HTTP
def process_url(url):
result = yield get_url(url) # здесь process_url теряет управление
print result # сюда управление возвращается после завершения
# асинхронного вызова
process_url('http://...')
print 'This line runs before get_url completes'
Есть функция get_url
, которая делает асинхронный запрос по HTTP. Это значит, что при её вызове она не ждёт завершения запроса, а возвращает управление вызвавшему сразу. А функция process_url
, которая вызывает get_url
, хочет получить управление тогда, когда асинхронный вызов завершился, не раньше. Принципиальную возможность так сделать даёт yield
, который для функции process_data
работает как return
— она возвращает значение куда-то во Внешний Эфир. А начиная с версии Питона 2.5 этот Внешний Эфир может вернуть какое-то значение обратно внутрь process_data
. Соответственно наша задача состоит в том, чтобы этот Внешний Эфир реализовать так, чтобы:
- вызов
process_url
не блокировал дальнейшее выполнение программы ('This line runs before...') - в
process_url
не надо было писать никаких callback'ов: неявно callback'ом является кусок кода после yield get_url
не была сильно ограниченной в реализации: запускает ли она тред, делает ли асинхронный IO-вызов сама, передаёт ли эту работу в какой-то фреймворк (Twisted, Tornado)
Начал я как раз с последнего. Поскольку get_url
возвращается быстрее, чем делает собственную работу, единственный способ для неё что-то сообщить внешнему миру — это принимать callback, который потом вызывать. Чтобы говорить конкретней, давайте эту функцию реально напишем, сделав асинхронность например с помощью треда (хотя это не принципиально):
def get_url(url, callback):
def read():
s = urllib2.urlopen(url).read()
callback(s)
threading.Thread(target=read).start()
Однако, когда мы зовём её из process_url
, та не может (да и не хочет) передать ей никакого явного callback'а — у ней его нет. Следовательно нам нужно как-то задекорировать get_url
. Я назвал этот декоратор "async":
def async(func):
def wrapper(*args, **kwargs):
def caller(callback):
return func(callback=callback, *args, **kwargs)
return caller
return wrapper
Он подменяет клиентскую функцию на свой wrapper, который не вызывает функцию сразу. Вместо этого он делает её ленивой: отдаёт наружу caller, который сможет позже вызвать func, передав ей callback. Работать это должно так:
- Функция
process_url
вызываетyield get_url(url)
, гдеget_url
задекорирована вasync
. - Значение под
yield
— этоcaller
, который знает, как вызватьget_url(url)
. - Через
yield
этот объект попадает во Внешний Эфир, который тут же вызываетcaller
, передавая ему какой-то конкретный callback.
Здесь уже должно быть понятно, что callback Внешнего Эфира должен возвращать управление обратно в генератор process_data
, пропихивая ему то значение, которое получит по завершению асинхронного вызова. Внешний эфир реализуется небольшим объектом и навешивается на функцию process_data
опять же декоратором с названием "process":
class CallbackDispatcher(object):
def __init__(self, generator):
self.g = generator
caller = self.g.next()
caller(callback=self.callback)
def callback(self, arg):
try:
caller = self.g.send(arg)
caller(callback=self.callback)
except StopIteration:
pass
def process(func):
def wrapper(*args, **kwargs):
CallbackDispatcher(func(*args, **kwargs))
return wrapper
Последовательность работы:
- Декоратор
process
нужен только для того, чтобы вызвать клиентскую функцию, и передать полученный генератор вCallbackDispatcher
. CallbackDispatcher
прямо в конструкторе запускает генератор черезself.g.next()
- В
process_data
при этом выполняется кусок кода до первогоyield
, и через него возвращается значение — caller дляget_url
. - Этот
caller
приезжает вCallbackDispatcher
в качестве возвратного значения next'а. caller
тут же вызывается с callback'ом — методом самогоCallbackDispatcher
.- На это моменте заканчивается конструктор диспетчера, а следовательно и работа декоратора
process
— соответственно программа тут же продолжается дальше наprint 'This line runs ...'
. - Тем временем где-то в
get_url
происходит асинхронный HTTP-вызов, заканчивается, и она вызываетCallbackDispatcher.callback
с каким-то результатом своей работы. CallbackDispatcher.callback
пропихивает результат обратно в генераторprocess_data
, тем самым запуская его следующий кусок.
И вот, собственно, теперь у нас есть базовая функциональность:
- Обрабатывать результаты асинхронных вызовов можно последовательно, сколько угодно раз в функции. Для этого её достаточно завернуть в
@process
и вызывать асинхронные функции черезyield
. - Асинхронная функция должна быть завёрнута в
@async
, должна принимать параметрcallback
и возвращать в него одно значение (если надо несколько, их можно упаковать в tuple например). - Это неочевидно, но async-функции сами могут быть process'ами и вызывать из себя другие async-функции. Единственное, при навешивании обоих декораторов
async
должен быть внешним.
Бонус: мультивызовы
Конечно я слукавил, когда сказал, что это всё было чисто теоретическим упражнением. На самом деле, у нас тут в Яндексе есть вполне конкретный юзкейс: небольшой HTTP-прокси, написанный на Tornado, который асинхронно обрабатывает входящие запросы и асинхронно же делает свои. Код там очень трудно читается из-за явных callback'ов, и всё это придумывалось для его упрощения.
Так вот там есть задачка, в которой надо запустить несколько асинхронных функций одновременно, дождаться их всех и список всех результатов обработать целиком (map-reduce, ага). Сейчас оно написано примерно так:
for item in items:
async_fetch(item, callback)
results = []
def callback(item, result):
items.remove(item)
results.append(result)
if not items:
process_results(results)
Вместо такого вороха обслуживающего кода хочется иметь что-то такое:
results = yield map(async_fetch, items)
# обработка results
Чтобы так было, надо всего лишь научить Внешний Эфир (CallbackDispatcher
) таким вещам:
- понимать, что через
yield
может прийти не только одинокий caller, но и их последовательность - посчитать caller'ы и подготовить список для их результатов
- в callback передавать ещё и порядковый номер задачи, чтобы порядок результатов совпадал с порядком задач
Выглядит это вот так (__init__
опущен для краткости):
class CallbackDispatcher(object):
def call(self, callers):
# приведение параметра к последовательности
single = not hasattr(callers, '__iter__')
if single:
callers = [callers]
# запуск перенумерованных caller'ов с прилеплением
# к callback'у номера задачи и признака одинокого результата
for count, caller in enumerate(callers):
caller(callback=partial(self.callback, count, single))
# создание места под результаты
self.call_count = count + 1
self.results = [None] * self.call_count
def callback(self, index, single, arg):
# сохранение результата
self.call_count -= 1
self.results[index] = arg
if self.call_count > 0: # возврат, если есть ещё кого ждать
return
# отсылка результата в генератор в виде списка или одного значения
try:
result = self.results[0] if single else self.results
self.call(self.g.send(result))
except StopIteration:
pass
В примерах работает :-)
Код
- Проект на Launchpad: adisp
- Архив: adisp-0.1.zip
- Просмотр кода
Комментарии: 39
не взлетел
Странно... Может в OS X Питон как-то так скомпилирован, что не ждёт выполнения тредов при завершении главного? Добавь time.sleep(5) в конце.
Это называется "асинхронный кусок в синхронном мире" :-) И приведенный тобой в тексте юскейс подтверждает это :-)
sleep ничего не изменил
Да, кстати, "Внешний Эфир" традиционно называется Реактор, так что вместо @process лучше было бы @reactor :-)
Очень настоятельно рекомендую посмотреть в сторону eventlet.
Нет, это не Реактор. Мой Внешний Эфир — это прослойка между юзерским кодом и каким-то внешним реактором. Она только делает возможным быстро вернуть управление в него.
Андрей, про eventlet я знаю, да.
Спасибо Иван, очень интересно. Я тоже мучился с примером Yuppy.
У меня сработало. python 2.5 freebsd 6.2.
В twisted можно использовать DeferredList для запуска паралельных операций,но общий калбэк надо писать. И вариант с yield красивее.
Сразу вспомнилась концепция из cogen (http://cogen.googlecode.com/svn/tags/0.2.1/docs/build/intro/introduction.html).
@Иван, пример работает под Python 2.5.4 и Python 2.6.4 в Mac OS X 10.5.8. Мой коллега Дима Смолин, когда я рассказал ему про твою статью "Надо все переписать" ответил, что вместо того чтобы переходить на Closure или node.js, кто-нибудь напишет парочку декораторов и асинхронный код можно будет писать на питоне :))))
Проблема, к сожалению, не только в callback'ах. Питон императивен, и у него есть GIL. Это значит, что ничего, кроме IO, эффективно параллелить нельзя. Сейчас нам этого хватает, но завтра хватать перестанет.
Проверил на MacOSX 10.6.2:
- падает на системном питоне 2.6.1
- падает на макпортовом питоне 2.5.4
- выполняется на макпортовом питоне 2.6.4
Падает на PyThread_acquire_lock().
GIL тормозит только треды, а twisted и иже с ним не предполагают тредов. Если вам надо будет занять 8 ядер вашего мегасервера вы просто запустите 8 процессов.
Но вы, как я понимаю, сразу смотрите в будущее, в котором будет 128 ядер. Вот только я думаю и оперативной памяти там тоже будет в достатке, а потому метод все также продолжит работать.
Поэтому мое мнение - не о чем беспокоиться, тем более, что не CPython-ом единым. Где то там на горизонте маячит Unladen Swallow, да и PyPy имеет некоторые успехи. И никто не утверждает, что не будет чего то еще.
Так может быть если и имеет смысл что-то переписывать, то не надо обязательно искать новый язык?
Ваш Python кажется мне всё страньше и страньше O_o.
Проблема "постоянного программирования" есть конечно. Решаю я её довольно просто - нужно отвлечься на что нибудь другое, но такое, что действительно интересно. У меня это комп. игры и фехтование. Тогда всё становится нормально.
А я вот одного не понял, зачем вот здесь:
caller = self.g.send(arg)
caller(callback=self.callback)
после caller = self.g.send(arg) опять вызывается caller(callback=self.callback)?
Deepwalker:
Это не выход. Я пытался обосновать это целой статьёй. Понимаю, что мог не выразить свои мысли достаточно понятно, но понятней у меня пока не получается.
В первой строчке caller не вызывается. В это время генератор висит на каком-то
yield
, ожидая просыпания. Первая строка будит его, посылая ему arg. Разбуженный генератор выполняет кусок кода до следующегоyield
и к нам из него возвращается новый caller, который мы должны вызвать дальше.Ну так здесь только 1 yield:
Функция get_url вызывается уже на этапе инициализации класса:
Здесь мы уже имеем ответ от этой функции
Вот этот самый self.g.send(arg) пропихивает результат в def process_url(url):
Там он печатается и программа заканчивается. Где тут еще 1 yield? Не могу понять, объясните пожалуйста.
Кстати, если убрать вызов после caller = self.g.send(arg)
ничего не меняется (то есть, как я думаю, исполнение туда и не доходит).
А... Ну это пример только. Здесь всё вот так происходит:
next
в конструкторе, генератор остаётся висеть наyield
send(arg)
yield
callback
от этого возникает исключениеStopIteration
, и до нового caller'а дело не доходит.Этот send нормально работает в случаях, когда у генератора несколько yield'ов:
Вот тут после первого
send(arg)
не будет исключения, а выполнитсяparse(result)
и генератор остановится на следующемyield
, вернёт новый caller, и его надо будет снова звать.А, все, теперь понятно, здесь как бы рекурсия, чтобы обработать любое количество yield в process.
Спасибо!
helloworld с торнадой:
http://dumpz.org/15078/
Пока не понятно что делать с ошибками (.
Хо! Быстро ты :-). А в чём "# Broken here" заключается?
Кстати, можно обойтись без своей
do_get
:По счастью, параметр в http.fetch тоже называется "callback".
Я понял проблему с ошибкой: http://dumpz.org/15084/
Более менее разобрался.Обработать можно примерно так:
В самой торнаде для этого введен специальный метод:
Сначала я про него забыл, потом стало понятно, что тут он не поможет. Отуда и взял это кусочек кода.
helloworld v2: http://dumpz.org/15085/
Упс, пока писал ответ, ты уже решил и лучше ).
А что если назвать его @coroutine?
Это имя уже занял David Beazley под генераторы, которым сразу делается один next().
Ого, спасибо за наводку!
David Beazley большой молодец, полвечера разбирался с его замечательными примерами coroutines.
Согласен, что асинхронное программирование будут важнейшей темой наступающего десятилетия.
Однако, в клиентском JavaScript проблемы асинхронного программирования являются критичными уже сейчас. Ведь большинство клиентского кода на JS — это и есть обработчики асинхронных Ajax-запросов.
Одной из задач, действительно, является написание асинхронного кода без callback'ов. В JavaScript, как и в вашем примере на Питоне, это возможно с использованием генераторов и оператора yield.
Генераторы доступны начиная с JavaScript 1.7.
Скажем, ваш пример на Питоне, можно реализовать на JavaScript с использованием библиотеки Er.js таким образом:
Ваня, погляди на fiber-ы в руби и попробуй лучше допилить питон, что бы он так же умел. Это будет в тысячу раз проще, честно =)
Max Lapshin, чем они отличаются от генераторов в Python, кроме того, что можно
передавать управление другим fiber-ам? В python это делает немногим сложнее:
Ну ещё и greenlet'ы есть отдельным модулем.
Вообщем разницы никакой...
В Twisted после Python 2.5 рекомендуется использовать генераторы, идея такая:
@defer.inlineCallbacks
def someFunction():
a = 1
b = yield deferredReturningFunction(a)
c = yield anotherDeferredReturningFunction(a, b)
defer.returnValue(c)
Алик Кириллович: В JS куда проще и нативнее сделать библиотечку для chain-обработки типа:
И императивно, и синтаксически привычно, и точно понятно, что за чем и в какой момент.
Не ожидал что мой комментарий лишит тебя сна :) Хотя такое применение yield мне тоже когда-то взрывало мозг, и я смог разобраться как оно должно работать, только написав свой велосипед. Надеюсь, благодаря тебе и David'у Beazley, эта тема станет понятнее и программисты смогут проводить ночи более продуктивно :)
Не могу не написать о таких недочетах в твоих примерах:
- async-функция, являющаяся process'ом не может возвращать значения
- нет обработки исключений, их желательно из callback через throw передавать в генераторы
- режет глаз добавление параметров функции с помощью декоратора. Из-за этого при чтении кода сложно понять откуда берется callback. Работа с Deferred в twisted или Operation в cogen, как мне кажется, прозрачнее.
Почему? Любая async-функция, она "возвращает" значение, вызывая callback, неважно завёрнута она при этом в process или нет. С точки зрения вызывающего это выглядит как возврат значения.
Мне хотелось сделать код, не привязанный ни к какому новому типу объекта. Я посчитал соглашение о неявно передаваемом параметре меньшим злом.
Извините, если не в тему. Как Вы относитесь к подходу Flow.js и Step.js к упрощению работы с callbackами?
Никогда на них не смотрел, честно говоря.
ЧЯДНТ! Callback не принимает логические значения (True и False). В этом случае все валится:
Очень трудно без собственно кода примера. По одному трейсбеку непонятно, что там происходит.