Bryan Helmig

Co-founder of Zapier, speaker, musician and builder of things.

Let me preface this post by saying South is awesome. It greatly simplifies database schema changes in Django. However, if you’ve ever had to do a large data migration, you have likely seen South bite the dust. It’s not really made for that. At that point you need something a little more robust for chewing through large amounts of data.

This is where I like to use Celery. Do your setup schema migrations as usual, write a Celery task that moves the data from the old table to the new one, and then write another migration to run after the Celery tasks complete. Here’s the workflow in a bit more detail:

  1. Create a migration that adds the new column(s) or table(s).
  2. Create a separate migration that removes the old column(s) or table(s).
  3. Write a task to migrate a discrete chunk of rows.
  4. Run the first migration.
  5. Iterate over discrete chunks of rows (think id range 1-1000, 1001-2000, etc…) and launch tasks.
  6. Wait for tasks to complete.
  7. Run the second migration.

Your migrations and tasks will obviously be implementation specific, but I thought I’d share the approach to chunking data sets that I’ve used.

Basically, I write a task that accepts `begin` and `end` arguments and filters for that ID range, and then I generate a bunch of `begin` and `end` pairs to feed it. Like so:

from celery.task import task
 
@task()
def sweep_migrate(begin, end):
    from app.models import Model
 
    for instance in Model.objects.filter(id__gt=begin, id__lte=end).iterator():
        # Migrate this instance here (implementation specific).
        pass
 
def gen_pairs(count, cut):
    """
    Generates a list of [begin, end] pairs for slicing over
    massive lists (mainly for Django querysets).

    >>> gen_pairs(42, 10)
    [[0, 10], [10, 20], [20, 30], [30, 40], [40, 42]]
    """
    try:
        _pairs = range(count)[cut::cut]
        return [[x-cut, x] for x in _pairs] + [[_pairs[-1], count]]
    except IndexError:
        return [[0, count]]
 
def start_tasks(final_id):
    pairs = gen_pairs(final_id, 1000)
 
    for begin, end in pairs:
        # Queue one task per chunk; apply_async returns without waiting.
        sweep_migrate.apply_async(args=[begin, end])
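
As a quick sanity check on the chunk boundaries, here is a minimal sketch in plain Python (no Django or Celery needed). It assumes the same half-open convention as the `id__gt=begin, id__lte=end` filter above, and it uses a hypothetical `iter_pairs` generator, a lazy variant of `gen_pairs` introduced here purely for illustration. The `ids_in_chunk` and `coverage` helpers are likewise made up for the check; they confirm that every ID from 1 to `count` lands in exactly one chunk.

```python
from collections import Counter


def iter_pairs(count, cut):
    """Yield (begin, end) pairs lazily; hypothetical variant of gen_pairs."""
    for begin in range(0, count, cut):
        yield begin, min(begin + cut, count)


def ids_in_chunk(begin, end):
    """Mimic filter(id__gt=begin, id__lte=end): begin excluded, end included."""
    return range(begin + 1, end + 1)


def coverage(count, cut):
    """Count how many chunks each ID falls into."""
    seen = Counter()
    for begin, end in iter_pairs(count, cut):
        seen.update(ids_in_chunk(begin, end))
    return seen


if __name__ == "__main__":
    seen = coverage(42, 10)
    # Every ID from 1 to 42 should be present, each in exactly one chunk.
    assert sorted(seen) == list(range(1, 43))
    assert set(seen.values()) == {1}
    print(list(iter_pairs(42, 10)))
```

Because each chunk excludes its `begin` and includes its `end`, neighbouring chunks can share a boundary value without any row being migrated twice.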

To launch, I simply find the highest auto-increment ID of the set I want to migrate, open a shell, and do something like this:

from app.task import start_tasks
start_tasks(12345678)

Go grab a cup of coffee and wait… 🙂
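
If you would rather not guess when the tasks are done, one option is to poll the results. The sketch below is an assumption-heavy illustration, not part of the original workflow: it assumes a Celery result backend is configured and that `start_tasks` has been changed to collect and return the objects that `apply_async` gives back. The `wait_for` helper and the `FakeResult` stand-in are hypothetical names; the only thing `wait_for` relies on is a `ready()` method, which Celery's `AsyncResult` provides.

```python
import time


def wait_for(results, poll_seconds=5.0):
    """Block until every result reports ready(); return how many there were."""
    pending = list(results)
    total = len(pending)
    while pending:
        # Keep only the results that have not finished yet.
        pending = [r for r in pending if not r.ready()]
        if pending:
            time.sleep(poll_seconds)
    return total


class FakeResult:
    """Stand-in for a Celery result: reports ready after a few polls."""

    def __init__(self, polls_needed):
        self.polls_left = polls_needed

    def ready(self):
        self.polls_left -= 1
        return self.polls_left < 0


if __name__ == "__main__":
    # Demo with stand-ins; real code would pass the collected Celery results.
    finished = wait_for([FakeResult(0), FakeResult(2)], poll_seconds=0)
    print("tasks finished:", finished)
```

Note that `ready()` only says a task has finished, not that it succeeded, so it is still worth checking for failed chunks before running the second migration.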


Posted March 5, 2012 @ 7:19 pm under Work.