diff --git a/mergin/client_pull.py b/mergin/client_pull.py index a46e6f96..c3eb422f 100644 --- a/mergin/client_pull.py +++ b/mergin/client_pull.py @@ -44,7 +44,15 @@ class DownloadJob: """ def __init__( - self, project_path, total_size, version, update_tasks, download_queue_items, directory, mp, project_info + self, + project_path, + total_size, + version, + update_tasks, + download_queue_items, + tmp_dir: tempfile.TemporaryDirectory, + mp, + project_info, ): self.project_path = project_path self.total_size = total_size # size of data to download (in bytes) @@ -52,7 +60,7 @@ def __init__( self.version = version self.update_tasks = update_tasks self.download_queue_items = download_queue_items - self.directory = directory # project's directory + self.tmp_dir = tmp_dir self.mp = mp # MerginProject instance self.is_cancelled = False self.project_info = project_info # parsed JSON with project info returned from the server @@ -96,7 +104,7 @@ def _do_download(item, mc, mp, project_path, job): job.transferred_size += item.size -def _cleanup_failed_download(directory, mergin_project=None): +def _cleanup_failed_download(mergin_project: MerginProject = None): """ If a download job fails, there will be the newly created directory left behind with some temporary files in it. 
We want to remove it because a new download would fail because @@ -109,7 +117,7 @@ def _cleanup_failed_download(directory, mergin_project=None): mergin_project.remove_logging_handler() # keep log file as it might contain useful debug info - log_file = os.path.join(directory, ".mergin", "client-log.txt") + log_file = os.path.join(mergin_project.dir, ".mergin", "client-log.txt") dest_path = None if os.path.exists(log_file): @@ -118,7 +126,6 @@ def _cleanup_failed_download(directory, mergin_project=None): dest_path = tmp_file.name shutil.copyfile(log_file, dest_path) - shutil.rmtree(directory) return dest_path @@ -138,6 +145,8 @@ def download_project_async(mc, project_path, directory, project_version=None): mp.log.info("--- version: " + mc.user_agent_info()) mp.log.info(f"--- start download {project_path}") + tmp_dir = tempfile.TemporaryDirectory(prefix="python-api-client-", ignore_cleanup_errors=True) + try: # check whether we download the latest version or not latest_proj_info = mc.project_info(project_path) @@ -147,7 +156,7 @@ def download_project_async(mc, project_path, directory, project_version=None): project_info = latest_proj_info except ClientError: - _cleanup_failed_download(directory, mp) + _cleanup_failed_download(mp) raise version = project_info["version"] if project_info["version"] else "v0" @@ -158,7 +167,7 @@ def download_project_async(mc, project_path, directory, project_version=None): update_tasks = [] # stuff to do at the end of download for file in project_info["files"]: file["version"] = version - items = _download_items(file, directory) + items = _download_items(file, tmp_dir.name) is_latest_version = project_version == latest_proj_info["version"] update_tasks.append(UpdateTask(file["path"], items, latest_version=is_latest_version)) @@ -172,7 +181,7 @@ def download_project_async(mc, project_path, directory, project_version=None): mp.log.info(f"will download {len(update_tasks)} files in {len(download_list)} chunks, total size 
{total_size}") - job = DownloadJob(project_path, total_size, version, update_tasks, download_list, directory, mp, project_info) + job = DownloadJob(project_path, total_size, version, update_tasks, download_list, tmp_dir, mp, project_info) # start download job.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4) @@ -203,7 +212,7 @@ def download_project_is_running(job): traceback_lines = traceback.format_exception(type(exc), exc, exc.__traceback__) job.mp.log.error("Error while downloading project: " + "".join(traceback_lines)) job.mp.log.info("--- download aborted") - job.failure_log_file = _cleanup_failed_download(job.directory, job.mp) + job.failure_log_file = _cleanup_failed_download(job.mp) raise future.exception() if future.running(): return True @@ -229,18 +238,20 @@ def download_project_finalize(job): traceback_lines = traceback.format_exception(type(exc), exc, exc.__traceback__) job.mp.log.error("Error while downloading project: " + "".join(traceback_lines)) job.mp.log.info("--- download aborted") - job.failure_log_file = _cleanup_failed_download(job.directory, job.mp) + job.failure_log_file = _cleanup_failed_download(job.mp) raise future.exception() job.mp.log.info("--- download finished") for task in job.update_tasks: # right now only copy tasks... 
- task.apply(job.directory, job.mp) + task.apply(job.mp.dir, job.mp) # final update of project metadata job.mp.update_metadata(job.project_info) + job.tmp_dir.cleanup() + def download_project_cancel(job): """ @@ -336,7 +347,7 @@ def __init__( version, files_to_merge, download_queue_items, - temp_dir, + tmp_dir, mp, project_info, basefiles_to_patch, @@ -351,7 +362,7 @@ def __init__( self.version = version self.files_to_merge = files_to_merge # list of FileToMerge instances self.download_queue_items = download_queue_items - self.temp_dir = temp_dir # full path to temporary directory where we store downloaded files + self.tmp_dir = tmp_dir # TemporaryDirectory instance where we store downloaded files self.mp = mp # MerginProject instance self.is_cancelled = False self.project_info = project_info # parsed JSON with project info returned from the server @@ -413,8 +424,7 @@ def pull_project_async(mc, directory): # then we just download the whole file _pulling_file_with_diffs = lambda f: "diffs" in f and len(f["diffs"]) != 0 - temp_dir = mp.fpath_meta(f"fetch_{local_version}-{server_version}") - os.makedirs(temp_dir, exist_ok=True) + tmp_dir = tempfile.TemporaryDirectory(prefix="mm-pull-", ignore_cleanup_errors=True) pull_changes = mp.get_pull_changes(server_info["files"]) mp.log.debug("pull changes:\n" + pprint.pformat(pull_changes)) fetch_files = [] @@ -441,10 +451,10 @@ def pull_project_async(mc, directory): for file in fetch_files: diff_only = _pulling_file_with_diffs(file) - items = _download_items(file, temp_dir, diff_only) + items = _download_items(file, tmp_dir.name, diff_only) # figure out destination path for the file - file_dir = os.path.dirname(os.path.normpath(os.path.join(temp_dir, file["path"]))) + file_dir = os.path.dirname(os.path.normpath(os.path.join(tmp_dir.name, file["path"]))) basename = os.path.basename(file["diff"]["path"]) if diff_only else os.path.basename(file["path"]) dest_file_path = os.path.join(file_dir, basename) 
os.makedirs(file_dir, exist_ok=True) @@ -465,8 +475,8 @@ def pull_project_async(mc, directory): file_path = file["path"] mp.log.info(f"missing base file for {file_path} -> going to download it (version {server_version})") file["version"] = server_version - items = _download_items(file, temp_dir, diff_only=False) - dest_file_path = mp.fpath(file["path"], temp_dir) + items = _download_items(file, tmp_dir.name, diff_only=False) + dest_file_path = mp.fpath(file["path"], tmp_dir.name) # dest_file_path = os.path.join(os.path.dirname(os.path.normpath(os.path.join(temp_dir, file['path']))), os.path.basename(file['path'])) files_to_merge.append(FileToMerge(dest_file_path, items)) continue @@ -490,7 +500,7 @@ def pull_project_async(mc, directory): server_version, files_to_merge, download_list, - temp_dir, + tmp_dir, mp, server_info, basefiles_to_patch, @@ -604,10 +614,10 @@ def pull_project_finalize(job: PullJob): # download their full versions so we have them up-to-date for applying changes for file_path, file_diffs in job.basefiles_to_patch: basefile = job.mp.fpath_meta(file_path) - server_file = job.mp.fpath(file_path, job.temp_dir) + server_file = job.mp.fpath(file_path, job.tmp_dir.name) shutil.copy(basefile, server_file) - diffs = [job.mp.fpath(f, job.temp_dir) for f in file_diffs] + diffs = [job.mp.fpath(f, job.tmp_dir.name) for f in file_diffs] patch_error = job.mp.apply_diffs(server_file, diffs) if patch_error: # that's weird that we are unable to apply diffs to the basefile! @@ -623,7 +633,7 @@ def pull_project_finalize(job: PullJob): raise ClientError("Cannot patch basefile {}! 
Please try syncing again.".format(basefile)) try: - conflicts = job.mp.apply_pull_changes(job.pull_changes, job.temp_dir, job.project_info, job.mc) + conflicts = job.mp.apply_pull_changes(job.pull_changes, job.tmp_dir.name, job.project_info, job.mc) except Exception as e: job.mp.log.error("Failed to apply pull changes: " + str(e)) job.mp.log.info("--- pull aborted") @@ -636,7 +646,7 @@ def pull_project_finalize(job: PullJob): else: job.mp.log.info("--- pull finished -- at version " + job.mp.version()) - shutil.rmtree(job.temp_dir) + job.tmp_dir.cleanup() # delete our temporary dir and all its content return conflicts @@ -788,7 +798,7 @@ def download_files_async( mp.log.info(f"Got project info. version {project_info['version']}") # set temporary directory for download - temp_dir = tempfile.mkdtemp(prefix="python-api-client-") + tmp_dir = tempfile.mkdtemp(prefix="python-api-client-") if output_paths is None: output_paths = [] @@ -798,7 +808,7 @@ def download_files_async( if len(output_paths) != len(file_paths): warn = "Output file paths are not of the same length as file paths. Cannot store required files." 
mp.log.warning(warn) - shutil.rmtree(temp_dir) + shutil.rmtree(tmp_dir) raise ClientError(warn) download_list = [] @@ -812,7 +822,7 @@ def download_files_async( if file["path"] in file_paths: index = file_paths.index(file["path"]) file["version"] = version - items = _download_items(file, temp_dir) + items = _download_items(file, tmp_dir) is_latest_version = version == latest_proj_info["version"] task = UpdateTask(file["path"], items, output_paths[index], latest_version=is_latest_version) download_list.extend(task.download_queue_items) @@ -832,13 +842,13 @@ def download_files_async( if not download_list or missing_files: warn = f"No [{', '.join(missing_files)}] exists at version {version}" mp.log.warning(warn) - shutil.rmtree(temp_dir) + shutil.rmtree(tmp_dir) raise ClientError(warn) mp.log.info( f"will download files [{', '.join(files_to_download)}] in {len(download_list)} chunks, total size {total_size}" ) - job = DownloadJob(project_path, total_size, version, update_tasks, download_list, temp_dir, mp, project_info) + job = DownloadJob(project_path, total_size, version, update_tasks, download_list, tmp_dir, mp, project_info) job.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4) job.futures = [] for item in download_list: @@ -862,8 +872,8 @@ def download_files_finalize(job): job.mp.log.info("--- download finished") for task in job.update_tasks: - task.apply(job.directory, job.mp) + task.apply(job.tmp_dir, job.mp) # Remove temporary download directory - if job.directory is not None and os.path.exists(job.directory): - shutil.rmtree(job.directory) + if job.tmp_dir is not None and os.path.exists(job.tmp_dir): + shutil.rmtree(job.tmp_dir) diff --git a/mergin/client_push.py b/mergin/client_push.py index 885db9ac..af92b865 100644 --- a/mergin/client_push.py +++ b/mergin/client_push.py @@ -124,7 +124,7 @@ def push_project_async(mc, directory): changes = filter_changes(mc, project_info, changes) mp.log.debug("push changes:\n" + pprint.pformat(changes)) - 
tmp_dir = tempfile.TemporaryDirectory(prefix="python-api-client-") + tmp_dir = tempfile.TemporaryDirectory(prefix="python-api-client-", ignore_cleanup_errors=True) # If there are any versioned files (aka .gpkg) that are not updated through a diff, # we need to make a temporary copy somewhere to be sure that we are uploading full content. diff --git a/mergin/test/test_client.py b/mergin/test/test_client.py index 32bc192f..d86e17fb 100644 --- a/mergin/test/test_client.py +++ b/mergin/test/test_client.py @@ -2549,7 +2549,7 @@ def test_download_failure(mc): # download project async with pytest.raises(IsADirectoryError): job = download_project_async(mc, project, download_dir) - os.makedirs(os.path.join(download_dir, "base.gpkg.0")) + os.makedirs(os.path.join(job.tmp_dir.name, "base.gpkg.0")) download_project_wait(job) download_project_finalize(job) @@ -2561,7 +2561,7 @@ def test_download_failure(mc): # active waiting remove_folders([download_dir]) job = download_project_async(mc, project, download_dir) - os.makedirs(os.path.join(download_dir, "base.gpkg.0")) + os.makedirs(os.path.join(job.tmp_dir.name, "base.gpkg.0")) with pytest.raises(IsADirectoryError): while True: assert download_project_is_running(job)