В Питоне для парсинга JSON обычно берут либо включённый в стандартную библиотеку simplejson, либо популярный в последнее время cjson. Обе библиотеки обрабатывают JSON одним куском: парсят всё целиком и возвращают нативный питоний объект. Мне же недавно понадобилось обрабатывать JSON потоком, в духе SAX. Это имеет смысл, когда размер данных начинает исчисляться мегабайтами, а обработка не требует хранения всего объекта в памяти целиком.

Оказывается, в мире эта задача решена библиотекой yajl (Саша, спасибо за наводку), и к ней есть два питоньих биндинга. Мне не понравились оба, поэтому я написал свой — ijson.

Название обыгрывает тот же принцип, что и функции из itertools: "imap", "ifilter" и т.д.

Существующие решения

py-yajl не понравился сразу, потому что он предоставляет интерфейс в виде функций "dumps" и "loads", а следовательно полностью скрывает всю потоковую природу yajl.

Следующий — yajl-py — оказался интересней. По использованию он похож на SAX-парсер: вы объявляете свой класс-обработчик, передаёте его в парсер, и он дёргает методы этого класса, обрабатывающие отдельные события из входного потока. Проблема этого подхода в том, что вызов парсера синхронный — он сам прокручивает цикл чтения входных данных, и этот процесс нельзя удобно остановить и продолжить. Мне же нужен был по сути питоний итератор, чтобы читать из него события самому. Такой итератор можно обернуть в собственный генератор-обработчик и вернуть его из приложения в WSGI-сервер.

ijson

Я подумал, что такой итератор сделать будет несложно, взял yajl-py и переделал его основную функцию с циклом парсинга на генератор. Потом посмотрел вокруг, немного порефакторил, потом ещё, и ещё, и в итоге теперь мой parse.py почти ничем не напоминает оригинальный yajl_parse.py :-). Тем не менее, я благодарен Хатему Нассрату — автору yajl-py — потому что если бы я не посмотрел в его код, я бы никогда не взялся делать обвязку к сишной библиотеке.

В итоге появился итератор basic_parse(file), который генерирует пары (event, value), соответствующие тому, что возвращает сишная библиотека (плюс приведение к удобным питоновым типам). Однако быстро стало ясно, что обрабатывать их неудобно, потому что события возвращаются вне контекста: вам пришло ('map_key', 'name'), но вы понятия не имеете, в каком объекте этот ключ. Поэтому я обернул базовый парсер в ещё один, сохраняющий контекст. Он генерирует уже не пары, а тройки, где первый параметр — путь, показывающий местоположение собтия в дереве объекта.

Вот для такого документа:

{
    "array": [1, 2],
    "map": {
        "key": "value"
    }
}

... парсер выдаст такую последовательность событий:

('',           'start_map',   None)
('',           'map_key',     'array')
('array',      'start_array', None)
('array.item', 'number',      1)
('array.item', 'number',      2)
('array',      'end_array',   None)
('',           'map_key',     'map')
('map',        'start_map',   None)
('map',        'map_key',     'key')
('map.key',    'string',      u'value')
('map',        'end_map',     None)
('',           'end_map',     None)

Префиксы пути (первый элемент) позволяют во многих случаях не писать сложной логики хранения состояния в пользовательском коде, а просто реагировать на события какого-то конкретного поддерева.

А буквально вчера я добавил ещё один итератор items(file, prefix), который отдаёт не голые события, а нативные питоновые объекты, расположенные по заданному префиксу:

from ijson import items

for item in items(file, 'docs.item.name'):
    do_something_with(item)

Это, конечно, самый удобный для обработки способ, но он должен быть помедленней из-за постоянно создающихся временных объектов.

Обработка корутинами

Классический способ обработки потокового парсинга — написать класс, состоящий из callback'ов, которые будут вызываться по соответствующим событиям. Всё полезное состояние при этом хранится в инстансе класса. Однако этот способ не отличается наглядностью. Кроме того, он совсем не работает, если вы хотите использовать при обработке конструкции с with, потому что в один оператор, очевидно, нельзя обернуть несколько разных вызовов callback'ов.

Моя текущая задача на работе — это генерация XML из JSON-данных. Я использую свою же elementflow, которая генерирует готовый XML, не требуя никакого промежуточного объектного представления. Соответственно, соединив событийный парсер ijson и событийный генератор elementflow, можно получить довольно быструю и очень экономную в памяти комбинацию. Проблема только в том, что elementflow как раз написана в расчёте на with.

Встречайте — корутины Дэвида Бизли (здесь фанфары)!

Базовая идея состоит в том, чтобы вместо класса с callback'ами использовать питоний генератор, в который через .send() можно передавать значения. Если вы не видели этих слайдов, то посмотрите прямо сейчас, там вся концепция объясняется очень хорошо, и я совсем не хочу излагать её своими словами.

Такие генераторы как раз позволяют использовать with, поскольку теперь все события парсера получает одна функция:

@coroutine
def converter(xml):
    while True:
        prefix, event, value == yield # get another event from parser

        if prefix == 'rows.item.id':
            # store the value in a local var
            id = value

        elif prefix == 'rows.item.value':
            # create another coroutine that knows how to process 
            # 'rows.item.value' contents
            target = value_coroutine(xml)

            # generate a containg XML element
            with xml.container('value', {'id': id}):

                # secondary loop cosuming 'rows.item.value'
                while (prefix, event) != ('rows.item.value', 'end_map'):
                    prefix, event, value == yield
                    # offload events into a target
                    target.send((prefix, event, value))

Поработав какое-то время с кодом в таком стиле, я заметил повторяющиеся паттерны, парочку из которых завернул в утилиты, которые теперь лежат в ijson.utils:

В итоге весь код принимает вид дерева диспетчеров с листьями в виде генераторов конечных XML-элементов. Диспетчеры комбинируются каждый раз в соответствии со структурой входного JSON-дерева. Довольно непривычно, но вполне понятно на мой взгляд.

Последний момент — это организация одновременного парсинга и выдачи XML. Для этого пишется обычный генератор с очередью в памяти, который можно отдавать прямо в WSGI-сервер:

def generator(file):
    queue = elementflow.Queue()
    with elementflow.xml(queue, 'root') as xml:
        g = consumer_coroutine(xml)
        for event in ijson.parse(file):
            g.send(event)
            if len(queue) > BUF_SIZE:
                yield queue.pop()
    yield queue.pop()

Стоит ли оно того

Если по эффективности, то коротко — да. На реальной задаче выдача двухмегабайтного XML'а из примерно такого же JSON'а работает на 30% быстрее по сравнению с cjson даже под управлением форкающейся Django (то есть памяти тут не экономится). У меня под рукой пока нет жёстких данных, да и пост этот и так уже длинный, поэтому я хочу написать про тестирование отдельно.

Если говорить о размере и сложности кода, то тут всё очень субъективно. В любом случае надо понимать, что потоковый код обязательно будет немного больше и сложнее, чем "объектный".

Упражнение: ObjectBuilder

Хотите развлечься? :-)

Нужно написать код, создающий на основе событий от парсера нативный объект. Фактически — это то, что делают внутри так или иначе все непотоковые парсеры. В ijson такой есть — "ObjectBuilder" в модуле "parse" — но я предлагаю в его код пока не смотреть. Дело в том, что он у меня получился хоть и компактным, но довольно заумным. Было бы чудесно, если бы кто-нибудь написал решение проще, а в этом случае чужой код часто не даёт придумать что-то своё.

Комментарии: 19

  1. Сергей Кириллов

    не совсем по теме, но я в своё время был удивлён увидев что cjson работает медленнее чем simplejson с собраными сишными оптимизациями.

  2. Александр Кошелев

    не совсем по теме, но я в своё время был удивлён увидев что cjson работает медленнее чем simplejson с собраными сишными оптимизациями.

    У simplejson со speed-ups вроде самый быстрый энкодер, а у cjson декодер.

  3. Sergey

    Обработает ли ваш парсер такой объект? '[{"key": "value", obj: []}]'

  4. Ivan Sagalaev

    Конечно, почему нет? Правда с исправлением того, что "obj" должен быть в кавычках... А что вам мешает попробовать? :-)

  5. admin

    Хотите развлечься? :-) .. придумать что-то своё

    from itertools import izip, starmap, takewhile
    
    star = lambda f: lambda x: f(*x)
    startakewhile = lambda f, *args: takewhile(star(f), *args)
    
    def iload(it):
        def parse(k, v):
            if k in ('null', 'boolean', 'integer', 'double', 'number', 'string', 'map_key'):
                return v
            elif k == 'start_array':
                return list(iload(startakewhile(lambda k, v: k != 'end_array', it)))
            elif k == 'start_map':
                return dict(izip(iload(startakewhile(lambda k, v: k == 'map_key', it)),
                                 iload(startakewhile(lambda k, v: k != 'end_map', it))))
            else:
                raise ValueError(k, v)
        return starmap(parse, it)
    
    def load(tokens):
        """
        Usage:
            load(basic_parse(f))
        """
        return tuple(iload(iter(tokens)))[0]
    
  6. Ivan Sagalaev

    Круто! Нравится идея использовать рекурсию вместо поддержки стека вручную. Однако что-то тут не так. Вот на таком документе падает с ValueError:

    {
      "docs": [
        {
          "string": "\u0441\u0442\u0440\u043e\u043a\u0430",
          "null": null,
          "boolean": false,
          "integer": 0,
          "double": 0.5,
          "long": 10000000000,
          "decimal": 10000000000.5
        },
        {
          "meta": [[1], [2]]
        },
        {
          "meta": {"key": "value"}
        },
        {
          "meta": null
        }
      ]
    }
    
  7. Maximbo

    Подправленая версия admin'а:

    from itertools import izip, takewhile, imap
    from ijson import parse
    
    def build_object(last, parser):
        is_map_key = lambda (current, event, value): event == 'map_key'
        isnt_map_end = lambda (current, event, value): event != 'end_map'
        isnt_list_end = lambda (current, event, value): event != 'end_array'
    
        def build_chunk((current, event, value)):
            if event == 'start_map':
                return dict(izip(
                    imap(build_chunk, takewhile(is_map_key, parser)),
                    imap(build_chunk, takewhile(isnt_map_end, parser))))
            elif event == 'start_array':
                return list(imap(build_chunk, takewhile(isnt_list_end, parser)))
            else:
                return value
        return build_chunk(last)
    
    
    def items2(file, prefix):
        parser = iter(parse(file))
        try:
            while True:
                current, event, value = parser.next()
                if current == prefix:
                    yield build_object((current, event, value), parser)
        except StopIteration:
            pass
    
  8. admin

    Интересно. lambda (current, event, value): .. Я не знал, что python умеет распаковывать параметры в объявлении функции. Хотя чувстовалось, что это должно быть, по аналогии с выражениями [(a, b) for x in xs].

    Maximbo, здорово. Что-то подобное мне и хотелось изобразить, однако я не додумался, как вытащить parser наружу.

    Придумался вариант без генераторов и с обобщенной функцией build_object(), которая может строить питоньи структуры из произвольных данных (например, basic_parse() и parse() генерируют разные кортежи). Для этого функция дополнительно получает два конструктора: Event и Value, которые могут "извлекать" из item нужную информацию.

    from functools import partial
    from itertools import izip, takewhile, imap, ifilter
    from operator import itemgetter
    
    from ijson import parse
    from ijson import basic_parse
    
    def build_object(parser, Event, Value, item):
        is_map_key = lambda item: Event(item) == 'map_key'
        isnt_map_end = lambda item: Event(item) != 'end_map'
        isnt_list_end = lambda item: Event(item) != 'end_array'
    
        def build_chunk(item):
            if Event(item) == 'start_map':
                return dict(izip(
                    imap(build_chunk, takewhile(is_map_key, parser)),
                    imap(build_chunk, takewhile(isnt_map_end, parser))))
            elif Event(item) == 'start_array':
                return list(imap(build_chunk, takewhile(isnt_list_end, parser)))
            else:
                return Value(item)
        return build_chunk(item)
    
    def items2(file, prefix):
        Current, Event, Value = itemgetter(0), itemgetter(1), itemgetter(2)
        parser = iter(parse(file))
        has_prefix = lambda item: Current(item) == prefix
        compile = partial(build_object, parser, Event, Value)
        return imap(compile, ifilter(has_prefix, parser))
    
    def items1(file):
        Event, Value = itemgetter(0), itemgetter(1)
        parser = iter(basic_parse(file))
        compile = partial(build_object, parser, Event, Value)
        return imap(compile, parser)
    

    А вот еще вариация на тему items2: чистая и безопасная — здесь parser гарантированно отдает лишь элементы с заданным префиксом. С другой стороны, данная реализация менее эффективна нежели items2, поскольку делает проверки префиксов для каждого элемента.

    def items3(file, prefix):
        Current, Event, Value = itemgetter(0), itemgetter(1), itemgetter(2)
        has_prefix = lambda item: Current(item) == prefix or \
                                  Current(item).startswith(prefix + '.')
        parser = ifilter(has_prefix, iter(parse(file)))
        compile = partial(build_object, parser, Event, Value)
        return imap(compile, parser)
    

    В реальной жизни, разумеется, так не пишу. =) Даже здесь всю эту безумную функциональщину можно записать чище, если заранее вязать методы с данными (о да — ООП: абстракция + инкапсуляция = профит :) ) — просто научив parser, вместо кортежей, кидать объекты (class Item: event, value). %)

  9. admin

    вот вариант с примесями ООП. здесь парсер генерирует не кортежи, а поток этих самых объектов.

    python замечателен в том плане, что позволяет использовать методы в орыве от объекта. например, благодаря этому мне удалось записать отрицающий предикат: "токен не является концом массива" — not_f(Token.is_array_end), — а затем использовать его в потоке токенов, как будто это их родной метод.

    from functools import partial
    from itertools import izip, takewhile, imap, ifilter
    from ijson import basic_parse, parse
    
    EVENT_MAP_KEY = 'map_key'
    EVENT_START_MAP, EVENT_END_MAP = 'start_map', 'end_map'
    EVENT_START_ARRAY, EVENT_END_ARRAY = 'start_array', 'end_array'
    
    class Token(object):
        def __init__(self, item):
            self.event, self.value = item
        is_anyone = lambda self: True
        is_map_key = lambda self: self.event == EVENT_MAP_KEY
        is_map_start = lambda self: self.event == EVENT_START_MAP
        is_map_end = lambda self: self.event == EVENT_END_MAP
        is_array_start = lambda self: self.event == EVENT_START_ARRAY
        is_array_end = lambda self: self.event == EVENT_END_ARRAY
    
    class TokenWithPath(Token):
        def __init__(self, item_with_path):
            self.path = item_with_path[0]
            item = item_with_path[1:]
            super(TokenWithPath, self).__init__(item)
        has_prefix = lambda self, prefix: self.path == prefix or \
                                          self.path.startswith(prefix + '.')
    
    not_f = lambda f: lambda *x, **kw: not f(*x, **kw)
    
    def compile(parser):
        parse_while = lambda pred, f: imap(f, (parser if pred == Token.is_anyone 
                                               else takewhile(pred, parser)))
        def build_chunk(token):
            if token.is_map_start():
                return dict(izip(parse_while(Token.is_map_key, build_chunk),
                                 parse_while(not_f(Token.is_map_end), build_chunk)))
            elif token.is_array_start():
                return list(parse_while(not_f(Token.is_array_end), build_chunk))
            else:
                return token.value
    
        return parse_while(Token.is_anyone, build_chunk)
    
    def items1(file):
        parser = imap(Token, basic_parse(file))
        return compile(parser)
    
    def items3(file, prefix):
        parser = ifilter(partial(TokenWithPath.has_prefix, prefix=prefix),
                         imap(TokenWithPath, parse(file)))
        return compile(parser)
    

    было бы интересно посмотреть оценки производительности разных вариантов на реальных json с различной степенью сложности структур: что быстрее - генераторы или итераторы.

    хех, кажется тема зацепила . извиняюсь за спам. :)

  10. Mr Tiny

    This is just brilliant, thanks!

  11. ijson

  12. ei-grad

    Нужно заметить, что на практике способ с

    for item in items(file, 'docs.item.name'):
        do_something_with(item)
    

    оказался ощутимо быстрее подхода с корутинами. Но тут всплывает другой момент, что elementflow гораздо удобнее было бы использовать, если бы наш обработчик не был генератором.

    Меня всё мучает желание переделать ijson (или другую библиотеку на основе yail) в асинхронном стиле, для использования с tornado. Там бы и elementflow раскрыл свой потенциал...

  13. Ivan Sagalaev

    Я бы не обобщал тот наш случай на "практику вообще". У нас было очень много мелкой логики и глубокий стек.

    Для асинхронного стиля я смотрел в своё время на http://code.google.com/p/evserver/. Кажется, он бы лёг на elementflow-задачу просто идеально. На Tornado свет клином не сошёлся :-).

  14. Stu

    Hi, Is it possible to generate json in a streaming manner in python.

    E.G. if a structure was like

     def my_generator():
         yield 1
         yield 2
    
     { "header":  "blah fixed data",
       "data":  my_generator() }
    

    Is there any way to stream this sort of structure ?

  15. Ivan Sagalaev

    Stu, it shouldn't be particularly hard, you just have to output all the strings one by one:

    def generate_json():
        yield '{"header": "fixed data", "data": ['
        comma = ''
        for value in my_generator():
            yield comma
            yield '"%s"' % value
            comma = ','
        yield ']}'
    
  16. keios
  17. ivanff

    что то заюзал я ijson в laoddata джанги на гиговом файле, и память хоть и в разы медленнее, но сожрало всю 12 гигов.

    import ijson.backends.yajl2 as ijson
    
    def Deserializer(stream_or_string, **options):
        """
        Deserialize a stream or string of JSON data.
        """
        if not isinstance(stream_or_string, (bytes, six.string_types)):
            stream_or_string = stream_or_string
        # if isinstance(stream_or_string, bytes):
        # stream_or_string = stream_or_string.decode('utf-8')
        try:
            # objects = json.loads(stream_or_string)
            objects2 = ijson.items(stream_or_string, '')
    
            for objects in objects2:
                for obj in PythonDeserializer(objects, **options):
                    yield obj
    
  18. Ivan Sagalaev
    objects2 = ijson.items(stream_or_string, '')
    

    Префикс '' означает "загрузить всё одним объектом", конечно оно всё съело. Если в корне json'а лежит массив, и вам нужен каждый элемент, то нужно:

    objects2 = ijson.items(stream_or_string, 'item')
    
  19. Sergey K

    Ivan Sagalaev, спасибо тебе, добрый человек, за ijson и очень полезный комментарий про массив элементов. Два дня бился и наконец-то все заработало!

    Очень не хватает подробной документации.

Добавить комментарий