Too Busy For Words - the PaulWay Blog

Thu 9th Oct, 2008

Perl threaded database query processing

In my work I've recently had to implement several pieces of code which follow this basic pattern:

  1. Retrieve data from the database
  2. Process data
  3. Store data somewhere.
Because of Perl DBI's habit (on the systems I've used) of grabbing all the data from the database into memory before actually giving it to the caller, and because that data can often get large enough to get my process swapping or killed, what this usually turns into is:

  1. Get a list of 'grouping' items (e.g. days, months, IP addresses, etc.)
  2. For each item in that group:
    1. Retrieve data from the database for that item.
    2. Process data
    3. Store data somewhere.
This runs into an unfortunate problem when the database server you're talking to takes a noticeable time to process each query: the whole thing slows down hugely. A typical slowdown I've seen is on the order of 500%, and both the database and the client processors are mostly idle during that time, because each query has to be sent, executed by the database, dumped back to the client, and only then processed. It suffers the same problem if the time to process each group of data is significant: by the time you've got back to fetching the next group, the database has gone off and done other things and needs to get its disk heads back in the right place for your data.

These days we have processors capable of doing multiple things at the same time, and so it would be nice if the client could be processing rows at the same time as it's also requesting more data from the database. This is where Perl's threads and Thread::Queue libraries come in. It seems to me to be a generalisable task, so I'm sharing my first attempt at doing this in a generalisable way here. My main subroutine is:

use Carp;            # provides carp()
use threads;
use Thread::Queue;

######################################
sub Thread_Process {
######################################
    # We take one query which returns a list of items, a query which
    # returns other rows based on each of those items, and a function
    # which processes those rows.  We then run the processor function
    # in parallel to the fetching process to utilise the connection
    # to the database and keep the local processor active.
    # Requirements:
    # The item query must be executed and ready to return rows.  It
    #   can return any number of fields.
    # The rows query must be ready to be executed, and will be
    #   executed with the row_args_ref and then the items from each
    #   row in the item query in turn (as arrays).
    # The function takes as its last argument the Thread::Queue object
    #   that data will be passed through.  It must know exactly how
    #   many items it will take from each row, and that should match
    #   the number of items returned in the query.  For reasons as yet
    #   unclear, we can't pass references of any kind on the queue,
    #   so we pass the fields in each row as single scalars.  Any
    #   arguments that it needs should be given in fn_args_aref.  It
    #   should exit on receiving an undef.
    my ($items_qry, $rows_qry, $row_args_ref, $fn_ref, $fn_args_aref) = @_;
    my ($items_aref) = $items_qry->fetchall_arrayref;
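    # fetchall_arrayref returns an empty array ref when no rows match,
    # so check for emptiness as well as for the reference type.
    unless (ref $items_aref eq 'ARRAY' and @$items_aref) {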
        carp "Warning: got no rows from item query\n";
        return 0;
    }
    
    my $queue = Thread::Queue->new();
    my $thread = threads->create($fn_ref, @$fn_args_aref, $queue);
    foreach my $item_aref (@$items_aref) {
        $rows_qry->execute(@$row_args_ref, @$item_aref);
        while (my $aref = $rows_qry->fetchrow_arrayref) {
            $queue->enqueue(@$aref);
        }
    }
    $queue->enqueue(undef);
    $thread->join();
    return scalar @$items_aref;
}
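
The queue protocol that Thread_Process relies on can be tried without a database. The following is a minimal sketch, not part of the original code: the rows are made-up stand-ins for query results, and the consumer simply counts them. It assumes a Perl built with thread support.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

# Stand-in for the rows a database query would return (made-up data).
my @rows = (
    [ 'alice', 'example.org', 'Alice A', '1970-01-01' ],
    [ 'bob',   'example.net', 'Bob B',   '1980-02-02' ],
);

my $queue = Thread::Queue->new();

# Consumer: takes four scalars per row and stops at the undef terminator.
my $consumer = threads->create(sub {
    my ($q) = @_;
    my $handled = 0;
    while (defined(my $user_name = $q->dequeue)) {
        my ($server, $full_name, $birth_date) = map { $q->dequeue } 1 .. 3;
        $handled++;
    }
    return $handled;
}, $queue);

# Producer: flatten each row into single scalars, as Thread_Process does.
$queue->enqueue(@$_) for @rows;
$queue->enqueue(undef);

my $handled = $consumer->join();
print "Consumer handled $handled rows\n";
```

The consumer has to take exactly as many fields per row as the producer queued, otherwise the two sides drift out of step.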
A sample caller function would be:

sub Send_mail_to_everyone {
    my ($mail_handler, $template, $start_date, $end_date) = @_;
    my $servers_qry = $dbh->prepare(
        'select distinct server from addresses'
       .' where birth_date between ? and ? and current = true',
    );
    my $args_ref = [$start_date, $end_date];
    $servers_qry->execute(@$args_ref);
    my $email_qry = $dbh->prepare(
        'select user_name, server, full_name, birth_date'
       .' from addresses'
       .' where birth_date between ? and ? and server = ?'
       .' and current = true'
    );
    my $mailer_sub = sub {
        my ($queue) = @_;
        # Parentheses are needed: 'defined' binds tighter than '='.
        while (defined(my $user_name = $queue->dequeue)) {
            my $server = $queue->dequeue;
            my $full_name = $queue->dequeue;
            my $birth_date = $queue->dequeue;
            my $email_body = sprintf $template
                , $user_name, $server, $full_name, $birth_date;
            # Escape the @ so that @$server is not read as an array dereference.
            $mail_handler->send("$user_name\@$server", body => $email_body);
        }
    };
    # Here most of the work gets done.
    Thread_Process($servers_qry, $email_qry, $args_ref, $mailer_sub, []);
}
Of course, this is somewhat of a contrived example, and gives you little in the way of feedback or error handling. But it's an example of how to use the Thread_Process subroutine. The mailer subroutine is a closure, so it picks up $mail_handler and $template from the enclosing Send_mail_to_everyone routine.
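
One hazard of the field-by-field protocol is that a NULL in the first column arrives as undef and looks exactly like the end-of-data marker. The following is a hedged sketch of an alternative that queues each row as one array reference. It assumes a Thread::Queue recent enough to copy unshared references when they are queued, which the versions bundled with current Perl releases document; the older module the comments in Thread_Process refer to evidently did not. The data is made up for illustration.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

# Made-up rows; the second has a NULL (undef) in its first column.
my @rows = (
    [ 'alice', 'example.org' ],
    [ undef,   'example.net' ],
    [ 'carol', 'example.com' ],
);

my $queue = Thread::Queue->new();

my $consumer = threads->create(sub {
    my ($q) = @_;
    my $seen = 0;
    # Each dequeue yields one whole row; only a bare undef ends the loop.
    while (defined(my $row = $q->dequeue)) {
        my ($user_name, $server) = @$row;
        $seen++;
    }
    return $seen;
}, $queue);

# Assumption: enqueue makes a shared copy of each unshared array ref.
$queue->enqueue($_) for @rows;
$queue->enqueue(undef);

my $seen = $consumer->join();
print "Consumer saw $seen rows\n";
```

With this shape the consumer no longer needs to know how many fields each row carries, at the cost of one copy per row.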

There are two problems I've discovered so far. The first is that trying to do any kind of database operation within the subroutine doesn't work, because the database handle needs to be cloned. On the systems I've tested this on, unfortunately, $dbh->clone seems to be a no-op and the DBI engine complains that a different thread is using the database handle. I've tried passing $dbh->clone to the handler function, and doing the clone inside the handler function, but they change nothing.
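
A commonly suggested way around the handle problem is to open a separate connection inside the worker thread instead of cloning or passing one in. The sketch below only illustrates that shape under stated assumptions: start_worker and the $connect code reference are hypothetical names, not part of the code above, and the demonstration uses a stub in place of a real DBI connection so that it runs without a database.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

# $connect is a hypothetical code reference that returns a new handle.
# With DBI it might be:
#   sub { DBI->connect($dsn, $user, $password, { RaiseError => 1 }) }
# It is called inside the worker, so the handle never crosses threads.
sub start_worker {
    my ($connect, $work, $queue) = @_;
    return threads->create(sub {
        my $handle = $connect->();
        my $count  = 0;
        while (defined(my $item = $queue->dequeue)) {
            $work->($handle, $item);
            $count++;
        }
        return $count;
    });
}

# Stub demonstration: the "handle" just records which thread made it.
my $work_queue = Thread::Queue->new();
my $results    = Thread::Queue->new();
my $worker = start_worker(
    sub { return { made_in => threads->tid() } },
    sub { my ($h, $item) = @_; $results->enqueue("$h->{made_in}:$item") },
    $work_queue,
);
$work_queue->enqueue('a', 'b', 'c', undef);
my $processed = $worker->join();
print "Worker processed $processed items\n";
```

Whether this avoids the complaint from the DBI engine depends on the driver in use, so it would need testing against the actual database.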

More annoying is the fact that the memory used by the process continues to rise even if the number of outstanding rows is constant or dropping. I haven't traced this down, and haven't really the time now, but it seems to be related to the Thread::Queue object - I've tested variations of my handler routine that reuse existing memory rather than doing undef @array and push @array, $data in the handler, and this changes little.
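
To rule out a simple backlog as the cause, the producer can watch the queue's pending count and pause when the consumer falls behind. This is a minimal sketch with made-up numbers; $max_pending is a hypothetical cap, and it does not address any growth inside Thread::Queue itself. Newer Thread::Queue releases document a limit setting with a similar effect; it is not used here so that the sketch also runs on older installs.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;
use Time::HiRes qw(sleep);

my $max_pending = 100;     # hypothetical cap on outstanding items
my $total       = 1000;    # made-up number of items to send
my $queue       = Thread::Queue->new();

# Consumer: counts items until the undef terminator arrives.
my $consumer = threads->create(sub {
    my ($q) = @_;
    my $n = 0;
    $n++ while defined $q->dequeue;
    return $n;
}, $queue);

my $high_water = 0;
for my $value (1 .. $total) {
    # Wait for the consumer to catch up before queueing more.
    sleep(0.001) while $queue->pending() >= $max_pending;
    $queue->enqueue($value);
    my $now = $queue->pending();
    $high_water = $now if $now > $high_water;
}
$queue->enqueue(undef);

my $consumed = $consumer->join();
printf "consumed %d items, backlog peaked at %d\n", $consumed, $high_water;
```

If memory still climbs while the recorded peak stays at or below the cap, the backlog is not the explanation.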

What I don't know yet is whether to package this up as a Perl module and start being a maintainer, or whether it's too trivial or not generalised enough to be useful to anyone but me.



All posts licensed under the CC-BY-NC license. Author Paul Wayper.

