X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fptlrpc%2Fnrs.c;h=52d3225deba6bb7ed1c7193cd87539cc3931b846;hp=ccfc4e96c813ce8e679b2760b1196045529964c1;hb=HEAD;hpb=f77e53d3656504c804fe3dd0a3fb72080229b648 diff --git a/lustre/ptlrpc/nrs.c b/lustre/ptlrpc/nrs.c index ccfc4e9..3f5b59d 100644 --- a/lustre/ptlrpc/nrs.c +++ b/lustre/ptlrpc/nrs.c @@ -61,6 +61,7 @@ static int nrs_policy_init(struct ptlrpc_nrs_policy *policy) static void nrs_policy_fini(struct ptlrpc_nrs_policy *policy) { LASSERT(policy->pol_ref == 0); + LASSERT(refcount_read(&policy->pol_start_ref) == 0); LASSERT(policy->pol_req_queued == 0); if (policy->pol_desc->pd_ops->op_policy_fini != NULL) @@ -96,8 +97,10 @@ static void nrs_policy_stop0(struct ptlrpc_nrs_policy *policy) policy->pol_req_started == 0); policy->pol_private = NULL; + policy->pol_arg[0] = '\0'; policy->pol_state = NRS_POL_STATE_STOPPED; + wake_up(&policy->pol_wq); if (atomic_dec_and_test(&policy->pol_desc->pd_refs)) module_put(policy->pol_desc->pd_owner); @@ -105,6 +108,26 @@ static void nrs_policy_stop0(struct ptlrpc_nrs_policy *policy) EXIT; } +/** + * Increases the policy's usage started reference count. + */ +static inline void nrs_policy_started_get(struct ptlrpc_nrs_policy *policy) +{ + refcount_inc(&policy->pol_start_ref); +} + +/** + * Decreases the policy's usage started reference count, and stops the policy + * in case it was already stopping and have no more outstanding usage + * references (which indicates it has no more queued or started requests, and + * can be safely stopped). + */ +static void nrs_policy_started_put(struct ptlrpc_nrs_policy *policy) +{ + if (refcount_dec_and_test(&policy->pol_start_ref)) + nrs_policy_stop0(policy); +} + static int nrs_policy_stop_locked(struct ptlrpc_nrs_policy *policy) { struct ptlrpc_nrs *nrs = policy->pol_nrs; @@ -131,9 +154,18 @@ static int nrs_policy_stop_locked(struct ptlrpc_nrs_policy *policy) nrs->nrs_policy_fallback = NULL; } - /* I have the only refcount */ - if (policy->pol_ref == 1) - nrs_policy_stop0(policy); + /* Drop started ref and wait for requests to be drained */ + spin_unlock(&nrs->nrs_lock); + nrs_policy_started_put(policy); + + wait_event_timeout(policy->pol_wq, + policy->pol_state == NRS_POL_STATE_STOPPED, + cfs_time_seconds(30)); + + spin_lock(&nrs->nrs_lock); + + if (policy->pol_state != NRS_POL_STATE_STOPPED) + RETURN(-EBUSY); RETURN(0); } @@ -165,8 +197,10 @@ static void nrs_policy_stop_primary(struct ptlrpc_nrs *nrs) LASSERT(tmp->pol_state == NRS_POL_STATE_STARTED); tmp->pol_state = NRS_POL_STATE_STOPPING; - if (tmp->pol_ref == 0) - nrs_policy_stop0(tmp); + /* Drop started ref to free the policy */ + spin_unlock(&nrs->nrs_lock); + nrs_policy_started_put(tmp); + spin_lock(&nrs->nrs_lock); EXIT; } @@ -191,8 +225,10 @@ static void nrs_policy_stop_primary(struct ptlrpc_nrs *nrs) */ static int nrs_policy_start_locked(struct ptlrpc_nrs_policy *policy, char *arg) { - struct ptlrpc_nrs *nrs = policy->pol_nrs; - int rc = 0; + struct ptlrpc_nrs *nrs = policy->pol_nrs; + struct ptlrpc_service_part *svcpt = nrs->nrs_svcpt; + char *srv_name = svcpt->scp_service->srv_name; + int rc = 0; ENTRY; /** @@ -207,6 +243,13 @@ static int nrs_policy_start_locked(struct ptlrpc_nrs_policy *policy, char *arg) if (policy->pol_state == NRS_POL_STATE_STOPPING) RETURN(-EAGAIN); + if (arg && strlen(arg) >= sizeof(policy->pol_arg)) { + rc = -EINVAL; + CWARN("%s.%d NRS: arg '%s' is too long: rc = %d\n", + srv_name, svcpt->scp_cpt, arg, rc); + return rc; + } + if (policy->pol_flags & PTLRPC_NRS_FL_FALLBACK) { /** * This is for cases in which the user sets the policy to the @@ -239,16 +282,13 @@ static int nrs_policy_start_locked(struct ptlrpc_nrs_policy *policy, char *arg) * stop the policy first and start it again with the new * argument. */ - if ((arg != NULL) && (strlen(arg) >= NRS_POL_ARG_MAX)) - return -EINVAL; - if ((arg == NULL && strlen(policy->pol_arg) == 0) || (arg != NULL && strcmp(policy->pol_arg, arg) == 0)) RETURN(0); rc = nrs_policy_stop_locked(policy); if (rc) - RETURN(-EAGAIN); + RETURN(rc); } } @@ -259,9 +299,10 @@ static int nrs_policy_start_locked(struct ptlrpc_nrs_policy *policy, char *arg) if (atomic_inc_return(&policy->pol_desc->pd_refs) == 1 && !try_module_get(policy->pol_desc->pd_owner)) { atomic_dec(&policy->pol_desc->pd_refs); - CERROR("NRS: cannot get module for policy %s; is it alive?\n", - policy->pol_desc->pd_name); - RETURN(-ENODEV); + rc = -ENODEV; + CERROR("%s.%d NRS: cannot get module for policy %s (is it alive?): rc = %d\n", + srv_name, svcpt->scp_cpt, policy->pol_desc->pd_name, rc); + RETURN(rc); } /** @@ -286,16 +327,11 @@ static int nrs_policy_start_locked(struct ptlrpc_nrs_policy *policy, char *arg) } } - if (arg != NULL) { - if (strlcpy(policy->pol_arg, arg, sizeof(policy->pol_arg)) >= - sizeof(policy->pol_arg)) { - CERROR("NRS: arg '%s' is too long\n", arg); - GOTO(out, rc = -E2BIG); - } - } else { - policy->pol_arg[0] = '\0'; - } + if (arg) + strscpy(policy->pol_arg, arg, sizeof(policy->pol_arg)); + /* take the started reference */ + refcount_set(&policy->pol_start_ref, 1); policy->pol_state = NRS_POL_STATE_STARTED; if (policy->pol_flags & PTLRPC_NRS_FL_FALLBACK) { @@ -322,34 +358,23 @@ out: } /** - * Increases the policy's usage reference count. + * Increases the policy's usage reference count (caller count). */ static inline void nrs_policy_get_locked(struct ptlrpc_nrs_policy *policy) +__must_hold(&policy->pol_nrs->nrs_lock) { policy->pol_ref++; } /** - * Decreases the policy's usage reference count, and stops the policy in case it - * was already stopping and have no more outstanding usage references (which - * indicates it has no more queued or started requests, and can be safely - * stopped). + * Decreases the policy's usage reference count. */ static void nrs_policy_put_locked(struct ptlrpc_nrs_policy *policy) +__must_hold(&policy->pol_nrs->nrs_lock) { LASSERT(policy->pol_ref > 0); policy->pol_ref--; - if (unlikely(policy->pol_ref == 0 && - policy->pol_state == NRS_POL_STATE_STOPPING)) - nrs_policy_stop0(policy); -} - -static void nrs_policy_put(struct ptlrpc_nrs_policy *policy) -{ - spin_lock(&policy->pol_nrs->nrs_lock); - nrs_policy_put_locked(policy); - spin_unlock(&policy->pol_nrs->nrs_lock); } /** @@ -472,11 +497,11 @@ static void nrs_resource_get_safe(struct ptlrpc_nrs *nrs, spin_lock(&nrs->nrs_lock); fallback = nrs->nrs_policy_fallback; - nrs_policy_get_locked(fallback); + nrs_policy_started_get(fallback); primary = nrs->nrs_policy_primary; if (primary != NULL) - nrs_policy_get_locked(primary); + nrs_policy_started_get(primary); spin_unlock(&nrs->nrs_lock); @@ -496,7 +521,7 @@ static void nrs_resource_get_safe(struct ptlrpc_nrs *nrs, * request. */ if (resp[NRS_RES_PRIMARY] == NULL) - nrs_policy_put(primary); + nrs_policy_started_put(primary); } } @@ -513,8 +538,7 @@ static void nrs_resource_get_safe(struct ptlrpc_nrs *nrs, static void nrs_resource_put_safe(struct ptlrpc_nrs_resource **resp) { struct ptlrpc_nrs_policy *pols[NRS_RES_MAX]; - struct ptlrpc_nrs *nrs = NULL; - int i; + int i; for (i = 0; i < NRS_RES_MAX; i++) { if (resp[i] != NULL) { @@ -530,15 +554,8 @@ static void nrs_resource_put_safe(struct ptlrpc_nrs_resource **resp) if (pols[i] == NULL) continue; - if (nrs == NULL) { - nrs = pols[i]->pol_nrs; - spin_lock(&nrs->nrs_lock); - } - nrs_policy_put_locked(pols[i]); + nrs_policy_started_put(pols[i]); } - - if (nrs != NULL) - spin_unlock(&nrs->nrs_lock); } /** @@ -565,6 +582,10 @@ struct ptlrpc_nrs_request * nrs_request_get(struct ptlrpc_nrs_policy *policy, LASSERT(policy->pol_req_queued > 0); + /* for a non-started policy, use force mode to drain requests */ + if (unlikely(policy->pol_state != NRS_POL_STATE_STARTED)) + force = true; + nrq = policy->pol_desc->pd_ops->op_req_get(policy, peek, force); LASSERT(ergo(nrq != NULL, nrs_request_policy(nrq) == policy)); @@ -603,6 +624,11 @@ static inline void nrs_request_enqueue(struct ptlrpc_nrs_request *nrq) if (rc == 0) { policy->pol_nrs->nrs_req_queued++; policy->pol_req_queued++; + /** + * Take an extra ref to avoid stopping policy with + * pending request in it + */ + nrs_policy_started_get(policy); return; } } @@ -709,48 +735,57 @@ out: static int nrs_policy_unregister(struct ptlrpc_nrs *nrs, char *name) { struct ptlrpc_nrs_policy *policy = NULL; + struct ptlrpc_service_part *svcpt = nrs->nrs_svcpt; + char *srv_name = svcpt->scp_service->srv_name; + int rc = 0; ENTRY; spin_lock(&nrs->nrs_lock); policy = nrs_policy_find_locked(nrs, name); if (policy == NULL) { - spin_unlock(&nrs->nrs_lock); - - CERROR("Can't find NRS policy %s\n", name); - RETURN(-ENOENT); + rc = -ENOENT; + CERROR("%s.%d NRS: cannot find policy '%s': rc = %d\n", + srv_name, svcpt->scp_cpt, name, rc); + GOTO(out_unlock, rc); } if (policy->pol_ref > 1) { - CERROR("Policy %s is busy with %d references\n", name, - (int)policy->pol_ref); - nrs_policy_put_locked(policy); - - spin_unlock(&nrs->nrs_lock); - RETURN(-EBUSY); + rc = -EBUSY; + CERROR("%s.%d NRS: policy '%s' is busy with %ld references: rc = %d\n", + srv_name, svcpt->scp_cpt, name, policy->pol_ref, rc); + GOTO(out_put, rc); } LASSERT(policy->pol_req_queued == 0); LASSERT(policy->pol_req_started == 0); if (policy->pol_state != NRS_POL_STATE_STOPPED) { - nrs_policy_stop_locked(policy); - LASSERT(policy->pol_state == NRS_POL_STATE_STOPPED); + rc = nrs_policy_stop_locked(policy); + if (rc) { + CERROR("%s.%d NRS: failed to stop policy '%s' with refcount %d: rc = %d\n", + srv_name, svcpt->scp_cpt, name, + refcount_read(&policy->pol_start_ref), rc); + GOTO(out_put, rc); + } } + LASSERT(policy->pol_private == NULL); list_del(&policy->pol_list); nrs->nrs_num_pols--; + EXIT; +out_put: nrs_policy_put_locked(policy); - +out_unlock: spin_unlock(&nrs->nrs_lock); - nrs_policy_fini(policy); - - LASSERT(policy->pol_private == NULL); - OBD_FREE_PTR(policy); + if (rc == 0) { + nrs_policy_fini(policy); + OBD_FREE_PTR(policy); + } - RETURN(0); + return rc; } /** @@ -766,10 +801,11 @@ static int nrs_policy_unregister(struct ptlrpc_nrs *nrs, char *name) static int nrs_policy_register(struct ptlrpc_nrs *nrs, struct ptlrpc_nrs_pol_desc *desc) { - struct ptlrpc_nrs_policy *policy; - struct ptlrpc_nrs_policy *tmp; - struct ptlrpc_service_part *svcpt = nrs->nrs_svcpt; - int rc; + struct ptlrpc_nrs_policy *policy; + struct ptlrpc_nrs_policy *tmp; + struct ptlrpc_service_part *svcpt = nrs->nrs_svcpt; + char *srv_name = svcpt->scp_service->srv_name; + int rc; ENTRY; LASSERT(svcpt != NULL); @@ -793,6 +829,8 @@ static int nrs_policy_register(struct ptlrpc_nrs *nrs, INIT_LIST_HEAD(&policy->pol_list); INIT_LIST_HEAD(&policy->pol_list_queued); + init_waitqueue_head(&policy->pol_wq); + rc = nrs_policy_init(policy); if (rc != 0) { OBD_FREE_PTR(policy); @@ -803,16 +841,17 @@ static int nrs_policy_register(struct ptlrpc_nrs *nrs, tmp = nrs_policy_find_locked(nrs, policy->pol_desc->pd_name); if (tmp != NULL) { - CERROR("NRS policy %s has been registered, can't register it " - "for %s\n", policy->pol_desc->pd_name, - svcpt->scp_service->srv_name); + rc = -EEXIST; + CERROR("%s.%d NRS: policy %s has been registered, can't register it: rc = %d\n", + srv_name, svcpt->scp_cpt, policy->pol_desc->pd_name, + rc); nrs_policy_put_locked(tmp); spin_unlock(&nrs->nrs_lock); nrs_policy_fini(policy); OBD_FREE_PTR(policy); - RETURN(-EEXIST); + RETURN(rc); } list_add_tail(&policy->pol_list, &nrs->nrs_policy_list); @@ -918,10 +957,9 @@ static int nrs_register_policies_locked(struct ptlrpc_nrs *nrs) if (nrs_policy_compatible(svc, desc)) { rc = nrs_policy_register(nrs, desc); if (rc != 0) { - CERROR("Failed to register NRS policy %s for " - "partition %d of service %s: %d\n", - desc->pd_name, svcpt->scp_cpt, - svc->srv_name, rc); + CERROR("%s.%d NRS: Failed to register policy %s: rc = %d\n", + svc->srv_name, svcpt->scp_cpt, + desc->pd_name, rc); /** * Fail registration if any of the policies' * registration fails. @@ -1117,6 +1155,7 @@ static int nrs_policy_unregister_locked(struct ptlrpc_nrs_pol_desc *desc) continue; ptlrpc_service_for_each_part(svcpt, i, svc) { + char *srv_name = svcpt->scp_service->srv_name; bool hp = false; again: @@ -1129,10 +1168,9 @@ again: if (rc == -ENOENT) { rc = 0; } else if (rc != 0) { - CERROR("Failed to unregister NRS policy %s for " - "partition %d of service %s: %d\n", - desc->pd_name, svcpt->scp_cpt, - svcpt->scp_service->srv_name, rc); + CERROR("%s.%d NRS: Failed to unregister policy %s: rc = %d\n", + srv_name, svcpt->scp_cpt, desc->pd_name, + rc); RETURN(rc); } @@ -1193,27 +1231,29 @@ static int ptlrpc_nrs_policy_register(struct ptlrpc_nrs_pol_conf *conf) if ((conf->nc_flags & PTLRPC_NRS_FL_REG_EXTERN) && (conf->nc_flags & (PTLRPC_NRS_FL_FALLBACK | PTLRPC_NRS_FL_REG_START))) { + rc = -EINVAL; CERROR("NRS: failing to register policy %s. Please check " "policy flags; external policies cannot act as fallback " "policies, or be started immediately upon registration " - "without interaction with lprocfs\n", conf->nc_name); - RETURN(-EINVAL); + "without interaction with lprocfs: rc = %d\n", + conf->nc_name, rc); + RETURN(rc); } mutex_lock(&nrs_core.nrs_mutex); if (nrs_policy_find_desc_locked(conf->nc_name) != NULL) { - CERROR("NRS: failing to register policy %s which has already " - "been registered with NRS core!\n", - conf->nc_name); - GOTO(fail, rc = -EEXIST); + rc = -EEXIST; + CERROR("NRS: failing to register policy %s which has already been registered with NRS core: rc = %d\n", + conf->nc_name, rc); + GOTO(fail, rc); } OBD_ALLOC_PTR(desc); if (desc == NULL) GOTO(fail, rc = -ENOMEM); - if (strlcpy(desc->pd_name, conf->nc_name, sizeof(desc->pd_name)) >= + if (strscpy(desc->pd_name, conf->nc_name, sizeof(desc->pd_name)) >= sizeof(desc->pd_name)) { OBD_FREE_PTR(desc); GOTO(fail, rc = -E2BIG); @@ -1252,16 +1292,16 @@ static int ptlrpc_nrs_policy_register(struct ptlrpc_nrs_pol_conf *conf) continue; ptlrpc_service_for_each_part(svcpt, i, svc) { - struct ptlrpc_nrs *nrs; - bool hp = false; + struct ptlrpc_nrs *nrs; + char *srv_name = svcpt->scp_service->srv_name; + bool hp = false; again: nrs = nrs_svcpt2nrs(svcpt, hp); rc = nrs_policy_register(nrs, desc); if (rc != 0) { - CERROR("Failed to register NRS policy %s for " - "partition %d of service %s: %d\n", - desc->pd_name, svcpt->scp_cpt, - svcpt->scp_service->srv_name, rc); + CERROR("%s.%d NRS: Failed to register policy %s: rc = %d\n", + srv_name, svcpt->scp_cpt, + desc->pd_name, rc); rc2 = nrs_policy_unregister_locked(desc); /** @@ -1495,6 +1535,9 @@ static void nrs_request_removed(struct ptlrpc_nrs_policy *policy) list_move_tail(&policy->pol_list_queued, &policy->pol_nrs->nrs_policy_queued); } + + /* remove the extra ref for policy pending requests */ + nrs_policy_started_put(policy); } /** @@ -1782,5 +1825,3 @@ void ptlrpc_nrs_fini(void) OBD_FREE_PTR(desc); } } - -/** @} nrs */