From 52133d804b2af4279fc8aa51eab09d0f231dce96 Mon Sep 17 00:00:00 2001 From: fanyong Date: Thu, 21 May 2009 03:37:46 +0000 Subject: [PATCH] Branch HEAD b=19450 i=tianzy i=andrew.perepechko i=vladimir.saveliev quota_setinfo after connection between MDS and OST complete. --- lustre/include/lustre/lustre_idl.h | 3 +- lustre/ldlm/ldlm_lib.c | 32 +++++--------- lustre/obdfilter/filter.c | 4 +- lustre/osc/osc_request.c | 6 --- lustre/ptlrpc/pack_generic.c | 89 +++++++++++++++----------------------- lustre/quota/quota_context.c | 54 ++++++++++++----------- lustre/quota/quota_ctl.c | 2 +- lustre/quota/quota_interface.c | 57 +++++++++++++++--------- 8 files changed, 115 insertions(+), 132 deletions(-) diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index bf8fad1..d16508d 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -2633,8 +2633,7 @@ struct qunit_data { #define QDATA_CLR_CHANGE_QS(qdata) ((qdata)->qd_flags &= ~LQUOTA_FLAGS_CHG_QS) extern void lustre_swab_qdata(struct qunit_data *d); -extern int quota_get_qdata(void*req, struct qunit_data *qdata, - int is_req, int is_exp); +extern struct qunit_data *quota_get_qdata(void*req, int is_req, int is_exp); extern int quota_copy_qdata(void *request, struct qunit_data *qdata, int is_req, int is_exp); diff --git a/lustre/ldlm/ldlm_lib.c b/lustre/ldlm/ldlm_lib.c index 234048d..6475ccb 100644 --- a/lustre/ldlm/ldlm_lib.c +++ b/lustre/ldlm/ldlm_lib.c @@ -1004,11 +1004,6 @@ dont_check_exports: revimp->imp_remote_handle = conn; revimp->imp_dlm_fake = 1; revimp->imp_state = LUSTRE_IMP_FULL; - /* this is a bit of a layering violation, but much less risk than - * changing this very complex and race-prone code. bug=16839 */ - if (data->ocd_connect_flags & OBD_CONNECT_MDS) - obd_set_info_async(export, sizeof(KEY_MDS_CONN), KEY_MDS_CONN, - 0, NULL, NULL); /* unknown versions will be caught in * ptlrpc_handle_server_req_in->lustre_unpack_msg() */ @@ -2284,12 +2279,11 @@ int target_handle_dqacq_callback(struct ptlrpc_request *req) LASSERT(req->rq_export); - OBD_ALLOC(qdata, sizeof(struct qunit_data)); - if (!qdata) - RETURN(-ENOMEM); - rc = quota_get_qdata(req, qdata, QUOTA_REQUEST, QUOTA_EXPORT); - if (rc < 0) { + qdata = quota_get_qdata(req, QUOTA_REQUEST, QUOTA_EXPORT); + if (IS_ERR(qdata)) { + rc = PTR_ERR(qdata); CDEBUG(D_ERROR, "Can't unpack qunit_data(rc: %d)\n", rc); + req->rq_status = rc; GOTO(out, rc); } @@ -2297,7 +2291,7 @@ int target_handle_dqacq_callback(struct ptlrpc_request *req) if (!obd->obd_observer || !obd->obd_observer->obd_observer) { CERROR("Can't find the observer, it is recovering\n"); req->rq_status = -EAGAIN; - GOTO(send_reply, rc = -EAGAIN); + GOTO(out, rc); } master_obd = obd->obd_observer->obd_observer; @@ -2311,7 +2305,6 @@ int target_handle_dqacq_callback(struct ptlrpc_request *req) CDEBUG(D_QUOTA, "quota_type not processed yet, return " "-EAGAIN\n"); req->rq_status = -EAGAIN; - rc = ptlrpc_reply(req); GOTO(out, rc); } @@ -2324,7 +2317,6 @@ int target_handle_dqacq_callback(struct ptlrpc_request *req) CDEBUG(D_QUOTA, "quota_ctxt is not ready yet, return " "-EAGAIN\n"); req->rq_status = -EAGAIN; - rc = ptlrpc_reply(req); GOTO(out, rc); } @@ -2334,24 +2326,22 @@ int target_handle_dqacq_callback(struct ptlrpc_request *req) up_read(&obt->obt_rwsem); if (rc && rc != -EDQUOT) CDEBUG(rc == -EBUSY ? D_QUOTA : D_ERROR, - "dqacq failed! (rc:%d)\n", rc); + "dqacq/dqrel failed! (rc:%d)\n", rc); req->rq_status = rc; - /* there are three forms of qunit(historic causes), so we need to - * adjust the same form to different forms slaves needed */ rc = quota_copy_qdata(req, qdata, QUOTA_REPLY, QUOTA_EXPORT); if (rc < 0) { - CDEBUG(D_ERROR, "Can't pack qunit_data(rc: %d)\n", rc); + CERROR("Can't pack qunit_data(rc: %d)\n", rc); GOTO(out, rc); } /* Block the quota req. b=14840 */ OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_BLOCK_QUOTA_REQ, obd_timeout); -send_reply: - rc = ptlrpc_reply(req); + EXIT; + out: - OBD_FREE(qdata, sizeof(struct qunit_data)); - RETURN(rc); + rc = ptlrpc_reply(req); + return rc; #else return 0; #endif /* !__KERNEL__ */ diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c index b19c0bb..6b77b66 100644 --- a/lustre/obdfilter/filter.c +++ b/lustre/obdfilter/filter.c @@ -4437,12 +4437,12 @@ static int filter_set_mds_conn(struct obd_export *exp, void *val) if (rc) goto out; - lquota_setinfo(filter_quota_interface_ref, obd, exp); - if (group == FILTER_GROUP_MDS0) { /* setup llog group 1 for interop */ filter_setup_llog_group(exp, obd, FILTER_GROUP_LLOG); } + + lquota_setinfo(filter_quota_interface_ref, obd, exp); out: RETURN(rc); } diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index f13fe08..8551464 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -3834,12 +3834,6 @@ static int osc_set_info_async(struct obd_export *exp, obd_count keylen, if (!set && !KEY_IS(KEY_GRANT_SHRINK)) RETURN(-EINVAL); - /* If OST understood OBD_CONNECT_MDS we don't need to tell it we - * are the MDS again. Just do the local setup. b=16839 */ - if (KEY_IS(KEY_MDS_CONN) && - (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MDS)) - RETURN(osc_setinfo_mds_connect_import(imp)); - /* We pass all other commands directly to OST. Since nobody calls osc methods directly and everybody is supposed to go through LOV, we assume lov checked invalid values for us. diff --git a/lustre/ptlrpc/pack_generic.c b/lustre/ptlrpc/pack_generic.c index 0647d6b..61a21db 100644 --- a/lustre/ptlrpc/pack_generic.c +++ b/lustre/ptlrpc/pack_generic.c @@ -2137,43 +2137,32 @@ void lustre_swab_qdata(struct qunit_data *d) /** * got qdata from request(req/rep) */ -int quota_get_qdata(void *request, struct qunit_data *qdata, - int is_req, int is_exp) +struct qunit_data *quota_get_qdata(void *request, int is_req, int is_exp) { struct ptlrpc_request *req = (struct ptlrpc_request *)request; - struct qunit_data *new; + struct qunit_data *qdata; __u64 flags = is_exp ? req->rq_export->exp_connect_flags : req->rq_import->imp_connect_data.ocd_connect_flags; - int rc = 0; LASSERT(req); - LASSERT(qdata); - - /* support for quota64 and change_qs */ - if (flags & OBD_CONNECT_CHANGE_QS) { - if (!(flags & OBD_CONNECT_QUOTA64)) { - CDEBUG(D_ERROR, "Wire protocol for qunit is broken!\n"); - return -EINVAL; - } - if (is_req == QUOTA_REQUEST) - new = lustre_swab_reqbuf(req, REQ_REC_OFF, - sizeof(struct qunit_data), - lustre_swab_qdata); - else - new = lustre_swab_repbuf(req, REPLY_REC_OFF, - sizeof(struct qunit_data), - lustre_swab_qdata); - if (new == NULL) - GOTO(out, rc = -EPROTO); - *qdata = *new; - QDATA_SET_CHANGE_QS(qdata); - return 0; - } else { - QDATA_CLR_CHANGE_QS(qdata); - } - -out: - return rc; + /* support for quota64 */ + LASSERT(flags & OBD_CONNECT_QUOTA64); + /* support for change_qs */ + LASSERT(flags & OBD_CONNECT_CHANGE_QS); + + if (is_req == QUOTA_REQUEST) + qdata = lustre_swab_reqbuf(req, REQ_REC_OFF, + sizeof(struct qunit_data), + lustre_swab_qdata); + else + qdata = lustre_swab_repbuf(req, REPLY_REC_OFF, + sizeof(struct qunit_data), + lustre_swab_qdata); + if (qdata == NULL) + return ERR_PTR(-EPROTO); + + QDATA_SET_CHANGE_QS(qdata); + return qdata; } EXPORT_SYMBOL(quota_get_qdata); @@ -2187,31 +2176,25 @@ int quota_copy_qdata(void *request, struct qunit_data *qdata, void *target; __u64 flags = is_exp ? req->rq_export->exp_connect_flags : req->rq_import->imp_connect_data.ocd_connect_flags; - int rc = 0; LASSERT(req); LASSERT(qdata); - - /* support for quota64 and change_qs */ - if (flags & OBD_CONNECT_CHANGE_QS) { - if (!(flags & OBD_CONNECT_QUOTA64)) { - CERROR("Wire protocol for qunit is broken!\n"); - return -EINVAL; - } - if (is_req == QUOTA_REQUEST) - target = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, - sizeof(struct qunit_data)); - else - target = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, - sizeof(struct qunit_data)); - if (!target) - GOTO(out, rc = -EPROTO); - memcpy(target, qdata, sizeof(*qdata)); - return 0; - } - -out: - return rc; + /* support for quota64 */ + LASSERT(flags & OBD_CONNECT_QUOTA64); + /* support for change_qs */ + LASSERT(flags & OBD_CONNECT_CHANGE_QS); + + if (is_req == QUOTA_REQUEST) + target = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, + sizeof(struct qunit_data)); + else + target = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, + sizeof(struct qunit_data)); + if (target == NULL) + return -EPROTO; + + memcpy(target, qdata, sizeof(*qdata)); + return 0; } EXPORT_SYMBOL(quota_copy_qdata); #endif /* __KERNEL__ */ diff --git a/lustre/quota/quota_context.c b/lustre/quota/quota_context.c index 1a23830..82a5762 100644 --- a/lustre/quota/quota_context.c +++ b/lustre/quota/quota_context.c @@ -113,15 +113,17 @@ struct lustre_qunit { spinlock_t lq_lock; /** Protect the whole structure */ enum qunit_state lq_state; /** Present the status of qunit */ int lq_rc; /** The rc of lq_data */ + pid_t lq_owner; }; #define QUNIT_SET_STATE(qunit, state) \ do { \ spin_lock(&qunit->lq_lock); \ QDATA_DEBUG((&qunit->lq_data), "qunit(%p) lq_state(%s->%s), " \ - "lq_rc(%d)\n", \ + "lq_rc(%d), lq_owner(%d)\n", \ qunit, qunit_state_names[qunit->lq_state], \ - qunit_state_names[state], qunit->lq_rc); \ + qunit_state_names[state], qunit->lq_rc, \ + qunit->lq_owner); \ qunit->lq_state = state; \ spin_unlock(&qunit->lq_lock); \ } while(0) @@ -131,9 +133,10 @@ do { \ spin_lock(&qunit->lq_lock); \ qunit->lq_rc = rc; \ QDATA_DEBUG((&qunit->lq_data), "qunit(%p) lq_state(%s->%s), " \ - "lq_rc(%d)\n", \ + "lq_rc(%d), lq_owner(%d)\n", \ qunit, qunit_state_names[qunit->lq_state], \ - qunit_state_names[state], qunit->lq_rc); \ + qunit_state_names[state], qunit->lq_rc, \ + qunit->lq_owner); \ qunit->lq_state = state; \ spin_unlock(&qunit->lq_lock); \ } while(0) @@ -435,6 +438,7 @@ static struct lustre_qunit *alloc_qunit(struct lustre_quota_ctxt *qctxt, qunit->lq_opc = opc; qunit->lq_lock = SPIN_LOCK_UNLOCKED; QUNIT_SET_STATE_AND_RC(qunit, QUNIT_CREATED, 0); + qunit->lq_owner = cfs_curproc_pid(); RETURN(qunit); } @@ -703,30 +707,24 @@ static int dqacq_interpret(const struct lu_env *env, struct lustre_qunit *qunit = aa->aa_qunit; struct obd_device *obd = req->rq_import->imp_obd; struct qunit_data *qdata = NULL; - int rc1 = 0; ENTRY; LASSERT(req); LASSERT(req->rq_import); - /* there are several forms of qunit(historic causes), so we need to - * adjust qunit from slaves to the same form here */ - OBD_ALLOC(qdata, sizeof(struct qunit_data)); - if (!qdata) - RETURN(-ENOMEM); - down_read(&obt->obt_rwsem); /* if a quota req timeouts or is dropped, we should update quota * statistics which will be handled in dqacq_completion. And in * this situation we should get qdata from request instead of * reply */ - rc1 = quota_get_qdata(req, qdata, - (rc != 0) ? QUOTA_REQUEST : QUOTA_REPLY, - QUOTA_IMPORT); - if (rc1 < 0) { + qdata = quota_get_qdata(req, (rc != 0) ? QUOTA_REQUEST : QUOTA_REPLY, + QUOTA_IMPORT); + if (IS_ERR(qdata)) { + rc = PTR_ERR(qdata); DEBUG_REQ(D_ERROR, req, - "error unpacking qunit_data(rc: %d)\n", rc1); - GOTO(exit, rc = rc1); + "error unpacking qunit_data(rc: %ld)\n", + PTR_ERR(qdata)); + RETURN(PTR_ERR(qdata)); } QDATA_DEBUG(qdata, "qdata: interpret rc(%d).\n", rc); @@ -760,10 +758,7 @@ static int dqacq_interpret(const struct lu_env *env, rc = dqacq_completion(obd, qctxt, qdata, rc, lustre_msg_get_opc(req->rq_reqmsg)); -exit: up_read(&obt->obt_rwsem); - OBD_FREE(qdata, sizeof(struct qunit_data)); - RETURN(rc); } @@ -1002,8 +997,9 @@ wait_completion: spin_lock(&qunit->lq_lock); rc = qunit->lq_rc; spin_unlock(&qunit->lq_lock); - CDEBUG(D_QUOTA, "qunit(%p) finishes waiting. (rc:%d)\n", - qunit, rc); + CDEBUG(D_QUOTA, "qunit(%p) finishes waiting: id(%u) flag(%u) " + "rc(%d) owner(%d)\n", qunit, qunit->lq_data.qd_id, + qunit->lq_data.qd_flags, rc, qunit->lq_owner); } qunit_put(qunit); @@ -1092,8 +1088,8 @@ qctxt_wait_pending_dqacq(struct lustre_quota_ctxt *qctxt, unsigned int id, QDATA_DEBUG(p, "qunit(%p) is waiting for dqacq.\n", qunit); l_wait_event(qunit->lq_waitq, got_qunit(qunit), &lwi); - CDEBUG(D_QUOTA, "qunit(%p) finishes waiting. (rc:%d)\n", - qunit, qunit->lq_rc); + CDEBUG(D_QUOTA, "qunit(%p) finishes waiting: rc(%d) " + "owner(%d)\n", qunit, qunit->lq_rc, qunit->lq_owner); /* keep same as schedule_dqacq() b=17030 */ spin_lock(&qunit->lq_lock); rc = qunit->lq_rc; @@ -1257,11 +1253,15 @@ static int qslave_recovery_main(void *arg) complete(&data->comp); + spin_lock(&qctxt->lqc_lock); if (qctxt->lqc_recovery) { + spin_unlock(&qctxt->lqc_lock); class_decref(obd, "qslave_recovd_filter", obd); RETURN(0); + } else { + qctxt->lqc_recovery = 1; + spin_unlock(&qctxt->lqc_lock); } - qctxt->lqc_recovery = 1; for (type = USRQUOTA; type < MAXQUOTAS; type++) { struct qunit_data qdata; @@ -1321,8 +1321,10 @@ free: } } - class_decref(obd, "qslave_recovd_filter", obd); + spin_lock(&qctxt->lqc_lock); qctxt->lqc_recovery = 0; + spin_unlock(&qctxt->lqc_lock); + class_decref(obd, "qslave_recovd_filter", obd); RETURN(rc); } diff --git a/lustre/quota/quota_ctl.c b/lustre/quota/quota_ctl.c index 2ebc692..ad4f9a4 100644 --- a/lustre/quota/quota_ctl.c +++ b/lustre/quota/quota_ctl.c @@ -165,7 +165,7 @@ int filter_quota_ctl(struct obd_device *unused, struct obd_export *exp, * dqacq/dqrel done then return the correct limits to master */ if (oqctl->qc_stat == QUOTA_RECOVERING) qctxt_wait_pending_dqacq(&obd->u.obt.obt_qctxt, - oqctl->qc_id, oqctl->qc_type, + oqctl->qc_id, oqctl->qc_type, 1); push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); diff --git a/lustre/quota/quota_interface.c b/lustre/quota/quota_interface.c index 16ed64e..60d7038 100644 --- a/lustre/quota/quota_interface.c +++ b/lustre/quota/quota_interface.c @@ -97,50 +97,65 @@ static int filter_quota_setinfo(struct obd_device *obd, void *data) { struct obd_export *exp = data; struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt; - struct obd_import *imp; + struct obd_import *imp = exp->exp_imp_reverse; ENTRY; + LASSERT(imp != NULL); + /* setup the quota context import */ spin_lock(&qctxt->lqc_lock); - qctxt->lqc_import = exp->exp_imp_reverse; - spin_unlock(&qctxt->lqc_lock); - CDEBUG(D_QUOTA, "%s: lqc_import(%p) of obd(%p) is reactivated now, \n", - obd->obd_name,exp->exp_imp_reverse, obd); - - /* make imp's connect flags equal relative exp's connect flags - * adding it to avoid the scan export list - */ - imp = qctxt->lqc_import; - if (likely(imp)) + if (qctxt->lqc_import != NULL) { + spin_unlock(&qctxt->lqc_lock); + if (qctxt->lqc_import == imp) + CDEBUG(D_WARNING, "%s: lqc_import(%p) of obd(%p) was " + "activated already.\n", obd->obd_name, imp, obd); + else + CDEBUG(D_ERROR, "%s: lqc_import(%p:%p) of obd(%p) was " + "activated by others.\n", obd->obd_name, + qctxt->lqc_import, imp, obd); + } else { + qctxt->lqc_import = imp; + /* make imp's connect flags equal relative exp's connect flags + * adding it to avoid the scan export list */ imp->imp_connect_data.ocd_connect_flags |= - (exp->exp_connect_flags & - (OBD_CONNECT_QUOTA64 | OBD_CONNECT_CHANGE_QS)); + (exp->exp_connect_flags & + (OBD_CONNECT_QUOTA64 | OBD_CONNECT_CHANGE_QS)); + spin_unlock(&qctxt->lqc_lock); + CDEBUG(D_QUOTA, "%s: lqc_import(%p) of obd(%p) is reactivated " + "now.\n", obd->obd_name, imp, obd); - cfs_waitq_signal(&qctxt->lqc_wait_for_qmaster); - /* start quota slave recovery thread. (release high limits) */ - qslave_start_recovery(obd, qctxt); + cfs_waitq_signal(&qctxt->lqc_wait_for_qmaster); + /* start quota slave recovery thread. (release high limits) */ + qslave_start_recovery(obd, qctxt); + } RETURN(0); } static int filter_quota_clearinfo(struct obd_export *exp, struct obd_device *obd) { struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt; + struct obd_import *imp = exp->exp_imp_reverse; ENTRY; /* lquota may be not set up before destroying export, b=14896 */ if (!obd->obd_set_up) RETURN(0); + if (unlikely(imp == NULL)) + RETURN(0); + /* when exp->exp_imp_reverse is destroyed, the corresponding lqc_import * should be invalid b=12374 */ - if (qctxt->lqc_import && qctxt->lqc_import == exp->exp_imp_reverse) { - spin_lock(&qctxt->lqc_lock); + spin_lock(&qctxt->lqc_lock); + if (qctxt->lqc_import == imp) { qctxt->lqc_import = NULL; spin_unlock(&qctxt->lqc_lock); - ptlrpc_cleanup_imp(exp->exp_imp_reverse); + CDEBUG(D_QUOTA, "%s: lqc_import(%p) of obd(%p) is invalid now.\n", + obd->obd_name, imp, obd); + ptlrpc_cleanup_imp(imp); dqacq_interrupt(qctxt); - CDEBUG(D_QUOTA, "%s: lqc_import of obd(%p) is invalid now.\n", - obd->obd_name, obd); + } else { + spin_unlock(&qctxt->lqc_lock); } RETURN(0); } -- 1.8.3.1