X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fptlrpc%2Fptlrpcd.c;h=4c9b577ba6960e0e011958a80d7b84b04c1378e6;hp=b98d0826606285af21ac23a333925ed12d0fb952;hb=885b494632ca16d95fd09685a571b76d80d09414;hpb=e02cb40761ff8aae3df76c4210a345420b6d4ba1;ds=sidebyside diff --git a/lustre/ptlrpc/ptlrpcd.c b/lustre/ptlrpc/ptlrpcd.c index b98d082..4c9b577 100644 --- a/lustre/ptlrpc/ptlrpcd.c +++ b/lustre/ptlrpc/ptlrpcd.c @@ -27,7 +27,6 @@ */ /* * This file is part of Lustre, http://www.lustre.org/ - * Lustre is a trademark of Sun Microsystems, Inc. * * lustre/ptlrpc/ptlrpcd.c */ @@ -175,7 +174,7 @@ ptlrpcd_select_pc(struct ptlrpc_request *req) if (req != NULL && req->rq_send_state != LUSTRE_IMP_FULL) return &ptlrpcd_rcv; - cpt = cfs_cpt_current(cfs_cpt_table, 1); + cpt = cfs_cpt_current(cfs_cpt_tab, 1); if (ptlrpcds_cpt_idx == NULL) idx = cpt; else @@ -224,9 +223,11 @@ void ptlrpcd_add_rqset(struct ptlrpc_request_set *set) if (count == i) { wake_up(&new->set_waitq); - /* XXX: It maybe unnecessary to wakeup all the partners. But to + /* + * XXX: It maybe unnecessary to wakeup all the partners. But to * guarantee the async RPC can be processed ASAP, we have - * no other better choice. It maybe fixed in future. */ + * no other better choice. It maybe fixed in future. + */ for (i = 0; i < pc->pc_npartners; i++) wake_up(&pc->pc_partners[i]->pc_set->set_waitq); } @@ -236,19 +237,16 @@ void ptlrpcd_add_rqset(struct ptlrpc_request_set *set) * Return transferred RPCs count. */ static int ptlrpcd_steal_rqset(struct ptlrpc_request_set *des, - struct ptlrpc_request_set *src) + struct ptlrpc_request_set *src) { - struct list_head *tmp, *pos; struct ptlrpc_request *req; int rc = 0; spin_lock(&src->set_new_req_lock); if (likely(!list_empty(&src->set_new_requests))) { - list_for_each_safe(pos, tmp, &src->set_new_requests) { - req = list_entry(pos, struct ptlrpc_request, - rq_set_chain); + list_for_each_entry(req, &src->set_new_requests, rq_set_chain) req->rq_set = des; - } + list_splice_init(&src->set_new_requests, &des->set_requests); rc = atomic_read(&src->set_new_count); @@ -272,15 +270,18 @@ void ptlrpcd_add_req(struct ptlrpc_request *req) spin_lock(&req->rq_lock); if (req->rq_invalid_rqset) { - struct l_wait_info lwi = LWI_TIMEOUT(cfs_time_seconds(5), - back_to_sleep, NULL); - req->rq_invalid_rqset = 0; spin_unlock(&req->rq_lock); - l_wait_event(req->rq_set_waitq, (req->rq_set == NULL), &lwi); + if (wait_event_idle_timeout(req->rq_set_waitq, + req->rq_set == NULL, + cfs_time_seconds(5)) == 0) + l_wait_event_abortable(req->rq_set_waitq, + req->rq_set == NULL); } else if (req->rq_set) { - /* If we have a vaid "rq_set", just reuse it to avoid double - * linked. */ + /* + * If we have a vaid "rq_set", just reuse it to avoid double + * linked. + */ LASSERT(req->rq_phase == RQ_PHASE_NEW); LASSERT(req->rq_send_state == LUSTRE_IMP_REPLAY); @@ -295,7 +296,7 @@ void ptlrpcd_add_req(struct ptlrpc_request *req) pc = ptlrpcd_select_pc(req); - DEBUG_REQ(D_INFO, req, "add req [%p] to pc [%s:%d]", + DEBUG_REQ(D_INFO, req, "add req [%p] to pc [%s+%d]", req, pc->pc_name, pc->pc_index); ptlrpc_set_add_new_req(pc, req); @@ -313,12 +314,12 @@ static inline void ptlrpc_reqset_get(struct ptlrpc_request_set *set) */ static int ptlrpcd_check(struct lu_env *env, struct ptlrpcd_ctl *pc) { - struct list_head *tmp, *pos; - struct ptlrpc_request *req; - struct ptlrpc_request_set *set = pc->pc_set; - int rc = 0; - int rc2; - ENTRY; + struct ptlrpc_request *req, *tmp; + struct ptlrpc_request_set *set = pc->pc_set; + int rc = 0; + int rc2; + + ENTRY; if (atomic_read(&set->set_new_count)) { spin_lock(&set->set_new_req_lock); @@ -336,7 +337,8 @@ static int ptlrpcd_check(struct lu_env *env, struct ptlrpcd_ctl *pc) spin_unlock(&set->set_new_req_lock); } - /* We should call lu_env_refill() before handling new requests to make + /* + * We should call lu_env_refill() before handling new requests to make * sure that env key the requests depending on really exists. */ rc2 = lu_env_refill(env); @@ -358,10 +360,11 @@ static int ptlrpcd_check(struct lu_env *env, struct ptlrpcd_ctl *pc) if (atomic_read(&set->set_remaining)) rc |= ptlrpc_check_set(env, set); - /* NB: ptlrpc_check_set has already moved complted request at the - * head of seq::set_requests */ - list_for_each_safe(pos, tmp, &set->set_requests) { - req = list_entry(pos, struct ptlrpc_request, rq_set_chain); + /* + * NB: ptlrpc_check_set has already moved complted request at the + * head of seq::set_requests + */ + list_for_each_entry_safe(req, tmp, &set->set_requests, rq_set_chain) { if (req->rq_phase != RQ_PHASE_COMPLETE) break; @@ -376,19 +379,21 @@ static int ptlrpcd_check(struct lu_env *env, struct ptlrpcd_ctl *pc) */ rc = atomic_read(&set->set_new_count); - /* If we have nothing to do, check whether we can take some - * work from our partner threads. */ - if (rc == 0 && pc->pc_npartners > 0) { - struct ptlrpcd_ctl *partner; - struct ptlrpc_request_set *ps; - int first = pc->pc_cursor; - - do { - partner = pc->pc_partners[pc->pc_cursor++]; - if (pc->pc_cursor >= pc->pc_npartners) - pc->pc_cursor = 0; - if (partner == NULL) - continue; + /* + * If we have nothing to do, check whether we can take some + * work from our partner threads. + */ + if (rc == 0 && pc->pc_npartners > 0) { + struct ptlrpcd_ctl *partner; + struct ptlrpc_request_set *ps; + int first = pc->pc_cursor; + + do { + partner = pc->pc_partners[pc->pc_cursor++]; + if (pc->pc_cursor >= pc->pc_npartners) + pc->pc_cursor = 0; + if (partner == NULL) + continue; spin_lock(&partner->pc_lock); ps = partner->pc_set; @@ -403,8 +408,8 @@ static int ptlrpcd_check(struct lu_env *env, struct ptlrpcd_ctl *pc) if (atomic_read(&ps->set_new_count)) { rc = ptlrpcd_steal_rqset(set, ps); if (rc > 0) - CDEBUG(D_RPCTRACE, "transfer %d" - " async RPCs [%d->%d]\n", + CDEBUG(D_RPCTRACE, + "transfer %d async RPCs [%d->%d]\n", rc, partner->pc_index, pc->pc_index); } @@ -420,7 +425,6 @@ static int ptlrpcd_check(struct lu_env *env, struct ptlrpcd_ctl *pc) * Main ptlrpcd thread. * ptlrpc's code paths like to execute in process context, so we have this * thread which spins on a set which contains the rpcs and sends them. - * */ static int ptlrpcd(void *arg) { @@ -430,11 +434,9 @@ static int ptlrpcd(void *arg) struct lu_env env = { .le_ses = &ses }; int rc = 0; int exit = 0; - ENTRY; - - unshare_fs_struct(); - if (cfs_cpt_bind(cfs_cpt_table, pc->pc_cpt) != 0) + ENTRY; + if (cfs_cpt_bind(cfs_cpt_tab, pc->pc_cpt) != 0) CWARN("Failed to bind %s on CPT %d\n", pc->pc_name, pc->pc_cpt); /* @@ -468,23 +470,39 @@ static int ptlrpcd(void *arg) complete(&pc->pc_starting); - /* - * This mainloop strongly resembles ptlrpc_set_wait() except that our - * set never completes. ptlrpcd_check() calls ptlrpc_check_set() when - * there are requests in the set. New requests come in on the set's - * new_req_list and ptlrpcd_check() moves them into the set. - */ - do { - struct l_wait_info lwi; + /* + * This mainloop strongly resembles ptlrpc_set_wait() except that our + * set never completes. ptlrpcd_check() calls ptlrpc_check_set() when + * there are requests in the set. New requests come in on the set's + * new_req_list and ptlrpcd_check() moves them into the set. + */ + do { + DEFINE_WAIT_FUNC(wait, woken_wake_function); time64_t timeout; - timeout = ptlrpc_set_next_timeout(set); - lwi = LWI_TIMEOUT(cfs_time_seconds(timeout), - ptlrpc_expired_set, set); + timeout = ptlrpc_set_next_timeout(set); lu_context_enter(&env.le_ctx); lu_context_enter(env.le_ses); - l_wait_event(set->set_waitq, ptlrpcd_check(&env, pc), &lwi); + + add_wait_queue(&set->set_waitq, &wait); + while (!ptlrpcd_check(&env, pc)) { + int ret; + + if (timeout == 0) + ret = wait_woken(&wait, TASK_IDLE, + MAX_SCHEDULE_TIMEOUT); + else + ret = wait_woken(&wait, TASK_IDLE, + cfs_time_seconds(timeout)); + if (ret != 0) + continue; + /* Timed out */ + ptlrpc_expired_set(set); + break; + } + remove_wait_queue(&set->set_waitq, &wait); + lu_context_exit(&env.le_ctx); lu_context_exit(env.le_ses); @@ -493,15 +511,15 @@ static int ptlrpcd(void *arg) */ if (test_bit(LIOD_STOP, &pc->pc_flags)) { if (test_bit(LIOD_FORCE, &pc->pc_flags)) - ptlrpc_abort_set(set); - exit++; - } + ptlrpc_abort_set(set); + exit++; + } - /* - * Let's make one more loop to make sure that ptlrpcd_check() - * copied all raced new rpcs into the set so we can kill them. - */ - } while (exit < 2); + /* + * Let's make one more loop to make sure that ptlrpcd_check() + * copied all raced new rpcs into the set so we can kill them. + */ + } while (exit < 2); /* * Wait for inflight requests to drain. @@ -567,6 +585,7 @@ static int ptlrpcd_partners(struct ptlrpcd *pd, int index) int first; int i; int rc = 0; + ENTRY; LASSERT(index >= 0 && index < pd->pd_nthreads); @@ -576,7 +595,7 @@ static int ptlrpcd_partners(struct ptlrpcd *pd, int index) if (pc->pc_npartners <= 0) GOTO(out, rc); - OBD_CPT_ALLOC(pc->pc_partners, cfs_cpt_table, pc->pc_cpt, + OBD_CPT_ALLOC(pc->pc_partners, cfs_cpt_tab, pc->pc_cpt, sizeof(struct ptlrpcd_ctl *) * pc->pc_npartners); if (pc->pc_partners == NULL) { pc->pc_npartners = 0; @@ -597,6 +616,7 @@ int ptlrpcd_start(struct ptlrpcd_ctl *pc) { struct task_struct *task; int rc = 0; + ENTRY; /* @@ -608,7 +628,7 @@ int ptlrpcd_start(struct ptlrpcd_ctl *pc) RETURN(0); } - task = kthread_run(ptlrpcd, pc, pc->pc_name); + task = kthread_run(ptlrpcd, pc, "%s", pc->pc_name); if (IS_ERR(task)) GOTO(out_set, rc = PTR_ERR(task)); @@ -653,6 +673,7 @@ out: void ptlrpcd_free(struct ptlrpcd_ctl *pc) { struct ptlrpc_request_set *set = pc->pc_set; + ENTRY; if (!test_bit(LIOD_START, &pc->pc_flags)) { @@ -672,16 +693,15 @@ void ptlrpcd_free(struct ptlrpcd_ctl *pc) clear_bit(LIOD_FORCE, &pc->pc_flags); out: - if (pc->pc_npartners > 0) { - LASSERT(pc->pc_partners != NULL); - - OBD_FREE(pc->pc_partners, - sizeof(struct ptlrpcd_ctl *) * pc->pc_npartners); - pc->pc_partners = NULL; - } - pc->pc_npartners = 0; + if (pc->pc_npartners > 0) { + LASSERT(pc->pc_partners != NULL); + + OBD_FREE_PTR_ARRAY(pc->pc_partners, pc->pc_npartners); + pc->pc_partners = NULL; + } + pc->pc_npartners = 0; pc->pc_error = 0; - EXIT; + EXIT; } static void ptlrpcd_fini(void) @@ -689,6 +709,7 @@ static void ptlrpcd_fini(void) int i; int j; int ncpts; + ENTRY; if (ptlrpcds != NULL) { @@ -702,7 +723,7 @@ static void ptlrpcd_fini(void) OBD_FREE(ptlrpcds[i], ptlrpcds[i]->pd_size); ptlrpcds[i] = NULL; } - OBD_FREE(ptlrpcds, sizeof(ptlrpcds[0]) * ptlrpcds_num); + OBD_FREE_PTR_ARRAY(ptlrpcds, ptlrpcds_num); } ptlrpcds_num = 0; @@ -710,8 +731,8 @@ static void ptlrpcd_fini(void) ptlrpcd_free(&ptlrpcd_rcv); if (ptlrpcds_cpt_idx != NULL) { - ncpts = cfs_cpt_number(cfs_cpt_table); - OBD_FREE(ptlrpcds_cpt_idx, ncpts * sizeof(ptlrpcds_cpt_idx[0])); + ncpts = cfs_cpt_number(cfs_cpt_tab); + OBD_FREE_PTR_ARRAY(ptlrpcds_cpt_idx, ncpts); ptlrpcds_cpt_idx = NULL; } @@ -731,15 +752,16 @@ static int ptlrpcd_init(void) int ncpts; int cpt; struct ptlrpcd *pd; + ENTRY; /* * Determine the CPTs that ptlrpcd threads will run on. */ - cptable = cfs_cpt_table; + cptable = cfs_cpt_tab; ncpts = cfs_cpt_number(cptable); if (ptlrpcd_cpts != NULL) { - struct cfs_expr_list *el; + struct cfs_expr_list *el; size = ncpts * sizeof(ptlrpcds_cpt_idx[0]); OBD_ALLOC(ptlrpcds_cpt_idx, size); @@ -884,6 +906,7 @@ static int ptlrpcd_init(void) size = offsetof(struct ptlrpcd, pd_threads[nthreads]); OBD_CPT_ALLOC(pd, cptable, cpt, size); + if (!pd) GOTO(out, rc = -ENOMEM); pd->pd_size = size; @@ -940,25 +963,26 @@ out: int ptlrpcd_addref(void) { - int rc = 0; - ENTRY; + int rc = 0; + + ENTRY; mutex_lock(&ptlrpcd_mutex); - if (++ptlrpcd_users == 1) { + if (++ptlrpcd_users == 1) { rc = ptlrpcd_init(); if (rc < 0) ptlrpcd_users--; } mutex_unlock(&ptlrpcd_mutex); - RETURN(rc); + RETURN(rc); } EXPORT_SYMBOL(ptlrpcd_addref); void ptlrpcd_decref(void) { mutex_lock(&ptlrpcd_mutex); - if (--ptlrpcd_users == 0) - ptlrpcd_fini(); + if (--ptlrpcd_users == 0) + ptlrpcd_fini(); mutex_unlock(&ptlrpcd_mutex); } EXPORT_SYMBOL(ptlrpcd_decref);