@@ -1,5 +1,9 @@
 from multiprocessing import Pool
-from .copy_hdf import HDFTableCopy, HDFMetadata
+
+from sqlalchemy import MetaData, create_engine
+
+from .copy_hdf import HDFTableCopy
+from .utilities import HDFMetadata
 
 
 def create_hdf_table_objects(hdf_meta, csv_chunksize=10 ** 6):
@@ -34,91 +38,95 @@ def create_hdf_table_objects(hdf_meta, csv_chunksize=10 ** 6):
     return tables
 
 
-def _copy_worker(copy_obj, defer_sql_objs=True):
-    """
-    Handle a SQLAlchemy connection and copy using HDFTableCopy object
+def _copy_worker(copy_obj, engine_args, engine_kwargs, maintenance_work_mem="1G"):
+
+    # Since we fork()ed into a new process, the engine contains process
+    # specific stuff that shouldn't be shared - this creates a fresh Engine
+    # with the same settings but without those.
+
+    engine = create_engine(*engine_args, **engine_kwargs)
+    metadata = MetaData(bind=engine)
+    metadata.reflect()
+
+    with engine.connect() as conn:
 
-    copy_obj: HDFTableCopy or subclass
-        Object to use to run the copy() method on
-    defer_sql_objs: bool
-        If True, SQL objects were not build upon instantiation of copy_obj and should
-        be built before copying data to db (needed for multiprocessing)
-    """
-    database.engine.dispose()
-    with database.engine.connect() as conn:
         conn.execution_options(autocommit=True)
-        conn.execute("SET maintenance_work_mem TO 1000000;")
 
-        if defer_sql_objs:
-            table_obj = database.metadata.tables[copy_obj.sql_table]
-            copy_obj.instantiate_sql_objs(conn, table_obj)
+        if maintenance_work_mem is not None:
+            conn.execute("SET maintenance_work_mem TO {};".format(maintenance_work_mem))
+
+        # Get SQLAlchemy Table object
+        table_obj = metadata.tables.get(copy_obj.sql_table, None)
+        if table_obj is None:
+            raise ValueError("Table {} does not exist.".format(copy_obj.sql_table))
+
+        copy_obj.instantiate_sql_objs(conn, table_obj)
 
+        # Run the task
         copy_obj.copy()
 
 
-def hdf_to_postgres(file_name, db, keys=[], csv_chunksize=10 ** 6):
+def hdf_to_postgres(file_name, engine_args, engine_kwargs={}, keys=[],
+                    csv_chunksize=10 ** 6, processes=None,
+                    maintenance_work_mem=None):
     """
     Copy tables in a HDF file to PostgreSQL database
 
     Parameters
     ----------
     file_name: str
         name of file or path to file of HDF to use to copy
-    db: SQLAlchemy database object
-        destination database
+    engine_args: list
+        arguments to pass into create_engine()
+    engine_kwargs: dict
+        keyword arguments to pass into create_engine()
     keys: list of strings
         HDF keys to copy
     csv_chunksize: int
         Maximum number of StringIO CSV rows to keep in memory at a time
+    processes: int or None
+        If None, run single threaded. If integer, number of processes in the
+        multiprocessing Pool
+    maintenance_work_mem: str or None
+        What to set postgresql's maintenance_work_mem option to: this helps
+        when rebuilding large indexes, etc.
     """
 
-    global database
-    database = db
-
     hdf = HDFMetadata(
         file_name, keys, metadata_attr="atlas_metadata", metadata_keys=["levels"]
     )
 
     tables = create_hdf_table_objects(hdf, csv_chunksize=csv_chunksize)
 
-    for table in tables:
-        _copy_worker(table, defer_sql_objs=True)
+    if processes is None:
 
+        # Single-threaded run
+        for table in tables:
+            _copy_worker(table, engine_args, engine_kwargs, maintenance_work_mem)
 
-def multiprocess_hdf_to_postgres(
-    file_name, db, keys=[], processes=4, csv_chunksize=10 ** 6
-):
-    """
-    Copy tables in a HDF file to PostgreSQL database using a multiprocessing Pool
+    elif type(processes) is int:
 
-    Parameters
-    ----------
-    file_name: str
-        Name of file or path to file of HDF to use to copy
-    db: SQLAlchemy object
-        Destination database
-    keys: list of strings
-        HDF keys to copy
-    processes: int
-        Number of processes in the Pool
-    csv_chunksize: int
-        Maximum number of StringIO CSV rows to keep in memory at a time
-    """
+        args = zip(
+            tables,
+            [engine_args] * len(tables),
+            [engine_kwargs] * len(tables),
+            [maintenance_work_mem] * len(tables)
+        )
 
-    global database
-    database = db
+        try:
+            p = Pool(processes)
+            result = p.starmap_async(_copy_worker, args, chunksize=1)
 
-    hdf = HDFMetadata(
-        file_name, keys, metadata_attr="atlas_metadata", metadata_keys=["levels"]
-    )
+        finally:
+            del tables
+            del hdf
+            p.close()
+            p.join()
 
-    tables = create_hdf_table_objects(hdf, csv_chunksize=csv_chunksize)
+        if not result.successful():
+            # If there's an exception, throw it, but we don't care about the
+            # results
+            result.get()
 
-    try:
-        p = Pool(processes)
-        p.map(_copy_worker, tables, chunksize=1)
-    finally:
-        del tables
-        del hdf
-        p.close()
-        p.join()
+    else:
+        raise ValueError("processes should be int or None.")
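
For reference, a minimal usage sketch of the new hdf_to_postgres() interface. The file path, connection URL, and HDF keys below are hypothetical placeholders; engine_args and engine_kwargs are forwarded unchanged to sqlalchemy.create_engine() inside each worker process.

hdf_to_postgres(
    "observations.h5",                                          # hypothetical HDF5 file
    engine_args=["postgresql://user:password@localhost/mydb"],  # hypothetical URL, passed to create_engine()
    engine_kwargs={"echo": False},                              # also passed to create_engine()
    keys=["/country_year", "/product_year"],                    # hypothetical HDF keys
    processes=4,               # Pool of 4 worker processes; None runs single-threaded
    maintenance_work_mem="1G"  # applied via SET maintenance_work_mem in each session
)

Passing engine arguments instead of a live Engine means nothing connection-related crosses the fork boundary: each worker builds its own Engine and reflected MetaData, as the comment in _copy_worker notes.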
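A standalone sketch of the error-propagation pattern used in the multiprocessing branch, with a made-up worker and inputs: starmap_async() returns an AsyncResult, and once the pool has been closed and joined, result.successful() reports whether any worker raised and result.get() re-raises that exception in the parent.

from multiprocessing import Pool


def _square(x, offset):
    # Hypothetical worker standing in for _copy_worker
    if x < 0:
        raise ValueError("negative input: {}".format(x))
    return x * x + offset


if __name__ == "__main__":
    args = zip([1, 2, -3], [10] * 3)
    p = Pool(2)
    try:
        result = p.starmap_async(_square, args, chunksize=1)
    finally:
        p.close()
        p.join()

    if not result.successful():
        result.get()  # re-raises the worker's ValueError in the parent process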