Friday, June 18, 2010

Asynchronous JSON

Today in #twisted.web the topic of generating large JSON responses in a Twisted Web server came up. The problem was that the data being serialized into JSON was so large that the JSON serialization process itself would block the web server, preventing other requests from being serviced.

The first solution that came up was to split the web server into two pieces, so that the URLs which could have these JSON responses were served by a different process than was serving the rest. This is a pretty decent solution, and it also provides the benefit of using extra CPU cores if there are any available. In this case, it complicated things a little since it meant sharing a session across two processes. So we went looking for another approach.

It turns out that the json module supports incremental serialization. When I saw the JSONEncoder.iterencode method, I thought it would be great used in combination with cooperate to create a producer. This would let an application serialize a large structure to JSON without multiple processes, threads, or unreasonably blocking the reactor.

Here's the little bit of glue necessary to make things work:


from json import JSONEncoder

from twisted.internet.task import cooperate

class AsyncJSON(object):
def __init__(self, value):
self._value = value


def beginProducing(self, consumer):
self._consumer = consumer
self._iterable = JSONEncoder().iterencode(self._value)
self._consumer.registerProducer(self, True)
self._task = cooperate(self._produce())
d = self._task.whenDone()
d.addBoth(self._unregister)
return d


def pauseProducing(self):
self._task.pause()


def resumeProducing(self):
self._task.resume()


def stopProducing(self):
self._task.stop()


def _produce(self):
for chunk in self._iterable:
self._consumer.write(chunk)
yield None


def _unregister(self, passthrough):
self._consumer.unregisterProducer()
return passthrough


By using the iterencode method, this avoids spending too much time generating json output at once. Instead, a little bit of the input will be serialized at a time, and each short resulting string is available from the iterator returned by iterencode.

By using cooperate, the _produce generator will iterated in a way that lets it cooperate with the reactor and other event sources/handlers. A few chunks of json data will be written to the consumer, then execution will switch away to something else, then come back and a few more will be written, and so on.

And by using the producer/consumer interface, if the HTTP client which issued the request doesn't read the results as fast as they're being generated, the server will stop generating new output until the client catches up.

Altogether, this provides a very cool, efficient way to generate JSON output.

Here's an example to make it easier to see how one might use AsyncJSON in a resource:


from twisted.web.resource import Resource
from twisted.web.server import NOT_DONE_YET

class BigIntegerList(Resource):
def render_GET(self, request):
length = int(request.args['length'][0])
d = AsyncJSON(range(length)).beginProducing(request)
d.addCallback(lambda ignored: request.finish())
return NOT_DONE_YET

2 comments:

  1. Thanks for posting an excellent example example of using cooperator!

    When you say "large" how big are these json blobs that they were causing problems?

    ReplyDelete
  2. Actually I'm not sure how big the data in the original application was. A few simple benchmarks of the json module (using the timeit module) shows that serializing a list of integers takes about 2.4ms per 1000 elements (on my relatively recent desktop). So by the time you're dealing with just a few thousand integers you might begin to notice latency introduced by using a json.dump-based solution.

    ReplyDelete