11from .utilities import logger
2- from io import StringIO
3- from pandas import DataFrame
4- from typing import Callable , List
52from sqlalchemy .schema import AddConstraint , DropConstraint
63from sqlalchemy .exc import SQLAlchemyError
7- from sqlalchemy .sql .schema import Table
8- from sqlalchemy .engine .base import Connection
94
105
116class BaseCopy (object ):
@@ -15,21 +10,26 @@ class BaseCopy(object):
1510
1611 def __init__ (
1712 self ,
18- defer_sql_objs : bool = False ,
19- conn : Connection = None ,
20- table_obj : Table = None ,
21- sql_table : str = None ,
22- csv_chunksize : int = 10 ** 6 ,
13+ defer_sql_objs = False ,
14+ conn = None ,
15+ table_obj = None ,
16+ sql_table = None ,
17+ csv_chunksize = 10 ** 6 ,
2318 ):
2419 """
2520 Parameters
2621 ----------
27- defer_sql_objs: multiprocessing has issue with passing SQLALchemy objects, so if
22+ defer_sql_objs: bool
23+ multiprocessing has issue with passing SQLALchemy objects, so if
2824 True, defer attributing these to the object until after pickled by Pool
29- conn: SQLAlchemy connection managed outside of the object
30- table_obj: SQLAlchemy object for the destination SQL Table
31- sql_table: string of SQL table name
32- csv_chunksize: max rows to keep in memory when generating CSV for COPY
25+ conn: SQLAlchemy Connection
26+ Managed outside of the object
27+ table_obj: SQLAlchemy Table
28+ Model object for the destination SQL Table
29+ sql_table: string
30+ SQL table name
31+ csv_chunksize: int
32+ Max rows to keep in memory when generating CSV for COPY
3333 """
3434
3535 self .rows = 0
@@ -47,8 +47,10 @@ def instantiate_sql_objs(self, conn, table_obj):
4747
4848 Parameters
4949 ----------
50- conn: SQLAlchemy connection managed outside of the object
51- table_obj: SQLAlchemy object for the destination SQL Table
50+ conn: SQLAlchemy Connection
51+ Managed outside of the object
52+ table_obj: SQLAlchemy Table
53+ Model object for the destination SQL Table
5254 """
5355 self .conn = conn
5456 self .table_obj = table_obj
@@ -61,71 +63,77 @@ def drop_pk(self):
6163 Drop primary key constraints on PostgreSQL table as well as CASCADE any other
6264 constraints that may rely on the PK
6365 """
64- logger .info (f "Dropping { self . sql_table } primary key" )
66+ logger .info ("Dropping {} primary key" . format ( self . sql_table ) )
6567 try :
6668 with self .conn .begin_nested ():
6769 self .conn .execute (DropConstraint (self .primary_key , cascade = True ))
6870 except SQLAlchemyError :
69- logger .info (f" { self . sql_table } primary key not found. Skipping" )
71+ logger .info ("{ } primary key not found. Skipping". format ( self . sql_table ) )
7072
def create_pk(self):
    """Add the primary key constraint (self.primary_key) to the PostgreSQL table"""
    message = "Creating {} primary key".format(self.sql_table)
    logger.info(message)
    add_pk = AddConstraint(self.primary_key)
    self.conn.execute(add_pk)
7577
def drop_fks(self):
    """
    Drop each foreign key constraint in self.foreign_keys on the PostgreSQL table

    Every drop is executed inside its own conn.begin_nested() block (presumably so
    a failed drop only rolls back that nested transaction -- confirm against the
    connection setup). A SQLAlchemyError is logged as a warning and the loop
    continues with the next constraint.
    """
    for constraint in self.foreign_keys:
        fk_name = constraint.name
        logger.info("Dropping foreign key {}".format(fk_name))
        try:
            with self.conn.begin_nested():
                drop_stmt = DropConstraint(constraint)
                self.conn.execute(drop_stmt)
        except SQLAlchemyError:
            logger.warn("Foreign key {} not found".format(fk_name))
8587
def create_fks(self):
    """
    Create each foreign key constraint in self.foreign_keys on the PostgreSQL table

    A SQLAlchemyError raised while adding a constraint is logged as a warning and
    the loop continues with the next constraint.
    """
    for fk in self.foreign_keys:
        try:
            # Use a positional "{}" field: "{fk.name}".format(fk.name) looks up a
            # keyword argument named "fk", which is not supplied, and raises
            # KeyError instead of logging.
            logger.info("Creating foreign key {}".format(fk.name))
            self.conn.execute(AddConstraint(fk))
        except SQLAlchemyError:
            # Same positional field here so the handler itself cannot raise
            # KeyError and hide the SQLAlchemy failure.
            logger.warn("Error creating foreign key {}".format(fk.name))
9496
def truncate(self):
    """Log, then execute, a TRUNCATE TABLE statement for self.sql_table"""
    table = self.sql_table
    logger.info("Truncating {}".format(table))
    statement = "TRUNCATE TABLE {};".format(table)
    self.conn.execute(statement)
99101
def analyze(self):
    """Log, then execute, an ANALYZE statement for self.sql_table"""
    table = self.sql_table
    logger.info("Analyzing {}".format(table))
    statement = "ANALYZE {};".format(table)
    self.conn.execute(statement)
104106
def copy_from_file(self, file_object):
    """
    COPY the contents of a CSV file-like object into the PostgreSQL table

    The object is rewound and its first line is read as the column list for the
    COPY statement; the object, now positioned after that line, is then handed
    to the cursor's copy_expert as the data source.

    Parameters
    ----------
    file_object: StringIO
        CSV formatted data to COPY from DataFrame to PostgreSQL
    """
    cursor = self.conn.connection.cursor()
    # Rewind so the header is always read from the very first line
    file_object.seek(0)
    header = file_object.readline()
    copy_sql = "COPY {table} ({columns}) FROM STDIN WITH CSV FREEZE".format(
        columns=header, table=self.sql_table
    )
    cursor.copy_expert(sql=copy_sql, file=file_object)
118123
119- def data_formatting (self , df : DataFrame , functions : List [ Callable ] = [], ** kwargs ):
124+ def data_formatting (self , df , functions = [], ** kwargs ):
120125 """
121126 Call each function in the functions list arg on the DataFrame and return
122127
123128 Parameters
124129 ----------
125- df: dataframe to format
126- functions: list of functions to apply to df. each gets passed df, self as
127- copy_obj, and all kwargs passed to data_formatting
128- **kwargs: kwargs to pass on to each function
130+ df: pandas DataFrame
131+ dataframe to format
132+ functions: list of functions
133+ Functions to apply to df. each gets passed df, self as copy_obj, and all
134+ kwargs passed to data_formatting
135+ **kwargs
136+ kwargs to pass on to each function
129137 """
130138 for f in functions :
131139 df = f (df , copy_obj = self , ** kwargs )
0 commit comments