X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fptlrpc%2Fptlrpcd.c;h=9810009f35318d6649329cf966625bc1f27f1406;hp=b98d0826606285af21ac23a333925ed12d0fb952;hb=ca6c35cab141597809c6f3a58102fac8ac86104a;hpb=e02cb40761ff8aae3df76c4210a345420b6d4ba1 diff --git a/lustre/ptlrpc/ptlrpcd.c b/lustre/ptlrpc/ptlrpcd.c index b98d082..9810009 100644 --- a/lustre/ptlrpc/ptlrpcd.c +++ b/lustre/ptlrpc/ptlrpcd.c @@ -224,9 +224,11 @@ void ptlrpcd_add_rqset(struct ptlrpc_request_set *set) if (count == i) { wake_up(&new->set_waitq); - /* XXX: It maybe unnecessary to wakeup all the partners. But to + /* + * XXX: It maybe unnecessary to wakeup all the partners. But to * guarantee the async RPC can be processed ASAP, we have - * no other better choice. It maybe fixed in future. */ + * no other better choice. It maybe fixed in future. + */ for (i = 0; i < pc->pc_npartners; i++) wake_up(&pc->pc_partners[i]->pc_set->set_waitq); } @@ -236,7 +238,7 @@ void ptlrpcd_add_rqset(struct ptlrpc_request_set *set) * Return transferred RPCs count. */ static int ptlrpcd_steal_rqset(struct ptlrpc_request_set *des, - struct ptlrpc_request_set *src) + struct ptlrpc_request_set *src) { struct list_head *tmp, *pos; struct ptlrpc_request *req; @@ -272,15 +274,18 @@ void ptlrpcd_add_req(struct ptlrpc_request *req) spin_lock(&req->rq_lock); if (req->rq_invalid_rqset) { - struct l_wait_info lwi = LWI_TIMEOUT(cfs_time_seconds(5), - back_to_sleep, NULL); - req->rq_invalid_rqset = 0; spin_unlock(&req->rq_lock); - l_wait_event(req->rq_set_waitq, (req->rq_set == NULL), &lwi); + if (wait_event_idle_timeout(req->rq_set_waitq, + req->rq_set == NULL, + cfs_time_seconds(5)) == 0) + l_wait_event_abortable(req->rq_set_waitq, + req->rq_set == NULL); } else if (req->rq_set) { - /* If we have a vaid "rq_set", just reuse it to avoid double - * linked. */ + /* + * If we have a vaid "rq_set", just reuse it to avoid double + * linked. + */ LASSERT(req->rq_phase == RQ_PHASE_NEW); LASSERT(req->rq_send_state == LUSTRE_IMP_REPLAY); @@ -295,7 +300,7 @@ void ptlrpcd_add_req(struct ptlrpc_request *req) pc = ptlrpcd_select_pc(req); - DEBUG_REQ(D_INFO, req, "add req [%p] to pc [%s:%d]", + DEBUG_REQ(D_INFO, req, "add req [%p] to pc [%s+%d]", req, pc->pc_name, pc->pc_index); ptlrpc_set_add_new_req(pc, req); @@ -314,11 +319,12 @@ static inline void ptlrpc_reqset_get(struct ptlrpc_request_set *set) static int ptlrpcd_check(struct lu_env *env, struct ptlrpcd_ctl *pc) { struct list_head *tmp, *pos; - struct ptlrpc_request *req; - struct ptlrpc_request_set *set = pc->pc_set; - int rc = 0; - int rc2; - ENTRY; + struct ptlrpc_request *req; + struct ptlrpc_request_set *set = pc->pc_set; + int rc = 0; + int rc2; + + ENTRY; if (atomic_read(&set->set_new_count)) { spin_lock(&set->set_new_req_lock); @@ -336,7 +342,8 @@ static int ptlrpcd_check(struct lu_env *env, struct ptlrpcd_ctl *pc) spin_unlock(&set->set_new_req_lock); } - /* We should call lu_env_refill() before handling new requests to make + /* + * We should call lu_env_refill() before handling new requests to make * sure that env key the requests depending on really exists. */ rc2 = lu_env_refill(env); @@ -358,8 +365,10 @@ static int ptlrpcd_check(struct lu_env *env, struct ptlrpcd_ctl *pc) if (atomic_read(&set->set_remaining)) rc |= ptlrpc_check_set(env, set); - /* NB: ptlrpc_check_set has already moved complted request at the - * head of seq::set_requests */ + /* + * NB: ptlrpc_check_set has already moved complted request at the + * head of seq::set_requests + */ list_for_each_safe(pos, tmp, &set->set_requests) { req = list_entry(pos, struct ptlrpc_request, rq_set_chain); if (req->rq_phase != RQ_PHASE_COMPLETE) @@ -376,19 +385,21 @@ static int ptlrpcd_check(struct lu_env *env, struct ptlrpcd_ctl *pc) */ rc = atomic_read(&set->set_new_count); - /* If we have nothing to do, check whether we can take some - * work from our partner threads. */ - if (rc == 0 && pc->pc_npartners > 0) { - struct ptlrpcd_ctl *partner; - struct ptlrpc_request_set *ps; - int first = pc->pc_cursor; - - do { - partner = pc->pc_partners[pc->pc_cursor++]; - if (pc->pc_cursor >= pc->pc_npartners) - pc->pc_cursor = 0; - if (partner == NULL) - continue; + /* + * If we have nothing to do, check whether we can take some + * work from our partner threads. + */ + if (rc == 0 && pc->pc_npartners > 0) { + struct ptlrpcd_ctl *partner; + struct ptlrpc_request_set *ps; + int first = pc->pc_cursor; + + do { + partner = pc->pc_partners[pc->pc_cursor++]; + if (pc->pc_cursor >= pc->pc_npartners) + pc->pc_cursor = 0; + if (partner == NULL) + continue; spin_lock(&partner->pc_lock); ps = partner->pc_set; @@ -403,8 +414,8 @@ static int ptlrpcd_check(struct lu_env *env, struct ptlrpcd_ctl *pc) if (atomic_read(&ps->set_new_count)) { rc = ptlrpcd_steal_rqset(set, ps); if (rc > 0) - CDEBUG(D_RPCTRACE, "transfer %d" - " async RPCs [%d->%d]\n", + CDEBUG(D_RPCTRACE, + "transfer %d async RPCs [%d->%d]\n", rc, partner->pc_index, pc->pc_index); } @@ -420,7 +431,6 @@ static int ptlrpcd_check(struct lu_env *env, struct ptlrpcd_ctl *pc) * Main ptlrpcd thread. * ptlrpc's code paths like to execute in process context, so we have this * thread which spins on a set which contains the rpcs and sends them. - * */ static int ptlrpcd(void *arg) { @@ -430,6 +440,7 @@ static int ptlrpcd(void *arg) struct lu_env env = { .le_ses = &ses }; int rc = 0; int exit = 0; + ENTRY; unshare_fs_struct(); @@ -468,17 +479,17 @@ static int ptlrpcd(void *arg) complete(&pc->pc_starting); - /* - * This mainloop strongly resembles ptlrpc_set_wait() except that our - * set never completes. ptlrpcd_check() calls ptlrpc_check_set() when - * there are requests in the set. New requests come in on the set's - * new_req_list and ptlrpcd_check() moves them into the set. - */ - do { - struct l_wait_info lwi; + /* + * This mainloop strongly resembles ptlrpc_set_wait() except that our + * set never completes. ptlrpcd_check() calls ptlrpc_check_set() when + * there are requests in the set. New requests come in on the set's + * new_req_list and ptlrpcd_check() moves them into the set. + */ + do { + struct l_wait_info lwi; time64_t timeout; - timeout = ptlrpc_set_next_timeout(set); + timeout = ptlrpc_set_next_timeout(set); lwi = LWI_TIMEOUT(cfs_time_seconds(timeout), ptlrpc_expired_set, set); @@ -493,15 +504,15 @@ static int ptlrpcd(void *arg) */ if (test_bit(LIOD_STOP, &pc->pc_flags)) { if (test_bit(LIOD_FORCE, &pc->pc_flags)) - ptlrpc_abort_set(set); - exit++; - } + ptlrpc_abort_set(set); + exit++; + } - /* - * Let's make one more loop to make sure that ptlrpcd_check() - * copied all raced new rpcs into the set so we can kill them. - */ - } while (exit < 2); + /* + * Let's make one more loop to make sure that ptlrpcd_check() + * copied all raced new rpcs into the set so we can kill them. + */ + } while (exit < 2); /* * Wait for inflight requests to drain. @@ -567,6 +578,7 @@ static int ptlrpcd_partners(struct ptlrpcd *pd, int index) int first; int i; int rc = 0; + ENTRY; LASSERT(index >= 0 && index < pd->pd_nthreads); @@ -597,6 +609,7 @@ int ptlrpcd_start(struct ptlrpcd_ctl *pc) { struct task_struct *task; int rc = 0; + ENTRY; /* @@ -653,6 +666,7 @@ out: void ptlrpcd_free(struct ptlrpcd_ctl *pc) { struct ptlrpc_request_set *set = pc->pc_set; + ENTRY; if (!test_bit(LIOD_START, &pc->pc_flags)) { @@ -672,16 +686,16 @@ void ptlrpcd_free(struct ptlrpcd_ctl *pc) clear_bit(LIOD_FORCE, &pc->pc_flags); out: - if (pc->pc_npartners > 0) { - LASSERT(pc->pc_partners != NULL); - - OBD_FREE(pc->pc_partners, - sizeof(struct ptlrpcd_ctl *) * pc->pc_npartners); - pc->pc_partners = NULL; - } - pc->pc_npartners = 0; + if (pc->pc_npartners > 0) { + LASSERT(pc->pc_partners != NULL); + + OBD_FREE(pc->pc_partners, + sizeof(struct ptlrpcd_ctl *) * pc->pc_npartners); + pc->pc_partners = NULL; + } + pc->pc_npartners = 0; pc->pc_error = 0; - EXIT; + EXIT; } static void ptlrpcd_fini(void) @@ -689,6 +703,7 @@ static void ptlrpcd_fini(void) int i; int j; int ncpts; + ENTRY; if (ptlrpcds != NULL) { @@ -731,6 +746,7 @@ static int ptlrpcd_init(void) int ncpts; int cpt; struct ptlrpcd *pd; + ENTRY; /* @@ -739,7 +755,7 @@ static int ptlrpcd_init(void) cptable = cfs_cpt_table; ncpts = cfs_cpt_number(cptable); if (ptlrpcd_cpts != NULL) { - struct cfs_expr_list *el; + struct cfs_expr_list *el; size = ncpts * sizeof(ptlrpcds_cpt_idx[0]); OBD_ALLOC(ptlrpcds_cpt_idx, size); @@ -884,6 +900,7 @@ static int ptlrpcd_init(void) size = offsetof(struct ptlrpcd, pd_threads[nthreads]); OBD_CPT_ALLOC(pd, cptable, cpt, size); + if (!pd) GOTO(out, rc = -ENOMEM); pd->pd_size = size; @@ -940,25 +957,26 @@ out: int ptlrpcd_addref(void) { - int rc = 0; - ENTRY; + int rc = 0; + + ENTRY; mutex_lock(&ptlrpcd_mutex); - if (++ptlrpcd_users == 1) { + if (++ptlrpcd_users == 1) { rc = ptlrpcd_init(); if (rc < 0) ptlrpcd_users--; } mutex_unlock(&ptlrpcd_mutex); - RETURN(rc); + RETURN(rc); } EXPORT_SYMBOL(ptlrpcd_addref); void ptlrpcd_decref(void) { mutex_lock(&ptlrpcd_mutex); - if (--ptlrpcd_users == 0) - ptlrpcd_fini(); + if (--ptlrpcd_users == 0) + ptlrpcd_fini(); mutex_unlock(&ptlrpcd_mutex); } EXPORT_SYMBOL(ptlrpcd_decref);