diff --git a/druzhba/main.py b/druzhba/main.py index 3ddcdef..d2f8e42 100644 --- a/druzhba/main.py +++ b/druzhba/main.py @@ -18,7 +18,7 @@ create_extract_monitor_table, create_index_table, create_load_monitor_table, - init_redshift + init_redshift, ) from druzhba.table import ( ConfigurationError, @@ -36,6 +36,150 @@ VALIDATE_ONLY = None +def process_table( + table_yaml, + index_schema, + index_table, + db: DatabaseConfig, + db_alias, + full_refresh=None, + rebuild=None, + monitor_tables_config=None, +): + source_table_name = table_yaml["source_table_name"] + + try: + TableConfig.validate_yaml_configuration(table_yaml) + logger.info("Validated: %s / %s", db.database_alias, source_table_name) + except ConfigurationError as e: + logger.error(str(e)) + return source_table_name + + if VALIDATE_ONLY: + return + + table_params = copy.deepcopy(table_yaml) + if rebuild: + table_params["rebuild"] = True + table_params["full_refresh"] = True + elif full_refresh: + table_params["full_refresh"] = True + table = db.get_table_config( + table_params, + index_schema=index_schema, + index_table=index_table, + monitor_tables_config=monitor_tables_config, + ) + table.validate_runtime_configuration() + + if COMPILE_ONLY: + print("---------------------------------------------") + print(table.get_query_sql()) + print("---------------------------------------------\n\n\n") + return + + if PRINT_SQL_ONLY: + select_query = table.get_query_sql() + # Create statement introspects the source DB for a schema + create_statement = table.query_to_redshift_create_table( + select_query, table.destination_table_name + ) + + print("---------------------------------------------") + print(create_statement) + print("---------------------------------------------") + print(select_query) + print("---------------------------------------------\n\n\n") + return + + retries_remaining = 5 + table_complete = False + while not table_complete and retries_remaining > 0: + try: + with monitor.wrap("create-redshift-table", db_alias=db_alias): + table.check_destination_table_status() + + with monitor.wrap( + "extract-table", db_alias=db_alias, table=table.source_table_name + ): + table.extract() + + with monitor.wrap( + "load-table", db_alias=db_alias, table=table.source_table_name + ): + table.load() + + table_complete = True + + except (InvalidSchemaError, MigrationError): + logger.exception( + "Error preparing target table %s.%s", + table.destination_schema_name, + table.destination_table_name, + ) + table_complete = True + + except ( + ValueError, + db.db_errors.InternalError, + db.db_errors.IntegrityError, + db.db_errors.ProgrammingError, + psycopg2.InternalError, + psycopg2.IntegrityError, + psycopg2.ProgrammingError, + psycopg2.extensions.TransactionRollbackError, + psycopg2.errors.FeatureNotSupported, # pylint: disable=no-member + ) as e: + logger.warning( + "Unexpected error processing %s table %s: ```%s\n\n%s```", + table.database_alias, + table.source_table_name, + e, + "".join(traceback.format_exc()), + ) + logger.info("Continuing") + table_complete = True + + except ( + SSLError, + db.db_errors.OperationalError, + db.db_errors.DatabaseError, + ) as e: + retries_remaining -= 1 + if retries_remaining > 0: + logger.info( + "Disconnected while processing %s table %s with error... Retrying.", + table.database_alias, + table.source_table_name, + ) + logger.info(e) + monitor.record_error("disconnect-error", db_alias=db_alias) + time.sleep((5.0 - retries_remaining) ** 2) + else: + logger.error( + "Error processing %s table %s and out of retries: ```%s\n\n%s```", + table.database_alias, + table.source_table_name, + e, + "".join(traceback.format_exc()), + ) + raise + + except (psycopg2.extensions.QueryCanceledError, Exception) as e: + logger.error( + "Unexpected error processing %s table %s", + table.database_alias, + table.source_table_name, + ) + raise + + logger.info( + "Done with %s table %s", + table.database_alias, + table.source_table_name, + ) + + def process_database( index_schema, index_table, @@ -47,6 +191,8 @@ def process_database( full_refresh=None, rebuild=None, monitor_tables_config=None, + num_processes=cpu_count(), + parallelize="database", ): logger.info("Beginning database %s", db_alias) try: @@ -62,6 +208,8 @@ def process_database( full_refresh, rebuild, monitor_tables_config, + num_processes, + parallelize, ) logger.info("Done with database %s", db_alias) except Exception as e: @@ -80,6 +228,8 @@ def _process_database( full_refresh=None, rebuild=None, monitor_tables_config=None, + num_processes=cpu_count(), + parallelize="database", ): db, dbconfig = set_up_database( db_alias, @@ -98,8 +248,8 @@ def _process_database( ) invalids = [] - retries_remaining = 5 + tables_to_process = [] for table_yaml in tables_yaml: if not only_table_names and not table_yaml.get("enabled", True): continue @@ -108,135 +258,46 @@ def _process_database( if only_table_names and source_table_name not in only_table_names: continue - try: - TableConfig.validate_yaml_configuration(table_yaml) - logger.info("Validated: %s / %s", db.database_alias, source_table_name) - except ConfigurationError as e: - logger.error(str(e)) - invalids.append(source_table_name) - continue - - if VALIDATE_ONLY: - continue - - table_params = copy.deepcopy(table_yaml) - if rebuild: - table_params["rebuild"] = True - table_params["full_refresh"] = True - elif full_refresh: - table_params["full_refresh"] = True - table = db.get_table_config( - table_params, index_schema=index_schema, index_table=index_table, - monitor_tables_config=monitor_tables_config, - ) - table.validate_runtime_configuration() - - if COMPILE_ONLY: - print("---------------------------------------------") - print(table.get_query_sql()) - print("---------------------------------------------\n\n\n") - continue + tables_to_process.append(table_yaml) - if PRINT_SQL_ONLY: - select_query = table.get_query_sql() - # Create statement introspects the source DB for a schema - create_statement = table.query_to_redshift_create_table( - select_query, table.destination_table_name + if num_processes == 1 or parallelize == "database": + invalids = [ + process_table( + table_yaml, + index_schema, + index_table, + db, + db_alias, + full_refresh, + rebuild, + monitor_tables_config, ) - - print("---------------------------------------------") - print(create_statement) - print("---------------------------------------------") - print(select_query) - print("---------------------------------------------\n\n\n") - continue - - advance_to_next_table = False - while not advance_to_next_table and retries_remaining > 0: - try: - with monitor.wrap("create-redshift-table", db_alias=db_alias): - table.check_destination_table_status() - - with monitor.wrap( - "extract-table", db_alias=db_alias, table=table.source_table_name - ): - table.extract() - - with monitor.wrap( - "load-table", db_alias=db_alias, table=table.source_table_name - ): - table.load() - - advance_to_next_table = True - - except (InvalidSchemaError, MigrationError): - logger.exception( - "Error preparing target table %s.%s", - table.destination_schema_name, - table.destination_table_name, - ) - advance_to_next_table = True - - except ( - ValueError, - db.db_errors.InternalError, - db.db_errors.IntegrityError, - db.db_errors.ProgrammingError, - psycopg2.InternalError, - psycopg2.IntegrityError, - psycopg2.ProgrammingError, - psycopg2.extensions.TransactionRollbackError, - psycopg2.errors.FeatureNotSupported, # pylint: disable=no-member - ) as e: - logger.warning( - "Unexpected error processing %s table %s: ```%s\n\n%s```", - table.database_alias, - table.source_table_name, - e, - "".join(traceback.format_exc()), - ) - logger.info("Continuing") - advance_to_next_table = True - - except ( - SSLError, - db.db_errors.OperationalError, - db.db_errors.DatabaseError, - ) as e: - retries_remaining -= 1 - if retries_remaining > 0: - logger.info( - "Disconnected while processing %s table %s with error... Retrying.", - table.database_alias, - table.source_table_name, - ) - logger.info(e) - monitor.record_error("disconnect-error", db_alias=db_alias) - time.sleep((5.0 - retries_remaining) ** 2) - else: - logger.error( - "Error processing %s table %s and out of retries: ```%s\n\n%s```", - table.database_alias, - table.source_table_name, - e, - "".join(traceback.format_exc()), - ) - raise - - except (psycopg2.extensions.QueryCanceledError, Exception) as e: - logger.error( - "Unexpected error processing %s table %s", - table.database_alias, - table.source_table_name, - ) - raise - - logger.info( - "Done with %s table %s", - table.database_alias, - table.source_table_name, + for table_yaml in tables_to_process + ] + else: + # Preload _strptime to avoid a threading bug in cpython + # See: https://mail.python.org/pipermail/python-list/2015-October/697689.html + _ = datetime.datetime.strptime("2018-01-01 01:02:03", "%Y-%m-%d %H:%M:%S") + with Pool(num_processes) as pool: + invalids = pool.map_async( + lambda table_yaml: process_table( + table_yaml, + index_schema, + index_table, + db, + db_alias, + full_refresh, + rebuild, + monitor_tables_config, + ), + tables_to_process, ) + invalids.wait() + if not invalids.successful(): + # Don't need to relog on failure, the process already logged + sys.exit(2) + if len(invalids) > 0: raise RuntimeError( "Had invalid table configurations in {}: \n{}".format( @@ -309,6 +370,8 @@ def run(args): args.num_processes or "unspecified", args.num_processes or cpu_count(), ) + if not args.num_processes: + args.num_processes = cpu_count() global COMPILE_ONLY COMPILE_ONLY = args.compile_only @@ -353,6 +416,8 @@ def run(args): args.full_refresh, args.rebuild, monitor_tables_config, + args.num_processes, + args.parallelize, ) for db in destination_config["sources"] if db["alias"] == args.database @@ -373,12 +438,14 @@ def run(args): None, None, monitor_tables_config, + args.num_processes, + args.parallelize, ) for db in destination_config["sources"] if db.get("enabled", True) ] - if args.num_processes == 1: + if args.num_processes == 1 or args.parallelize == "table": for db in dbs: process_database(*db) else: @@ -426,6 +493,17 @@ def _get_parser(): "\nDefaults to number of CPUs (cores) available.", type=int, ) + parser.add_argument( + "-p", + "--parallelize", + help="What level to apply paralell processing. Either 'database' or 'table'." + "\n'database' will spawn multiple procesess per database configured." + "\n'table' will spawn multiple procesess per table configured within a database." + "\nDefaults to 'database'", + type=str, + choices=["table", "database"], + default="database", + ) parser.add_argument( "-co", "--compile-only",