Skip to content

Commit b39d27f

Browse files
authored
Merge pull request #15 from shintaro-iwasaki/topic/thread_framework2
Fix threads framework
2 parents 205ee16 + d8f1bb9 commit b39d27f

32 files changed

+353
-329
lines changed

ompi/runtime/ompi_rte.c

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646
#include "opal/util/string_copy.h"
4747
#include "opal/mca/hwloc/base/base.h"
4848
#include "opal/mca/pmix/base/base.h"
49-
#include "opal/mca/threads/threads.h"
5049
#include "opal/mca/threads/tsd.h"
5150
#include "opal/class/opal_list.h"
5251
#include "opal/dss/dss.h"

opal/mca/threads/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ The threading model is chosen via the configure option `--with-threads=<threadin
1616

1717
The MCA for threading libraries is implemented in two places, once as a set of `.h` files in `mca/threads/<threading_model>/threads_<threading_model>_{threads,mutex,tsd}.h` which are defined inline to the main thread implementation and also as an MCA component that is loaded at runtime.
1818

19-
For performance reasons, in particular synchonization overhead, it is not possible to implement a threading model as a traditional MCA. This means --at least in the short term-- that threading models are chosen at compile time rather than runtime options, using mechanisms similar to Open MPI's libevent integration.
19+
For performance reasons, in particular synchronization overhead, it is not possible to implement a threading model as a traditional MCA. This means --at least in the short term-- that threading models are chosen at compile time rather than runtime options, using mechanisms similar to Open MPI's libevent integration.
2020

2121
The .h files are meant to be run on the fast path containing inline synchonization functions (threads_<threading_model>_mutex.h, thread local storage (threads_<threading_model>_tsd.h) and the opal_thread structure (threads_<threading_model>_thread.h).
2222

opal/mca/threads/argobots/Makefile.am

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,3 +32,5 @@ libmca_threads_argobots_la_SOURCES = \
3232
threads_argobots_tsd.h \
3333
threads_argobots_wait_sync.c \
3434
threads_argobots_wait_sync.h
35+
36+
AM_LDFLAGS = -labt

opal/mca/threads/argobots/threads_argobots.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030

3131
static inline void opal_threads_argobots_ensure_init(void)
3232
{
33-
if (ABT_initialized() != 0) {
33+
if (ABT_SUCCESS != ABT_initialized()) {
3434
ABT_init(0, 0);
3535
}
3636
}

opal/mca/threads/argobots/threads_argobots_event.c

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ static int evthread_argobots_lock(unsigned mode, void *_lock)
4747
} else {
4848
ret = ABT_mutex_lock(lock);
4949
}
50-
return ret;
50+
return ABT_SUCCESS == ret ? 0 : -1;
5151
}
5252

5353
static int evthread_argobots_unlock(unsigned mode, void *_lock)
@@ -56,7 +56,7 @@ static int evthread_argobots_unlock(unsigned mode, void *_lock)
5656
int ret = ABT_mutex_unlock(lock);
5757
/* This yield is necessary to avoid taking a lock consecutively. */
5858
ABT_thread_yield();
59-
return ret;
59+
return ABT_SUCCESS == ret ? 0 : -1;
6060
}
6161

6262
static unsigned long evthread_argobots_get_id(void)
@@ -82,19 +82,19 @@ static void evthread_argobots_cond_free(void *_cond)
8282
static int evthread_argobots_cond_signal(void *_cond, int broadcast)
8383
{
8484
ABT_cond cond = _cond;
85-
int r;
85+
int ret;
8686
if (broadcast) {
87-
r = ABT_cond_broadcast(cond);
87+
ret = ABT_cond_broadcast(cond);
8888
} else {
89-
r = ABT_cond_signal(cond);
89+
ret = ABT_cond_signal(cond);
9090
}
91-
return r ? -1 : 0;
91+
return ABT_SUCCESS == ret ? 0 : -1;
9292
}
9393

9494
static int evthread_argobots_cond_wait(void *_cond, void *_lock,
9595
const struct timeval *tv)
9696
{
97-
int r;
97+
int ret;
9898
ABT_cond cond = _cond;
9999
ABT_mutex lock = _lock;
100100

@@ -105,15 +105,17 @@ static int evthread_argobots_cond_wait(void *_cond, void *_lock,
105105
evutil_timeradd(&now, tv, &abstime);
106106
ts.tv_sec = abstime.tv_sec;
107107
ts.tv_nsec = abstime.tv_usec * 1000;
108-
r = ABT_cond_timedwait(cond, lock, &ts);
109-
if (r != 0) {
108+
ret = ABT_cond_timedwait(cond, lock, &ts);
109+
if (ABT_ERR_COND_TIMEDOUT == ret) {
110110
return 1;
111+
} else if (ABT_SUCCESS != ret) {
112+
return -1;
111113
} else {
112114
return 0;
113115
}
114116
} else {
115-
r = ABT_cond_wait(cond, lock);
116-
return r ? -1 : 0;
117+
ret = ABT_cond_wait(cond, lock);
118+
return ABT_SUCCESS == ret ? 0 : -1;
117119
}
118120
}
119121

opal/mca/threads/argobots/threads_argobots_module.c

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -85,13 +85,12 @@ bool opal_thread_self_compare(opal_thread_t *t)
8585

8686
int opal_thread_join(opal_thread_t *t, void **thr_return)
8787
{
88-
opal_threads_argobots_ensure_init();
8988
int rc = ABT_thread_free(&t->t_handle);
9089
if (thr_return) {
9190
*thr_return = t->t_ret;
9291
}
9392
t->t_handle = ABT_THREAD_NULL;
94-
return (0 == rc) ? OPAL_SUCCESS : OPAL_ERROR;
93+
return (ABT_SUCCESS == rc) ? OPAL_SUCCESS : OPAL_ERROR;
9594
}
9695

9796
void opal_thread_set_main()
@@ -105,7 +104,7 @@ int opal_thread_start(opal_thread_t *t)
105104
opal_threads_argobots_ensure_init();
106105
int rc;
107106
if (OPAL_ENABLE_DEBUG) {
108-
if (NULL == t->t_run || t->t_handle != ABT_THREAD_NULL) {
107+
if (NULL == t->t_run || ABT_THREAD_NULL != t->t_handle) {
109108
return OPAL_ERR_BAD_PARAM;
110109
}
111110
}
@@ -116,30 +115,30 @@ int opal_thread_start(opal_thread_t *t)
116115
opal_thread_argobots_wrapper, t,
117116
ABT_THREAD_ATTR_NULL, &t->t_handle);
118117

119-
return (0 == rc) ? OPAL_SUCCESS : OPAL_ERROR;
118+
return (ABT_SUCCESS == rc) ? OPAL_SUCCESS : OPAL_ERROR;
120119
}
121120

122-
opal_class_t opal_thread_t_class;
121+
OBJ_CLASS_DECLARATION(opal_thread_t);
123122

124123
int opal_tsd_key_create(opal_tsd_key_t *key, opal_tsd_destructor_t destructor)
125124
{
126125
opal_threads_argobots_ensure_init();
127126
int rc;
128127
rc = ABT_key_create(destructor, key);
129-
if ((0 == rc) && (opal_thread_get_argobots_self() == opal_main_thread)) {
128+
if ((ABT_SUCCESS == rc) &&
129+
(opal_thread_get_argobots_self() == opal_main_thread)) {
130130
opal_tsd_key_values = (struct opal_tsd_key_value *)
131131
realloc(opal_tsd_key_values, (opal_tsd_key_values_count + 1) *
132132
sizeof(struct opal_tsd_key_value));
133133
opal_tsd_key_values[opal_tsd_key_values_count].key = *key;
134134
opal_tsd_key_values[opal_tsd_key_values_count].destructor = destructor;
135135
opal_tsd_key_values_count++;
136136
}
137-
return rc;
137+
return (ABT_SUCCESS == rc) ? OPAL_SUCCESS : OPAL_ERROR;
138138
}
139139

140140
int opal_tsd_keys_destruct(void)
141141
{
142-
opal_threads_argobots_ensure_init();
143142
int i;
144143
void *ptr;
145144
for (i = 0; i < opal_tsd_key_values_count; i++) {

opal/mca/threads/argobots/threads_argobots_mutex.c

Lines changed: 89 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030

3131
#include "opal/mca/threads/mutex.h"
3232
#include "opal/mca/threads/argobots/threads_argobots_mutex.h"
33+
#include "opal/constants.h"
3334

3435
/*
3536
* Wait and see if some upper layer wants to use threads, if support
@@ -50,9 +51,8 @@ static void mca_threads_argobots_mutex_constructor(opal_mutex_t *p_mutex)
5051
opal_atomic_lock_init(&p_mutex->m_lock_atomic, 0);
5152
}
5253

53-
static void mca_threads_argobots_mutex_desctructor(opal_mutex_t *p_mutex)
54+
static void mca_threads_argobots_mutex_destructor(opal_mutex_t *p_mutex)
5455
{
55-
opal_threads_argobots_ensure_init();
5656
if (OPAL_ABT_MUTEX_NULL != p_mutex->m_lock_argobots) {
5757
ABT_mutex_free(&p_mutex->m_lock_argobots);
5858
}
@@ -72,10 +72,9 @@ static void mca_threads_argobots_recursive_mutex_constructor
7272
opal_atomic_lock_init(&p_mutex->m_lock_atomic, 0);
7373
}
7474

75-
static void mca_threads_argobots_recursive_mutex_desctructor
75+
static void mca_threads_argobots_recursive_mutex_destructor
7676
(opal_recursive_mutex_t *p_mutex)
7777
{
78-
opal_threads_argobots_ensure_init();
7978
if (OPAL_ABT_MUTEX_NULL != p_mutex->m_lock_argobots) {
8079
ABT_mutex_free(&p_mutex->m_lock_argobots);
8180
}
@@ -84,8 +83,92 @@ static void mca_threads_argobots_recursive_mutex_desctructor
8483
OBJ_CLASS_INSTANCE(opal_mutex_t,
8584
opal_object_t,
8685
mca_threads_argobots_mutex_constructor,
87-
mca_threads_argobots_mutex_desctructor);
86+
mca_threads_argobots_mutex_destructor);
8887
OBJ_CLASS_INSTANCE(opal_recursive_mutex_t,
8988
opal_object_t,
9089
mca_threads_argobots_recursive_mutex_constructor,
91-
mca_threads_argobots_recursive_mutex_desctructor);
90+
mca_threads_argobots_recursive_mutex_destructor);
91+
92+
void opal_mutex_create(struct opal_mutex_t *m)
93+
{
94+
opal_threads_argobots_ensure_init();
95+
while (OPAL_ABT_MUTEX_NULL == m->m_lock_argobots) {
96+
ABT_mutex abt_mutex;
97+
if (m->m_recursive) {
98+
ABT_mutex_attr abt_mutex_attr;
99+
ABT_mutex_attr_create(&abt_mutex_attr);
100+
ABT_mutex_attr_set_recursive(abt_mutex_attr, ABT_TRUE);
101+
ABT_mutex_create_with_attr(abt_mutex_attr, &abt_mutex);
102+
ABT_mutex_attr_free(&abt_mutex_attr);
103+
} else {
104+
ABT_mutex_create(&abt_mutex);
105+
}
106+
void *null_ptr = OPAL_ABT_MUTEX_NULL;
107+
if (opal_atomic_compare_exchange_strong_ptr(
108+
(intptr_t *)&m->m_lock_argobots, (intptr_t *)&null_ptr,
109+
(intptr_t)abt_mutex)) {
110+
/* mutex is successfully created and substituted. */
111+
return;
112+
}
113+
ABT_mutex_free(&abt_mutex);
114+
}
115+
}
116+
117+
static void opal_cond_create(opal_cond_t *cond)
118+
{
119+
opal_threads_argobots_ensure_init();
120+
while (OPAL_ABT_COND_NULL == *cond) {
121+
ABT_cond new_cond;
122+
ABT_cond_create(&new_cond);
123+
void *null_ptr = OPAL_ABT_COND_NULL;
124+
if (opal_atomic_compare_exchange_strong_ptr((intptr_t *)cond,
125+
(intptr_t *)&null_ptr,
126+
(intptr_t)new_cond)) {
127+
/* cond is successfully created and substituted. */
128+
return;
129+
}
130+
ABT_cond_free(&new_cond);
131+
}
132+
}
133+
134+
int opal_cond_init(opal_cond_t *cond)
135+
{
136+
*cond = OPAL_ABT_COND_NULL;
137+
return OPAL_SUCCESS;
138+
}
139+
140+
int opal_cond_wait(opal_cond_t *cond, opal_mutex_t *lock)
141+
{
142+
if (OPAL_ABT_COND_NULL == *cond) {
143+
opal_cond_create(cond);
144+
}
145+
int ret = ABT_cond_wait(*cond, lock->m_lock_argobots);
146+
return ABT_SUCCESS == ret ? OPAL_SUCCESS : OPAL_ERROR;
147+
}
148+
149+
int opal_cond_broadcast(opal_cond_t *cond)
150+
{
151+
if (OPAL_ABT_COND_NULL == *cond) {
152+
opal_cond_create(cond);
153+
}
154+
int ret = ABT_cond_broadcast(*cond);
155+
return ABT_SUCCESS == ret ? OPAL_SUCCESS : OPAL_ERROR;
156+
}
157+
158+
int opal_cond_signal(opal_cond_t *cond)
159+
{
160+
if (OPAL_ABT_COND_NULL == *cond) {
161+
opal_cond_create(cond);
162+
}
163+
int ret = ABT_cond_signal(*cond);
164+
return ABT_SUCCESS == ret ? OPAL_SUCCESS : OPAL_ERROR;
165+
}
166+
167+
int opal_cond_destroy(opal_cond_t *cond)
168+
{
169+
int ret = ABT_SUCCESS;
170+
if (OPAL_ABT_COND_NULL != *cond) {
171+
ret = ABT_cond_free(cond);
172+
}
173+
return ABT_SUCCESS == ret ? OPAL_SUCCESS : OPAL_ERROR;
174+
}

0 commit comments

Comments
 (0)