concurrent data pull

concurrent data pull

Sometimes you need to pull data concurrently from a database, for example when the database throttles each connection or when other parts of the system are constraining extract performance.

For example, if you need to pull multiple partitions to upload to the cloud but the on-premise system is slow on the extract, concurrent extracts may be helpful. The Python script below uses the multiprocessing module (rather than threading) to run multiple extracts at once, based on a partition key you identify in the table to be extracted. The partition value should be a string or a number, or anything else that can be converted to a string that is usable in a filename. The script is not sophisticated, but it does have basic support for restarting in case your extract is interrupted.

Enhance as you see fit!

#!/usr/bin/env python
import os
import argparse
import datetime
from sqlalchemy import create_engine
from multiprocessing import Pool
import csvkit.utilities.sql2csv as c
import sys
import agate

# Use multiprocessing so that each extract runs in its own process with its
# own database engine. A thread per extract would also work, since the work
# is I/O bound; separate processes are simply what this script uses.

class MySQL2CSV():
    """Run a single SQL query and stream its result set into a CSV file."""

    def run(self, connection_string, query, output_file):
        """Execute ``query`` and write the result to ``output_file`` as CSV.

        A new engine is created on every call so the method does not depend
        on any state shared with the caller.

        Args:
            connection_string: SQLAlchemy-style connection string.
            query: The SQL statement to execute.
            output_file: Path of the CSV file to create (overwritten if it
                already exists).

        Returns:
            The number of data rows written; the header row is not counted.
        """
        counter = 0
        engine = create_engine(connection_string)
        try:
            with engine.connect() as connection, open(output_file, 'w') as f:
                rows = connection.execution_options(no_parameters=True).execute(query)
                # Extra CSV dialect options can be passed to the writer as
                # keyword arguments here.
                output = agate.csv.writer(f)
                if rows.returns_rows:
                    # Header first, then one CSV line per result row.
                    output.writerow(list(rows.keys()))
                    for row in rows:
                        output.writerow(row)
                        counter += 1
        finally:
            # Each call builds its own engine; release its connection pool so
            # repeated calls in the same process do not accumulate connections.
            engine.dispose()
        return counter

def make_output_file(base, value):
    """Return the CSV filename for one partition: ``<base>.<value>.csv``."""
    template = "{prefix}.{partition}.csv"
    return template.format(prefix=base, partition=value)

def make_completed_file(base):
    """Return the name of the completion log file: ``<base>.completed.csv``."""
    template = "{prefix}.completed.csv"
    return template.format(prefix=base)

# assumes completed is in column 0, returns list of completed partition keys
def get_completed_list(base):
    """Read the completion log for ``base`` and return the finished keys.

    The partition value is taken from column 0 of each row. Values come back
    as the strings stored in the CSV file.

    Args:
        base: Output file base name used to locate the completion log.

    Returns:
        A list of partition values (strings) recorded as completed; an empty
        list when the log file does not exist.
    """
    filename = make_completed_file(base)
    completed = []
    if not os.path.exists(filename):
        return completed
    with open(filename, "r") as f:
        csvin = agate.csv.reader(f)
        for index, row in enumerate(csvin):
            if not row:
                # Blank line: nothing to record.
                continue
            if index == 0 and row[0] == "partition_value":
                # Header row written when the log was first created; it is
                # not a partition key.
                continue
            completed.append(row[0])
    return completed

# Get enough parameters to run independently as a process
# args tuple: 0: partition value, 1: connstr, 2: query, 3: base output name
# output tuple: 0: partition value, 1: # rows output, 2: message, 3: output filename, 4: success
def run_job(args):
    """Extract one partition to its own CSV file.

    Takes everything it needs in a single tuple so it can be submitted to a
    process pool as-is.

    Args:
        args: Tuple of (partition value, connection string, query template,
            base output name). The query template is filled in with the
            partition value via ``str.format``.

    Returns:
        Tuple of (partition value, rows written, message, output filename,
        success flag). Failures are reported through the tuple rather than
        raised.
    """
    value, connstr, query, base = args[:4]
    output_file = make_output_file(base, value)
    if os.path.exists(output_file):
        return (value, 0, "Skipped, output {} already exists.".format(output_file), output_file, True)
    try:
        rows = MySQL2CSV().run(connstr, query.format(value), output_file)
    except Exception as ex:
        # Remove a partially written file; otherwise a restart would see it,
        # report "already exists" and treat the partition as done.
        try:
            os.remove(output_file)
        except OSError:
            pass
        return (value, 0, "Error during processing: {}".format(ex), output_file, False)
    return (value, rows, "Processed.", output_file, True)

# Run extract queries in parallel using multiprocessing.
# We rely on OS-level multitasking, and n does not have to match the number
# of processors, because extracts are I/O bound rather than CPU bound.
def run_extracts(values, connstr, partition_query, query, n, base):
    """Run one extract job per partition value using a pool of ``n`` processes.

    Partition values already listed in the completion log are not submitted
    again. Every successful job is appended to the completion log, and a
    progress line is printed for each finished job.

    Args:
        values: Partition key values to extract.
        connstr: SQLAlchemy-style connection string passed to every job.
        partition_query: Not used here; kept so existing callers keep working.
        query: Query template with a ``{}`` placeholder for the partition value.
        n: Number of worker processes.
        base: Output file base name.
    """
    completed_filename = make_completed_file(base)
    # The log stores keys as text, so compare on the string form of each key.
    completed_already = set(get_completed_list(base))
    print("# completed partition values: {}".format(len(completed_already)))
    needs_header = (not os.path.exists(completed_filename)
                    or os.path.getsize(completed_filename) == 0)
    jobs = [(key, connstr, query, base) for key in values
            if str(key) not in completed_already]
    # Append rather than truncate: the existing log is the restart state.
    with Pool(processes=n) as pool, open(completed_filename, "a") as completed_file:
        counter = 0
        row_count_cum = 0
        completed_file_csv = agate.csv.writer(completed_file)
        if needs_header:
            completed_file_csv.writerow(["partition_value", "count", "output_filename"])
        # Results are yielded as jobs finish, in no particular order; the pool
        # size controls how many jobs run at once.
        for result in pool.imap_unordered(run_job, jobs):
            counter = counter + 1
            row_count_cum = row_count_cum + result[1]
            print("Job result {}: {}. Cumulative: {}".format(counter, result, row_count_cum))
            if result[4]:
                completed_file_csv.writerow([result[0], result[1], result[3]])
                # Flush per job so an interruption loses at most the entry
                # currently being written.
                completed_file.flush()


# create engine and get partitioning values
def get_partition_values(connstr, query):
    """Run the partition query and return the first column of every row.

    A dedicated engine is created from ``connstr`` for this lookup.
    """
    db = create_engine(connstr)
    with db.connect() as conn:
        keys = []
        # The partition key is assumed to be the first column of each row.
        for record in conn.execute(query):
            keys.append(record[0])
        return keys
    # Fallback kept from the original; the return above normally exits first.
    return []

# Command-line entry point: parse options, look up the partition values,
# run the extracts and report the elapsed wall-clock time.
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--partition-query", help="Query to return partition values. First column in each row is assumed to be the partitioning key value.", type=str, required=True)
    parser.add_argument("--query", help="Query base to use for extracting data. Where clause will be added automatically based on partitioning key. {} should be used to place the partitioning value.", type=str, required=True)
    parser.add_argument("--db", help="Sql Alchemy style connections string for both sql2csv and obtaining partitioning keys", type=str, required=True)
    parser.add_argument("--concurrency", help="Number of simultaneous jobs to run. Default is 1.", type=int, default=1)
    parser.add_argument("--output-file-base", help="Output file base name. The partition value is appended.", type=str, default="output")
    args = parser.parse_args()
    start = datetime.datetime.now()

    # do the extract
    values = get_partition_values(args.db, args.partition_query)
    print("# partition values: {}".format(len(values)))
    run_extracts(values, args.db, args.partition_query, args.query, args.concurrency, args.output_file_base)

    stop = datetime.datetime.now()
    delta = stop - start
    # total_seconds() covers the whole interval; .seconds would drop whole days.
    print("Run time: {} min".format(delta.total_seconds() / 60))

Feel free to copy this script and enhance it as needed. I’ll keep adding to it as needed.


Popular posts from this blog

zio layers and framework integration

typescript and react types

dotty+scala.js+async: interesting options