From 0ca8526853b9b35b8d334f46bdc78d2b7a597f5c Mon Sep 17 00:00:00 2001 From: Brelle Emmanuel Date: Mon, 8 Dec 2025 17:20:24 +0100 Subject: [PATCH 1/2] [pml/ubcl] Removed bad assert: a PML should suport add_proc on a NULL proc pointer Signed-off-by: Brelle Emmanuel --- ompi/mca/pml/ubcl/pml_ubcl_endpoint.c | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/ompi/mca/pml/ubcl/pml_ubcl_endpoint.c b/ompi/mca/pml/ubcl/pml_ubcl_endpoint.c index 04e29babed9..4d1b45d6a1f 100644 --- a/ompi/mca/pml/ubcl/pml_ubcl_endpoint.c +++ b/ompi/mca/pml/ubcl/pml_ubcl_endpoint.c @@ -375,10 +375,18 @@ int mca_pml_ubcl_endpoint_release(ompi_proc_t *proc) ubcl_error_t ret = UBCL_SUCCESS; int ompi_error = OMPI_SUCCESS; mca_common_ubcl_endpoint_t *endpoint = NULL; - assert(NULL != proc); + + if (NULL == proc) { + OPAL_OUTPUT_VERBOSE((90, mca_pml_ubcl_component.output, "pml_ubcl_endpoint release : proc is NULL")); + return OMPI_SUCCESS; + } endpoint = (proc)->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML]; - assert(NULL != endpoint); + + if (NULL == endpoint) { + OPAL_OUTPUT_VERBOSE((50, mca_pml_ubcl_component.output, "pml_ubcl_endpoint release : endpoint is NULL")); + return OMPI_SUCCESS; + } endpoint_refcount = opal_atomic_sub_fetch_32(&endpoint->refcount, 1); if (0 == endpoint_refcount) { From 100621ec0a67c5c5680a938388282e6e1f96b2e6 Mon Sep 17 00:00:00 2001 From: "DUPRAT, JULIEN" Date: Tue, 18 Nov 2025 14:41:14 +0100 Subject: [PATCH 2/2] Properly del_proc spawned procs upon instance finalize Signed-off-by: Brelle Emmanuel --- ompi/communicator/comm.c | 5 +++ ompi/dpm/dpm.c | 6 ++- ompi/instance/instance.c | 83 +++++++++++++++++++++++++++++++++++++++- ompi/instance/instance.h | 13 ++++++- ompi/proc/proc.c | 26 ++++++++----- ompi/proc/proc.h | 19 +++++++++ 6 files changed, 139 insertions(+), 13 deletions(-) diff --git a/ompi/communicator/comm.c b/ompi/communicator/comm.c index ce51aa7336a..7d0805c0f87 100644 --- a/ompi/communicator/comm.c +++ b/ompi/communicator/comm.c @@ -2417,6 +2417,11 @@ int ompi_comm_get_rprocs (ompi_communicator_t *local_comm, ompi_communicator_t * goto err_exit; } + /* When a process gets spawned, every local_comm process needs to create + * an intercomm with the spawnees to communicate. These spawned procs needs + * to be remembered for cleaning later on */ + ompi_proc_retain_spawned_jobids(rprocs, rsize); + err_exit: /* rprocs isn't freed unless we have an error, since it is used in the communicator */ diff --git a/ompi/dpm/dpm.c b/ompi/dpm/dpm.c index ebf296ce4a5..afcbb659fa3 100644 --- a/ompi/dpm/dpm.c +++ b/ompi/dpm/dpm.c @@ -25,6 +25,7 @@ * reserved. * Copyright (c) 2022 IBM Corporation. All rights reserved. * Copyright (c) 2023 Jeffrey M. Squyres. All rights reserved. + * Copyright (c) 2025 BULL S.A.S. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -473,7 +474,10 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root, } while (!opal_list_is_empty(&ilist)); /* call add_procs on the new ones */ - rc = MCA_PML_CALL(add_procs(new_proc_list, opal_list_get_size(&ilist))); + rc = MCA_PML_CALL(add_procs(new_proc_list, i)); + /* Register spawned procs names to clean them up after */ + ompi_proc_retain_spawned_jobids(new_proc_list, i); + free(new_proc_list); new_proc_list = NULL; if (OMPI_SUCCESS != rc) { diff --git a/ompi/instance/instance.c b/ompi/instance/instance.c index 8ca19a9724c..67a5426654e 100644 --- a/ompi/instance/instance.c +++ b/ompi/instance/instance.c @@ -8,6 +8,7 @@ * reserved. * Copyright (c) 2023 Jeffrey M. Squyres. All rights reserved. * Copyright (c) 2024 NVIDIA Corporation. All rights reserved. + * Copyright (c) 2025 Bull SAS. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -19,6 +20,7 @@ #include "instance.h" #include "opal/util/arch.h" +#include "opal/util/proc.h" #include "opal/util/show_help.h" #include "opal/util/argv.h" @@ -39,6 +41,7 @@ #include "ompi/dpm/dpm.h" #include "ompi/file/file.h" #include "ompi/mpiext/mpiext.h" +#include "ompi/runtime/ompi_rte.h" #include "ompi/mca/hook/base/base.h" #include "ompi/mca/op/base/base.h" @@ -110,6 +113,8 @@ static void ompi_instance_construct (ompi_instance_t *instance) instance->i_name[0] = '\0'; instance->i_flags = 0; instance->i_keyhash = NULL; + OBJ_CONSTRUCT(&instance->i_spawned_proc_namelists, opal_list_t); + OBJ_CONSTRUCT(&instance->i_spawned_proc_lock, opal_mutex_t); OBJ_CONSTRUCT(&instance->s_lock, opal_mutex_t); instance->errhandler_type = OMPI_ERRHANDLER_TYPE_INSTANCE; instance->bsend_buffer = NULL; @@ -117,6 +122,8 @@ static void ompi_instance_construct (ompi_instance_t *instance) static void ompi_instance_destruct(ompi_instance_t *instance) { + OBJ_DESTRUCT(&instance->i_spawned_proc_namelists); + OBJ_DESTRUCT(&instance->i_spawned_proc_lock); OBJ_DESTRUCT(&instance->s_lock); } @@ -177,6 +184,61 @@ static int ompi_instance_print_error (const char *error, int ret) return ret; } +/* This function is only needed for the world paradigm because it's the only one + * we can spawn processes in it for now */ +void ompi_proc_retain_spawned_jobids(ompi_proc_t **spawned_procs, size_t list_size) { + const ompi_proc_t *spawned_proc; + opal_namelist_t *registered_proc; + ompi_process_name_t name; + ompi_rte_cmp_bitmask_t mask; + + /* NULL if session paradigm, not NULL if world paradigm */ + if (ompi_mpi_instance_default == NULL) { + return; + } + + /* return the proc-struct which matches this jobid */ + mask = OMPI_RTE_CMP_JOBID; + + for (size_t i = 0; i < list_size; i++) { + /* The idea is to filter the procs that have the same jobid, + * aka the jobs in the same instance. + * After that we lookup if the jobid is already present, meaning this + * instance is already registered via the jobid of its procs. + * If the jobid is not present we add it */ + + int found = 0; + spawned_proc = spawned_procs[i]; + if (OMPI_PROC_MY_NAME->jobid == spawned_proc->super.proc_name.jobid) { + continue; + } + + name.jobid = spawned_proc->super.proc_name.jobid; + name.vpid = spawned_proc->super.proc_name.vpid; + + opal_mutex_lock(&ompi_mpi_instance_default->i_spawned_proc_lock); + OPAL_LIST_FOREACH(registered_proc, + &ompi_mpi_instance_default->i_spawned_proc_namelists, + opal_namelist_t) { + if (OPAL_EQUAL == ompi_rte_compare_name_fields(mask, + ®istered_proc->name, &name)) { + found = 1; + break; + } + } + + if (0 == found) { + opal_namelist_t *namelist = OBJ_NEW(opal_namelist_t); + namelist->name.jobid = name.jobid; + namelist->name.vpid = 0; /* not needed for lookup */ + opal_list_append(&ompi_mpi_instance_default->i_spawned_proc_namelists, + &namelist->super); + } + opal_mutex_unlock(&ompi_mpi_instance_default->i_spawned_proc_lock); + } + return; +} + static int ompi_mpi_instance_cleanup_pml (void) { /* call del_procs on all allocated procs even though some may not be known @@ -184,11 +246,28 @@ static int ompi_mpi_instance_cleanup_pml (void) * any unknown procs. */ size_t nprocs = 0; ompi_proc_t **procs; + opal_namelist_t *registered_name; + opal_namelist_t *next; procs = ompi_proc_get_allocated (&nprocs); MCA_PML_CALL(del_procs(procs, nprocs)); free(procs); + /* If we are in a world paradigm and spawned processes we need to clean */ + if (ompi_mpi_instance_default != NULL) { + + /* Let's loop on all spawned jobids and del_proc the concerned procs */ + OPAL_LIST_FOREACH_SAFE(registered_name, next, + &ompi_mpi_instance_default->i_spawned_proc_namelists, + opal_namelist_t) { + + procs = ompi_proc_get_by_name(®istered_name->name, &nprocs); + MCA_PML_CALL(del_procs(procs, nprocs)); + opal_list_remove_item(&ompi_mpi_instance_default->i_spawned_proc_namelists, + ®istered_name->super); + } + } + return OMPI_SUCCESS; } @@ -989,14 +1068,14 @@ int ompi_mpi_instance_finalize (ompi_instance_t **instance) { int ret = OMPI_SUCCESS; - OBJ_RELEASE(*instance); - opal_mutex_lock (&instance_lock); if (0 == opal_atomic_add_fetch_32 (&ompi_instance_count, -1)) { ret = ompi_mpi_instance_finalize_common (); } opal_mutex_unlock (&instance_lock); + OBJ_RELEASE(*instance); + *instance = &ompi_mpi_instance_null.instance; return ret; diff --git a/ompi/instance/instance.h b/ompi/instance/instance.h index ce5fb25919c..7766a319bd1 100644 --- a/ompi/instance/instance.h +++ b/ompi/instance/instance.h @@ -1,6 +1,7 @@ /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* * Copyright (c) 2018-2025 Triad National Security, LLC. All rights reserved. + * Copyright (c) 2025 BULL S.A.S. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -34,6 +35,8 @@ struct ompi_instance_t { /* Attributes */ opal_hash_table_t *i_keyhash; + opal_mutex_t i_spawned_proc_lock; + opal_list_t i_spawned_proc_namelists; /* index in Fortran <-> C translation array (for when I get around * to implementing fortran support-- UGH) */ @@ -88,7 +91,7 @@ OBJ_CLASS_DECLARATION(ompi_instance_t); * the PREDEFINED_COMMUNICATOR_PAD macro? * A: Most likely not, but it would be good to check. */ -#define PREDEFINED_INSTANCE_PAD 512 +#define PREDEFINED_INSTANCE_PAD 1024 struct ompi_predefined_instance_t { ompi_instance_t instance; @@ -120,6 +123,14 @@ int ompi_mpi_instance_retain (void); */ void ompi_mpi_instance_release (void); +/** + * @brief Saves jobid of spawned procs to cleanup upon finalize + * + * @param[in] spawned_proc_list list of procs that were spawned + * @param[in] list_size size of the list of procs that were spawned + */ +void ompi_proc_retain_spawned_jobids(ompi_proc_t **spawned_proc_list, size_t list_size); + /** * @brief Create a new MPI instance * diff --git a/ompi/proc/proc.c b/ompi/proc/proc.c index 080506925c6..45f20cc8ab8 100644 --- a/ompi/proc/proc.c +++ b/ompi/proc/proc.c @@ -20,6 +20,7 @@ * Copyright (c) 2015-2017 Mellanox Technologies. All rights reserved. * * Copyright (c) 2021 Nanook Consulting. All rights reserved. + * Copyright (c) 2025 Bull SAS. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -417,25 +418,19 @@ int ompi_proc_world_size (void) return ompi_process_info.num_procs; } -ompi_proc_t **ompi_proc_get_allocated (size_t *size) +ompi_proc_t **ompi_proc_get_by_name(const ompi_process_name_t *name, size_t *size) { ompi_proc_t **procs; ompi_proc_t *proc; size_t count = 0; ompi_rte_cmp_bitmask_t mask; - ompi_process_name_t my_name; - /* check bozo case */ - if (NULL == ompi_proc_local_proc) { - return NULL; - } mask = OMPI_RTE_CMP_JOBID; - my_name = *OMPI_CAST_RTE_NAME(&ompi_proc_local_proc->super.proc_name); /* First count how many match this jobid */ opal_mutex_lock (&ompi_proc_lock); OPAL_LIST_FOREACH(proc, &ompi_proc_list, ompi_proc_t) { - if (OPAL_EQUAL == ompi_rte_compare_name_fields(mask, OMPI_CAST_RTE_NAME(&proc->super.proc_name), &my_name)) { + if (OPAL_EQUAL == ompi_rte_compare_name_fields(mask, OMPI_CAST_RTE_NAME(&proc->super.proc_name), name)) { ++count; } } @@ -450,7 +445,7 @@ ompi_proc_t **ompi_proc_get_allocated (size_t *size) /* now save only the procs that match this jobid */ count = 0; OPAL_LIST_FOREACH(proc, &ompi_proc_list, ompi_proc_t) { - if (OPAL_EQUAL == ompi_rte_compare_name_fields(mask, &proc->super.proc_name, &my_name)) { + if (OPAL_EQUAL == ompi_rte_compare_name_fields(mask, &proc->super.proc_name, name)) { /* DO NOT RETAIN THIS OBJECT - the reference count on this * object will be adjusted by external callers. The intent * here is to allow the reference count to drop to zero if @@ -474,6 +469,19 @@ ompi_proc_t **ompi_proc_get_allocated (size_t *size) return procs; } +ompi_proc_t **ompi_proc_get_allocated (size_t *size) +{ + ompi_process_name_t my_name; + + /* check bozo case */ + if (NULL == ompi_proc_local_proc) { + return NULL; + } + my_name = *OMPI_CAST_RTE_NAME(&ompi_proc_local_proc->super.proc_name); + + return ompi_proc_get_by_name(&my_name, size); +} + ompi_proc_t **ompi_proc_world (size_t *size) { ompi_proc_t **procs; diff --git a/ompi/proc/proc.h b/ompi/proc/proc.h index 028d1348d79..b0a5d199bf2 100644 --- a/ompi/proc/proc.h +++ b/ompi/proc/proc.h @@ -17,6 +17,7 @@ * Copyright (c) 2015-2016 Research Organization for Information Science * and Technology (RIST). All rights reserved. * Copyright (c) 2021 Nanook Consulting. All rights reserved. + * Copyright (c) 2025 BULL S.A.S. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -192,6 +193,24 @@ OMPI_DECLSPEC ompi_proc_t** ompi_proc_world(size_t* size); OMPI_DECLSPEC int ompi_proc_world_size (void); +/** + * Returns the list of proc with the given name + * Returns the list of proc associated with the jobid of the given + * name. If at least one proc with the jobid, then the name is known and we + * return the procs. + * + * @note The reference count of each process in the array is + * NOT incremented. + * + * @param[in] name Name containing the jobid of wanted processes + * @param[in] size Number of processes in the ompi_proc_t array + * + * @return Array of pointers to proc instances under the same name in the current + * MPI_COMM_WORLD, or NULL if there is an internal failure. + */ +OMPI_DECLSPEC ompi_proc_t **ompi_proc_get_by_name(const ompi_process_name_t *name, + size_t *size); + /** * Returns the list of proc instances associated with this job. *