[PyGreSQL] DB API-2.0 + copy from stdin

Christopher Sean Hilton chris at vindaloo.com
Tue Sep 23 16:08:57 EDT 2008


Here's the first cut at my solution. The example code is at the
bottom. It takes a table of tuples and uses COPY table FROM stdin
to add them to a new table, and then uses a simple INSERT statement to
append the new rows to the existing author table. The function
parse_connection_args is just the beginning of the pgdb.connect()
function.

-- Chris

----- Example --------------------------------------------------------------

#! /usr/bin/env python

import pg, pgdb

class PreConnectedDB(pg.DB):

    """A way to get a pg.DB object around a connection established from the pgdb.connect() method.

    pg.DB.__init__ is not called; the instance attributes are filled in by
    hand around the connection object that is passed in.
    """

    def __init__(self, cnx):
        """Wrap the already-open connection cnx.

        cnx -- connection object; it is stored as self.db, and its `db`
               attribute is copied to self.dbname.
        """
        self.db = cnx
        self.dbname = self.db.db
        self.__attnames = {}
        self.__pkeys = {}
        # Inside this class the two names above are mangled to
        # _PreConnectedDB__attnames / _PreConnectedDB__pkeys.
        # NOTE(review): presumably pg.DB keeps its caches under the names
        # mangled for *its* class (_DB__attnames / _DB__pkeys); alias them
        # so inherited methods can find them -- confirm against pg.py.
        self._DB__attnames = self.__attnames
        self._DB__pkeys = self.__pkeys
        # No connect arguments are recorded: only the connection is given.
        self.debug = None     # For debugging scripts, this can be set
                              # * to a string format specification (e.g. in a CGI set to "%s<BR>"),
                              # * to a function which takes a string argument or
                              # * to a file object to write debug statements to.

    def close(self):
        """Don't allow the subconnection to close the connection to the database.

        Always raises pg.InternalError.
        """
        # Call form of raise: valid in both Python 2 and Python 3.
        raise pg.InternalError("Subconnection is not allowed to close the database.")
    
class pgdbCnxWithClassicDb(pgdb.pgdbCnx):

    """A pgdb.pgdbCnx that keeps a reference to the connection it wraps.

    The connection is kept in self.classicDb so that it can later be handed
    out wrapped in a PreConnectedDB via get_classic_db().
    """

    def __init__(self, cnx):
        """Remember cnx, then initialise the pgdb.pgdbCnx base with it."""
        self.classicDb = cnx
        pgdb.pgdbCnx.__init__(self, cnx)

    def get_classic_db(self):
        """Return a new PreConnectedDB built around the stored connection."""
        return PreConnectedDB(self.classicDb)

def parse_connection_args(dsn = None,
                          user = None, password = None,
                          host = None, database = None):
    """Turn DB-API style connect arguments into a tuple of connection values.

    dsn      -- optional "host:database:user:password:opt:tty" string;
                trailing fields may be omitted and default to "".
    user     -- overrides the DSN user when not None.
    password -- overrides the DSN password when not None.
    host     -- "hostname" or "hostname:port"; overrides the DSN host
                when not None.
    database -- overrides the DSN database when not None.

    Returns the tuple
    (dbbase, dbhost, dbport, dbopt, dbtty, dbuser, dbpasswd).
    dbport is -1 unless a port was given in host; an empty host or user
    is returned as None.

    Parsing is best-effort: a missing or non-string dsn/host and a
    non-numeric port are ignored rather than raised.
    """
    dsn_field_count = 6

    # first get params from DSN
    try:
        params = dsn.split(":")[:dsn_field_count]
    except (AttributeError, TypeError):
        # dsn is None or not a text string: nothing to take from it.
        params = []
    # Pad so that fields missing from a short DSN fall back to "".
    params = params + [""] * (dsn_field_count - len(params))
    dbhost, dbbase, dbuser, dbpasswd, dbopt, dbtty = params
    dbport = -1

    # override if necessary
    if user is not None:
        dbuser = user

    if password is not None:
        dbpasswd = password

    if database is not None:
        dbbase = database

    if host is not None:
        try:
            params = host.split(":")
            dbhost = params[0]
            dbport = int(params[1])
        except (AttributeError, TypeError, IndexError, ValueError):
            # No port part, a non-numeric port, or a non-string host:
            # keep whatever was successfully assigned before the failure.
            pass

    # empty host is localhost
    if dbhost == "":
        dbhost = None

    if dbuser == "":
        dbuser = None

    return (dbbase, dbhost, dbport, dbopt, dbtty, dbuser, dbpasswd)

def connect(*args, **kwargs):
    """Open a connection and return it wrapped in a pgdbCnxWithClassicDb.

    All arguments are forwarded unchanged to parse_connection_args().
    """
    # The parsed tuple is already in the positional order handed to
    # pg.connect(): database, host, port, opt, tty, user, password.
    (dbbase, dbhost, dbport,
     dbopt, dbtty,
     dbuser, dbpasswd) = parse_connection_args(*args, **kwargs)

    # open the connection
    raw_cnx = pg.connect(dbbase, dbhost, dbport, dbopt, dbtty, dbuser, dbpasswd)
    wrapped = pgdbCnxWithClassicDb(raw_cnx)
    return wrapped

if __name__ == '__main__':
    # Sample (nickname, email) rows to load.
    data = (
        ('ernest', 'ernest.hemingway at example.com'),
        ('richard', 'richard.stevens at example.com'),
        ('gwright', 'gary.wright at example.com'),
        ('lniven', 'larry.niven at example.com'),
        ('jerry', 'jerry.pournell at example.com'),
    )

    dbiDb = connect(database='message_board')
    c = dbiDb.cursor()

    # Create an empty staging table with the two author columns (LIMIT 0
    # copies the column layout but no rows).
    c.execute("CREATE TEMPORARY TABLE new_author AS "
              "SELECT author_nickname, author_email FROM author LIMIT 0")

    # Switch to the classic interface on the same connection for the COPY.
    classicDb = dbiDb.get_classic_db()
    classicDb.query('COPY new_author FROM stdin')

    # One tab-separated, newline-terminated line per row.
    for row in data:
        line = "\t".join(row) + "\n"
        classicDb.putline(line)

    # A line holding only backslash-period ends the data, then finish the copy.
    classicDb.putline('\\.\n')
    classicDb.endcopy()

    del classicDb

    # Append the staged rows to the real table through the DB-API cursor.
    c.execute("INSERT INTO author (author_nickname, author_email) "
              "SELECT author_nickname, author_email FROM new_author")

    dbiDb.commit()
    dbiDb.close()

----- End example ----------------------------------------------------------
    

-- 
Chris Hilton                                   chris-at-vindaloo-dot-com
------------------------------------------------------------------------
                "All I was doing was trying to get home from work!"
                                                 -- Rosa Parks


More information about the PyGreSQL mailing list