X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lnet%2Flnet%2Flib-eq.c;h=fec269a9a664033568e14345868a5049d9626093;hb=2f5ad4e8cf26406a7aa732efa4d813dbcced55d9;hp=059afa22e929a548f3a6ee5154bfe23a9c24c5cb;hpb=c7bff5640caff778d4cfca229672a2cc67b350d6;p=fs%2Flustre-release.git diff --git a/lnet/lnet/lib-eq.c b/lnet/lnet/lib-eq.c index 059afa2..fec269a 100644 --- a/lnet/lnet/lib-eq.c +++ b/lnet/lnet/lib-eq.c @@ -97,10 +97,8 @@ LNetEQAlloc(unsigned int count, lnet_eq_handler_t callback, if (count != 0) { LIBCFS_ALLOC(eq->eq_events, count * sizeof(lnet_event_t)); - if (eq->eq_events == NULL) { - lnet_eq_free(eq); - return -ENOMEM; - } + if (eq->eq_events == NULL) + goto failed; /* NB allocator has set all event sequence numbers to 0, * so all them should be earlier than eq_deq_seq */ } @@ -108,19 +106,39 @@ LNetEQAlloc(unsigned int count, lnet_eq_handler_t callback, eq->eq_deq_seq = 1; eq->eq_enq_seq = 1; eq->eq_size = count; - eq->eq_refcount = 0; eq->eq_callback = callback; - lnet_res_lock(); + eq->eq_refs = cfs_percpt_alloc(lnet_cpt_table(), + sizeof(*eq->eq_refs[0])); + if (eq->eq_refs == NULL) + goto failed; + + /* MUST hold both exclusive lnet_res_lock */ + lnet_res_lock(LNET_LOCK_EX); + /* NB: hold lnet_eq_wait_lock for EQ link/unlink, so we can do + * both EQ lookup and poll event with only lnet_eq_wait_lock */ + lnet_eq_wait_lock(); lnet_res_lh_initialize(&the_lnet.ln_eq_container, &eq->eq_lh); cfs_list_add(&eq->eq_list, &the_lnet.ln_eq_container.rec_active); - lnet_res_unlock(); + lnet_eq_wait_unlock(); + lnet_res_unlock(LNET_LOCK_EX); lnet_eq2handle(handle, eq); return 0; + +failed: + if (eq->eq_events != NULL) + LIBCFS_FREE(eq->eq_events, count * sizeof(lnet_event_t)); + + if (eq->eq_refs != NULL) + cfs_percpt_free(eq->eq_refs); + + lnet_eq_free(eq); + return -ENOMEM; } +EXPORT_SYMBOL(LNetEQAlloc); /** * Release the resources associated with an event queue if it's idle; @@ -135,48 +153,64 @@ LNetEQAlloc(unsigned int count, lnet_eq_handler_t callback, int LNetEQFree(lnet_handle_eq_t eqh) { - lnet_eq_t *eq; - int size; - lnet_event_t *events; - - LASSERT (the_lnet.ln_init); - LASSERT (the_lnet.ln_refcount > 0); - - lnet_res_lock(); + struct lnet_eq *eq; + lnet_event_t *events = NULL; + int **refs = NULL; + int *ref; + int rc = 0; + int size = 0; + int i; + + LASSERT(the_lnet.ln_init); + LASSERT(the_lnet.ln_refcount > 0); + + lnet_res_lock(LNET_LOCK_EX); + /* NB: hold lnet_eq_wait_lock for EQ link/unlink, so we can do + * both EQ lookup and poll event with only lnet_eq_wait_lock */ + lnet_eq_wait_lock(); eq = lnet_handle2eq(&eqh); if (eq == NULL) { - lnet_res_unlock(); - return -ENOENT; + rc = -ENOENT; + goto out; } - if (eq->eq_refcount != 0) { - CDEBUG(D_NET, "Event queue (%d) busy on destroy.\n", - eq->eq_refcount); - lnet_res_unlock(); - return -EBUSY; + cfs_percpt_for_each(ref, i, eq->eq_refs) { + LASSERT(*ref >= 0); + if (*ref == 0) + continue; + + CDEBUG(D_NET, "Event equeue (%d: %d) busy on destroy.\n", + i, *ref); + rc = -EBUSY; + goto out; } /* stash for free after lock dropped */ events = eq->eq_events; size = eq->eq_size; + refs = eq->eq_refs; lnet_res_lh_invalidate(&eq->eq_lh); cfs_list_del(&eq->eq_list); lnet_eq_free_locked(eq); - - lnet_res_unlock(); + out: + lnet_eq_wait_unlock(); + lnet_res_unlock(LNET_LOCK_EX); if (events != NULL) LIBCFS_FREE(events, size * sizeof(lnet_event_t)); + if (refs != NULL) + cfs_percpt_free(refs); - return 0; + return rc; } +EXPORT_SYMBOL(LNetEQFree); void lnet_eq_enqueue_event(lnet_eq_t *eq, lnet_event_t *ev) { - /* MUST called with resource lock hold */ + /* MUST called with resource lock hold but w/o lnet_eq_wait_lock */ int index; if (eq->eq_size == 0) { @@ -185,6 +219,7 @@ lnet_eq_enqueue_event(lnet_eq_t *eq, lnet_event_t *ev) return; } + lnet_eq_wait_lock(); ev->sequence = eq->eq_enq_seq++; LASSERT(eq->eq_size == LOWEST_BIT_SET(eq->eq_size)); @@ -207,22 +242,23 @@ lnet_eq_enqueue_event(lnet_eq_t *eq, lnet_event_t *ev) pthread_cond_broadcast(&the_lnet.ln_eq_cond); # endif #endif + lnet_eq_wait_unlock(); } int lnet_eq_dequeue_event(lnet_eq_t *eq, lnet_event_t *ev) { - int new_index = eq->eq_deq_seq & (eq->eq_size - 1); - lnet_event_t *new_event = &eq->eq_events[new_index]; - int rc; - ENTRY; + int new_index = eq->eq_deq_seq & (eq->eq_size - 1); + lnet_event_t *new_event = &eq->eq_events[new_index]; + int rc; + ENTRY; - if (LNET_SEQ_GT (eq->eq_deq_seq, new_event->sequence)) { - RETURN(0); - } + /* must called with lnet_eq_wait_lock hold */ + if (LNET_SEQ_GT(eq->eq_deq_seq, new_event->sequence)) + RETURN(0); - /* We've got a new event... */ - *ev = *new_event; + /* We've got a new event... */ + *ev = *new_event; CDEBUG(D_INFO, "event: %p, sequence: %lu, eq->size: %u\n", new_event, eq->eq_deq_seq, eq->eq_size); @@ -266,6 +302,7 @@ LNetEQGet (lnet_handle_eq_t eventq, lnet_event_t *event) return LNetEQPoll(&eventq, 1, 0, event, &which); } +EXPORT_SYMBOL(LNetEQGet); /** * Block the calling process until there is an event in the EQ. @@ -291,16 +328,17 @@ LNetEQWait (lnet_handle_eq_t eventq, lnet_event_t *event) return LNetEQPoll(&eventq, 1, LNET_TIME_FOREVER, event, &which); } +EXPORT_SYMBOL(LNetEQWait); #ifdef __KERNEL__ static int lnet_eq_wait_locked(int *timeout_ms) { - int tms = *timeout_ms; - int wait; - cfs_waitlink_t wl; - cfs_time_t now; + int tms = *timeout_ms; + int wait; + cfs_waitlink_t wl; + cfs_time_t now; if (tms == 0) return -1; /* don't want to wait and no new event */ @@ -309,7 +347,7 @@ lnet_eq_wait_locked(int *timeout_ms) cfs_set_current_state(CFS_TASK_INTERRUPTIBLE); cfs_waitq_add(&the_lnet.ln_eq_waitq, &wl); - lnet_res_unlock(); + lnet_eq_wait_unlock(); if (tms < 0) { cfs_waitq_wait(&wl, CFS_TASK_INTERRUPTIBLE); @@ -329,7 +367,7 @@ lnet_eq_wait_locked(int *timeout_ms) wait = tms != 0; /* might need to call here again */ *timeout_ms = tms; - lnet_res_lock(); + lnet_eq_wait_lock(); cfs_waitq_del(&the_lnet.ln_eq_waitq, &wl); return wait; @@ -342,10 +380,11 @@ static void lnet_eq_cond_wait(struct timespec *ts) { if (ts == NULL) { - pthread_cond_wait(&the_lnet.ln_eq_cond, &the_lnet.ln_res_lock); + pthread_cond_wait(&the_lnet.ln_eq_cond, + &the_lnet.ln_eq_wait_lock); } else { pthread_cond_timedwait(&the_lnet.ln_eq_cond, - &the_lnet.ln_res_lock, ts); + &the_lnet.ln_eq_wait_lock, ts); } } # endif @@ -353,28 +392,28 @@ lnet_eq_cond_wait(struct timespec *ts) static int lnet_eq_wait_locked(int *timeout_ms) { - lnet_ni_t *eq_waitni = NULL; - int tms = *timeout_ms; - int wait; - struct timeval then; - struct timeval now; + lnet_ni_t *eq_waitni = NULL; + int tms = *timeout_ms; + int wait; + struct timeval then; + struct timeval now; if (the_lnet.ln_eq_waitni != NULL) { /* I have a single NI that I have to call into, to get * events queued, or to block. */ - lnet_res_unlock(); + lnet_eq_wait_unlock(); - LNET_LOCK(); + lnet_net_lock(0); eq_waitni = the_lnet.ln_eq_waitni; if (unlikely(eq_waitni == NULL)) { - LNET_UNLOCK(); + lnet_net_unlock(0); - lnet_res_lock(); + lnet_eq_wait_lock(); return -1; } - lnet_ni_addref_locked(eq_waitni); - LNET_UNLOCK(); + lnet_ni_addref_locked(eq_waitni, 0); + lnet_net_unlock(0); if (tms <= 0) { /* even for tms == 0 */ (eq_waitni->ni_lnd->lnd_wait)(eq_waitni, tms); @@ -392,7 +431,7 @@ lnet_eq_wait_locked(int *timeout_ms) } lnet_ni_decref(eq_waitni); - lnet_res_lock(); + lnet_eq_wait_lock(); } else { /* w/o eq_waitni */ # ifndef HAVE_LIBPTHREAD /* If I'm single-threaded, LNET fails at startup if it can't @@ -466,8 +505,8 @@ lnet_eq_wait_locked(int *timeout_ms) * \retval -ENOENT If there's an invalid handle in \a eventqs. */ int -LNetEQPoll (lnet_handle_eq_t *eventqs, int neq, int timeout_ms, - lnet_event_t *event, int *which) +LNetEQPoll(lnet_handle_eq_t *eventqs, int neq, int timeout_ms, + lnet_event_t *event, int *which) { int wait = 1; int rc; @@ -480,30 +519,30 @@ LNetEQPoll (lnet_handle_eq_t *eventqs, int neq, int timeout_ms, if (neq < 1) RETURN(-ENOENT); - lnet_res_lock(); + lnet_eq_wait_lock(); - for (;;) { + for (;;) { #ifndef __KERNEL__ - lnet_res_unlock(); + lnet_eq_wait_unlock(); /* Recursion breaker */ if (the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING && !LNetHandleIsEqual(eventqs[0], the_lnet.ln_rc_eqh)) lnet_router_checker(); - lnet_res_lock(); + lnet_eq_wait_lock(); #endif for (i = 0; i < neq; i++) { lnet_eq_t *eq = lnet_handle2eq(&eventqs[i]); if (eq == NULL) { - lnet_res_unlock(); + lnet_eq_wait_unlock(); RETURN(-ENOENT); } rc = lnet_eq_dequeue_event(eq, event); if (rc != 0) { - lnet_res_unlock(); + lnet_eq_wait_unlock(); *which = i; RETURN(rc); } @@ -524,6 +563,6 @@ LNetEQPoll (lnet_handle_eq_t *eventqs, int neq, int timeout_ms, break; } - lnet_res_unlock(); + lnet_eq_wait_unlock(); RETURN(0); }