X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fptlrpc%2Fptlrpcd.c;h=914f3261b09e7ad33cb239b18cbb829720fe8c1d;hb=4e5855ae4dea7954ce1891cd23abce033fe23f03;hp=8a9f04955170584d082c8b68428faa1f663e6bfd;hpb=e920be6814512b1aa8696ea36d697d3b698c13e8;p=fs%2Flustre-release.git

diff --git a/lustre/ptlrpc/ptlrpcd.c b/lustre/ptlrpc/ptlrpcd.c
index 8a9f049..914f326 100644
--- a/lustre/ptlrpc/ptlrpcd.c
+++ b/lustre/ptlrpc/ptlrpcd.c
@@ -23,7 +23,7 @@
  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2015, Intel Corporation.
+ * Copyright (c) 2011, 2017, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -224,9 +224,11 @@ void ptlrpcd_add_rqset(struct ptlrpc_request_set *set)
 	if (count == i) {
 		wake_up(&new->set_waitq);

-		/* XXX: It maybe unnecessary to wakeup all the partners. But to
+		/*
+		 * XXX: It may be unnecessary to wake up all the partners. But to
 		 *      guarantee the async RPC can be processed ASAP, we have
-		 *      no other better choice. It maybe fixed in future. */
+		 *      no better choice. It may be fixed in the future.
+		 */
 		for (i = 0; i < pc->pc_npartners; i++)
 			wake_up(&pc->pc_partners[i]->pc_set->set_waitq);
 	}
@@ -236,7 +238,7 @@ void ptlrpcd_add_rqset(struct ptlrpc_request_set *set)
  * Return transferred RPCs count.
  */
 static int ptlrpcd_steal_rqset(struct ptlrpc_request_set *des,
-                               struct ptlrpc_request_set *src)
+			       struct ptlrpc_request_set *src)
 {
 	struct list_head *tmp, *pos;
 	struct ptlrpc_request *req;
@@ -272,15 +274,18 @@ void ptlrpcd_add_req(struct ptlrpc_request *req)
 	spin_lock(&req->rq_lock);
 	if (req->rq_invalid_rqset) {
-		struct l_wait_info lwi = LWI_TIMEOUT(cfs_time_seconds(5),
-						     back_to_sleep, NULL);
-
 		req->rq_invalid_rqset = 0;
 		spin_unlock(&req->rq_lock);
-		l_wait_event(req->rq_set_waitq, (req->rq_set == NULL), &lwi);
+		if (wait_event_idle_timeout(req->rq_set_waitq,
+					    req->rq_set == NULL,
+					    cfs_time_seconds(5)) == 0)
+			l_wait_event_abortable(req->rq_set_waitq,
+					       req->rq_set == NULL);
 	} else if (req->rq_set) {
-		/* If we have a vaid "rq_set", just reuse it to avoid double
-		 * linked. */
+		/*
+		 * If we have a valid "rq_set", just reuse it to avoid double
+		 * linking.
+		 */
 		LASSERT(req->rq_phase == RQ_PHASE_NEW);
 		LASSERT(req->rq_send_state == LUSTRE_IMP_REPLAY);
@@ -295,7 +300,7 @@ void ptlrpcd_add_req(struct ptlrpc_request *req)

 	pc = ptlrpcd_select_pc(req);

-	DEBUG_REQ(D_INFO, req, "add req [%p] to pc [%s:%d]",
+	DEBUG_REQ(D_INFO, req, "add req [%p] to pc [%s+%d]",
 		  req, pc->pc_name, pc->pc_index);

 	ptlrpc_set_add_new_req(pc, req);
@@ -314,11 +319,12 @@ static inline void ptlrpc_reqset_get(struct ptlrpc_request_set *set)
 static int ptlrpcd_check(struct lu_env *env, struct ptlrpcd_ctl *pc)
 {
 	struct list_head *tmp, *pos;
-        struct ptlrpc_request *req;
-        struct ptlrpc_request_set *set = pc->pc_set;
-        int rc = 0;
-        int rc2;
-        ENTRY;
+	struct ptlrpc_request *req;
+	struct ptlrpc_request_set *set = pc->pc_set;
+	int rc = 0;
+	int rc2;
+
+	ENTRY;

 	if (atomic_read(&set->set_new_count)) {
 		spin_lock(&set->set_new_req_lock);
@@ -336,7 +342,8 @@ static int ptlrpcd_check(struct lu_env *env, struct ptlrpcd_ctl *pc)
 		spin_unlock(&set->set_new_req_lock);
 	}

-	/* We should call lu_env_refill() before handling new requests to make
+	/*
+	 * We should call lu_env_refill() before handling new requests to make
 	 * sure that env key the requests depending on really exists.
 	 */
 	rc2 = lu_env_refill(env);
@@ -358,8 +365,10 @@ static int ptlrpcd_check(struct lu_env *env, struct ptlrpcd_ctl *pc)
 	if (atomic_read(&set->set_remaining))
 		rc |= ptlrpc_check_set(env, set);

-	/* NB: ptlrpc_check_set has already moved complted request at the
-	 * head of seq::set_requests */
+	/*
+	 * NB: ptlrpc_check_set has already moved completed requests to the
+	 * head of seq::set_requests
+	 */
 	list_for_each_safe(pos, tmp, &set->set_requests) {
 		req = list_entry(pos, struct ptlrpc_request, rq_set_chain);
 		if (req->rq_phase != RQ_PHASE_COMPLETE)
@@ -376,19 +385,21 @@ static int ptlrpcd_check(struct lu_env *env, struct ptlrpcd_ctl *pc)
 	 */
 	rc = atomic_read(&set->set_new_count);

-        /* If we have nothing to do, check whether we can take some
-         * work from our partner threads. */
-        if (rc == 0 && pc->pc_npartners > 0) {
-                struct ptlrpcd_ctl *partner;
-                struct ptlrpc_request_set *ps;
-                int first = pc->pc_cursor;
-
-                do {
-                        partner = pc->pc_partners[pc->pc_cursor++];
-                        if (pc->pc_cursor >= pc->pc_npartners)
-                                pc->pc_cursor = 0;
-                        if (partner == NULL)
-                                continue;
+	/*
+	 * If we have nothing to do, check whether we can take some
+	 * work from our partner threads.
+	 */
+	if (rc == 0 && pc->pc_npartners > 0) {
+		struct ptlrpcd_ctl *partner;
+		struct ptlrpc_request_set *ps;
+		int first = pc->pc_cursor;
+
+		do {
+			partner = pc->pc_partners[pc->pc_cursor++];
+			if (pc->pc_cursor >= pc->pc_npartners)
+				pc->pc_cursor = 0;
+			if (partner == NULL)
+				continue;

 			spin_lock(&partner->pc_lock);
 			ps = partner->pc_set;
@@ -403,8 +414,8 @@ static int ptlrpcd_check(struct lu_env *env, struct ptlrpcd_ctl *pc)
 			if (atomic_read(&ps->set_new_count)) {
 				rc = ptlrpcd_steal_rqset(set, ps);
 				if (rc > 0)
-					CDEBUG(D_RPCTRACE, "transfer %d"
-					       " async RPCs [%d->%d]\n",
+					CDEBUG(D_RPCTRACE,
+					       "transfer %d async RPCs [%d->%d]\n",
 					       rc, partner->pc_index,
 					       pc->pc_index);
 			}
@@ -420,7 +431,6 @@ static int ptlrpcd_check(struct lu_env *env, struct ptlrpcd_ctl *pc)
  * Main ptlrpcd thread.
  * ptlrpc's code paths like to execute in process context, so we have this
  * thread which spins on a set which contains the rpcs and sends them.
- *
  */
 static int ptlrpcd(void *arg)
 {
@@ -430,6 +440,7 @@ static int ptlrpcd(void *arg)
 	struct lu_env env = { .le_ses = &ses };
 	int rc = 0;
 	int exit = 0;
+
 	ENTRY;

 	unshare_fs_struct();
@@ -468,23 +479,27 @@ static int ptlrpcd(void *arg)

 	complete(&pc->pc_starting);

-        /*
-         * This mainloop strongly resembles ptlrpc_set_wait() except that our
-         * set never completes.  ptlrpcd_check() calls ptlrpc_check_set() when
-         * there are requests in the set. New requests come in on the set's
-         * new_req_list and ptlrpcd_check() moves them into the set.
-         */
-        do {
-                struct l_wait_info lwi;
+	/*
+	 * This mainloop strongly resembles ptlrpc_set_wait() except that our
+	 * set never completes.  ptlrpcd_check() calls ptlrpc_check_set() when
+	 * there are requests in the set. New requests come in on the set's
+	 * new_req_list and ptlrpcd_check() moves them into the set.
+	 */
+	do {
 		time64_t timeout;

-		timeout = ptlrpc_set_next_timeout(set);
-		lwi = LWI_TIMEOUT(cfs_time_seconds(timeout),
-				  ptlrpc_expired_set, set);
+		timeout = ptlrpc_set_next_timeout(set);

 		lu_context_enter(&env.le_ctx);
 		lu_context_enter(env.le_ses);
-		l_wait_event(set->set_waitq, ptlrpcd_check(&env, pc), &lwi);
+		if (timeout == 0)
+			wait_event_idle(set->set_waitq,
+					ptlrpcd_check(&env, pc));
+		else if (wait_event_idle_timeout(set->set_waitq,
+						 ptlrpcd_check(&env, pc),
+						 cfs_time_seconds(timeout))
+			 == 0)
+			ptlrpc_expired_set(set);
 		lu_context_exit(&env.le_ctx);
 		lu_context_exit(env.le_ses);
@@ -493,21 +508,21 @@
 		 */
 		if (test_bit(LIOD_STOP, &pc->pc_flags)) {
 			if (test_bit(LIOD_FORCE, &pc->pc_flags))
-                                ptlrpc_abort_set(set);
-                        exit++;
-                }
-
-                /*
-                 * Let's make one more loop to make sure that ptlrpcd_check()
-                 * copied all raced new rpcs into the set so we can kill them.
-                 */
-        } while (exit < 2);
-
-        /*
-         * Wait for inflight requests to drain.
-         */
+				ptlrpc_abort_set(set);
+			exit++;
+		}
+
+		/*
+		 * Let's make one more loop to make sure that ptlrpcd_check()
+		 * copied all raced new rpcs into the set so we can kill them.
+		 */
+	} while (exit < 2);
+
+	/*
+	 * Wait for inflight requests to drain.
+	 */
 	if (!list_empty(&set->set_requests))
-		ptlrpc_set_wait(set);
+		ptlrpc_set_wait(&env, set);
 	lu_context_fini(&env.le_ctx);
 	lu_context_fini(env.le_ses);
@@ -567,6 +582,7 @@ static int ptlrpcd_partners(struct ptlrpcd *pd, int index)
 	int first;
 	int i;
 	int rc = 0;
+
 	ENTRY;

 	LASSERT(index >= 0 && index < pd->pd_nthreads);
@@ -597,6 +613,7 @@ int ptlrpcd_start(struct ptlrpcd_ctl *pc)
 {
 	struct task_struct *task;
 	int rc = 0;
+
 	ENTRY;

 	/*
@@ -653,6 +670,7 @@ out:
 void ptlrpcd_free(struct ptlrpcd_ctl *pc)
 {
 	struct ptlrpc_request_set *set = pc->pc_set;
+
 	ENTRY;

 	if (!test_bit(LIOD_START, &pc->pc_flags)) {
@@ -672,16 +690,16 @@ void ptlrpcd_free(struct ptlrpcd_ctl *pc)
 	clear_bit(LIOD_FORCE, &pc->pc_flags);
 out:
-        if (pc->pc_npartners > 0) {
-                LASSERT(pc->pc_partners != NULL);
-
-                OBD_FREE(pc->pc_partners,
-                         sizeof(struct ptlrpcd_ctl *) * pc->pc_npartners);
-                pc->pc_partners = NULL;
-        }
-        pc->pc_npartners = 0;
+	if (pc->pc_npartners > 0) {
+		LASSERT(pc->pc_partners != NULL);
+
+		OBD_FREE(pc->pc_partners,
+			 sizeof(struct ptlrpcd_ctl *) * pc->pc_npartners);
+		pc->pc_partners = NULL;
+	}
+	pc->pc_npartners = 0;
 	pc->pc_error = 0;
-        EXIT;
+	EXIT;
 }

 static void ptlrpcd_fini(void)
@@ -689,6 +707,7 @@
 	int i;
 	int j;
 	int ncpts;
+
 	ENTRY;

 	if (ptlrpcds != NULL) {
@@ -731,6 +750,7 @@
 	int ncpts;
 	int cpt;
 	struct ptlrpcd *pd;
+
 	ENTRY;

 	/*
@@ -739,7 +759,7 @@
 	cptable = cfs_cpt_table;
 	ncpts = cfs_cpt_number(cptable);
 	if (ptlrpcd_cpts != NULL) {
-                struct cfs_expr_list *el;
+		struct cfs_expr_list *el;

 		size = ncpts * sizeof(ptlrpcds_cpt_idx[0]);
 		OBD_ALLOC(ptlrpcds_cpt_idx, size);
@@ -884,6 +904,7 @@
 		size = offsetof(struct ptlrpcd, pd_threads[nthreads]);
 		OBD_CPT_ALLOC(pd, cptable, cpt, size);
+
 		if (!pd)
 			GOTO(out, rc = -ENOMEM);
 		pd->pd_size = size;
@@ -940,25 +961,26 @@ out:
 int ptlrpcd_addref(void)
 {
-        int rc = 0;
-        ENTRY;
+	int rc = 0;
+
+	ENTRY;

 	mutex_lock(&ptlrpcd_mutex);
-        if (++ptlrpcd_users == 1) {
+	if (++ptlrpcd_users == 1) {
 		rc = ptlrpcd_init();
 		if (rc < 0)
 			ptlrpcd_users--;
 	}
 	mutex_unlock(&ptlrpcd_mutex);
-        RETURN(rc);
+	RETURN(rc);
 }
 EXPORT_SYMBOL(ptlrpcd_addref);

 void ptlrpcd_decref(void)
 {
 	mutex_lock(&ptlrpcd_mutex);
-	if (--ptlrpcd_users == 0)
-		ptlrpcd_fini();
+	if (--ptlrpcd_users == 0)
+		ptlrpcd_fini();
 	mutex_unlock(&ptlrpcd_mutex);
 }
 EXPORT_SYMBOL(ptlrpcd_decref);
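
A note for readers following the patch: its substantive change (the ptlrpcd_add_req() and ptlrpcd() hunks) retires the Lustre-private l_wait_event()/LWI_TIMEOUT idiom, where a timeout and an expiry callback ride along in a struct l_wait_info, in favour of the upstream wait_event_idle() and wait_event_idle_timeout() macros from <linux/wait.h> (TASK_IDLE-based waits, in mainline since roughly v4.17). The sketch below is not code from the patch: my_waitq, my_work_pending and my_expired() are hypothetical stand-ins for the request set's waitq, the ptlrpcd_check() condition and ptlrpc_expired_set(). In the old idiom a timeout of 0 meant "no deadline", so it maps to wait_event_idle(); a 0 return from wait_event_idle_timeout() means the deadline passed with the condition still false, so the old expiry callback is now invoked explicitly.

#include <linux/wait.h>
#include <linux/atomic.h>
#include <linux/param.h>	/* HZ */
#include <linux/printk.h>

/* Hypothetical stand-ins for the ptlrpcd set members. */
static DECLARE_WAIT_QUEUE_HEAD(my_waitq);
static atomic_t my_work_pending = ATOMIC_INIT(0);

/* Stand-in for ptlrpc_expired_set(): old code reached this through the
 * callback slot of LWI_TIMEOUT(); new code calls it by hand on timeout. */
static void my_expired(void)
{
	pr_info("wait expired with no work queued\n");
}

static void my_wait(long timeout_secs)
{
	/*
	 * Old idiom (removed by this patch):
	 *
	 *	struct l_wait_info lwi = LWI_TIMEOUT(cfs_time_seconds(t),
	 *					     my_expired_cb, data);
	 *	l_wait_event(my_waitq, cond, &lwi);
	 *
	 * New idiom: the _idle variants sleep without contributing to
	 * the load average, matching the old uninterruptible behaviour.
	 */
	if (timeout_secs == 0)
		wait_event_idle(my_waitq,
				atomic_read(&my_work_pending) > 0);
	else if (wait_event_idle_timeout(my_waitq,
					 atomic_read(&my_work_pending) > 0,
					 timeout_secs * HZ) == 0)
		my_expired();	/* returned 0: timed out, condition false */
}

The ptlrpcd_add_req() hunk composes the same building block differently: a bounded five-second idle wait, then a fall-back to l_wait_event_abortable(), Lustre's killable-wait wrapper, so a task stuck waiting for rq_set to clear can still be killed.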