Whamcloud - gitweb
Branch HEAD
authorfanyong <fanyong>
Thu, 21 May 2009 03:37:46 +0000 (03:37 +0000)
committerfanyong <fanyong>
Thu, 21 May 2009 03:37:46 +0000 (03:37 +0000)
b=19450
i=tianzy
i=andrew.perepechko
i=vladimir.saveliev

quota_setinfo after connection between MDS and OST complete.

lustre/include/lustre/lustre_idl.h
lustre/ldlm/ldlm_lib.c
lustre/obdfilter/filter.c
lustre/osc/osc_request.c
lustre/ptlrpc/pack_generic.c
lustre/quota/quota_context.c
lustre/quota/quota_ctl.c
lustre/quota/quota_interface.c

index bf8fad1..d16508d 100644 (file)
@@ -2633,8 +2633,7 @@ struct qunit_data {
 #define QDATA_CLR_CHANGE_QS(qdata)  ((qdata)->qd_flags &= ~LQUOTA_FLAGS_CHG_QS)
 
 extern void lustre_swab_qdata(struct qunit_data *d);
-extern int quota_get_qdata(void*req, struct qunit_data *qdata,
-                           int is_req, int is_exp);
+extern struct qunit_data *quota_get_qdata(void*req, int is_req, int is_exp);
 extern int quota_copy_qdata(void *request, struct qunit_data *qdata,
                             int is_req, int is_exp);
 
index 234048d..6475ccb 100644 (file)
@@ -1004,11 +1004,6 @@ dont_check_exports:
         revimp->imp_remote_handle = conn;
         revimp->imp_dlm_fake = 1;
         revimp->imp_state = LUSTRE_IMP_FULL;
-        /* this is a bit of a layering violation, but much less risk than
-         * changing this very complex and race-prone code.  bug=16839 */
-        if (data->ocd_connect_flags & OBD_CONNECT_MDS)
-                obd_set_info_async(export, sizeof(KEY_MDS_CONN), KEY_MDS_CONN,
-                                   0, NULL, NULL);
 
         /* unknown versions will be caught in
          * ptlrpc_handle_server_req_in->lustre_unpack_msg() */
@@ -2284,12 +2279,11 @@ int target_handle_dqacq_callback(struct ptlrpc_request *req)
 
         LASSERT(req->rq_export);
 
-        OBD_ALLOC(qdata, sizeof(struct qunit_data));
-        if (!qdata)
-                RETURN(-ENOMEM);
-        rc = quota_get_qdata(req, qdata, QUOTA_REQUEST, QUOTA_EXPORT);
-        if (rc < 0) {
+        qdata = quota_get_qdata(req, QUOTA_REQUEST, QUOTA_EXPORT);
+        if (IS_ERR(qdata)) {
+                rc = PTR_ERR(qdata);
                 CDEBUG(D_ERROR, "Can't unpack qunit_data(rc: %d)\n", rc);
+                req->rq_status = rc;
                 GOTO(out, rc);
         }
 
@@ -2297,7 +2291,7 @@ int target_handle_dqacq_callback(struct ptlrpc_request *req)
         if (!obd->obd_observer || !obd->obd_observer->obd_observer) {
                 CERROR("Can't find the observer, it is recovering\n");
                 req->rq_status = -EAGAIN;
-                GOTO(send_reply, rc = -EAGAIN);
+                GOTO(out, rc);
         }
 
         master_obd = obd->obd_observer->obd_observer;
@@ -2311,7 +2305,6 @@ int target_handle_dqacq_callback(struct ptlrpc_request *req)
                 CDEBUG(D_QUOTA, "quota_type not processed yet, return "
                        "-EAGAIN\n");
                 req->rq_status = -EAGAIN;
-                rc = ptlrpc_reply(req);
                 GOTO(out, rc);
         }
 
@@ -2324,7 +2317,6 @@ int target_handle_dqacq_callback(struct ptlrpc_request *req)
                 CDEBUG(D_QUOTA, "quota_ctxt is not ready yet, return "
                        "-EAGAIN\n");
                 req->rq_status = -EAGAIN;
-                rc = ptlrpc_reply(req);
                 GOTO(out, rc);
         }
 
@@ -2334,24 +2326,22 @@ int target_handle_dqacq_callback(struct ptlrpc_request *req)
         up_read(&obt->obt_rwsem);
         if (rc && rc != -EDQUOT)
                 CDEBUG(rc == -EBUSY  ? D_QUOTA : D_ERROR,
-                       "dqacq failed! (rc:%d)\n", rc);
+                       "dqacq/dqrel failed! (rc:%d)\n", rc);
         req->rq_status = rc;
 
-        /* there are three forms of qunit(historic causes), so we need to
-         * adjust the same form to different forms slaves needed */
         rc = quota_copy_qdata(req, qdata, QUOTA_REPLY, QUOTA_EXPORT);
         if (rc < 0) {
-                CDEBUG(D_ERROR, "Can't pack qunit_data(rc: %d)\n", rc);
+                CERROR("Can't pack qunit_data(rc: %d)\n", rc);
                 GOTO(out, rc);
         }
 
         /* Block the quota req. b=14840 */
         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_BLOCK_QUOTA_REQ, obd_timeout);
-send_reply:
-        rc = ptlrpc_reply(req);
+        EXIT;
+
 out:
-        OBD_FREE(qdata, sizeof(struct qunit_data));
-        RETURN(rc);
+        rc = ptlrpc_reply(req);
+        return rc;
 #else
         return 0;
 #endif /* !__KERNEL__ */
index b19c0bb..6b77b66 100644 (file)
@@ -4437,12 +4437,12 @@ static int filter_set_mds_conn(struct obd_export *exp, void *val)
         if (rc)
                 goto out;
 
-        lquota_setinfo(filter_quota_interface_ref, obd, exp);
-
         if (group == FILTER_GROUP_MDS0) {
                 /* setup llog group 1 for interop */
                 filter_setup_llog_group(exp, obd, FILTER_GROUP_LLOG);
         }
+
+        lquota_setinfo(filter_quota_interface_ref, obd, exp);
 out:
         RETURN(rc);
 }
index f13fe08..8551464 100644 (file)
@@ -3834,12 +3834,6 @@ static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
                 RETURN(-EINVAL);
 
-        /* If OST understood OBD_CONNECT_MDS we don't need to tell it we
-         * are the MDS again.  Just do the local setup.  b=16839 */
-        if (KEY_IS(KEY_MDS_CONN) &&
-            (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MDS))
-                RETURN(osc_setinfo_mds_connect_import(imp));
-
         /* We pass all other commands directly to OST. Since nobody calls osc
            methods directly and everybody is supposed to go through LOV, we
            assume lov checked invalid values for us.
index 0647d6b..61a21db 100644 (file)
@@ -2137,43 +2137,32 @@ void lustre_swab_qdata(struct qunit_data *d)
 /**
  * got qdata from request(req/rep)
  */
-int quota_get_qdata(void *request, struct qunit_data *qdata,
-                    int is_req, int is_exp)
+struct qunit_data *quota_get_qdata(void *request, int is_req, int is_exp)
 {
         struct ptlrpc_request *req = (struct ptlrpc_request *)request;
-        struct qunit_data *new;
+        struct qunit_data *qdata;
         __u64  flags = is_exp ? req->rq_export->exp_connect_flags :
                        req->rq_import->imp_connect_data.ocd_connect_flags;
-        int rc = 0;
 
         LASSERT(req);
-        LASSERT(qdata);
-
-        /* support for quota64 and change_qs */
-        if (flags & OBD_CONNECT_CHANGE_QS) {
-                if (!(flags & OBD_CONNECT_QUOTA64)) {
-                        CDEBUG(D_ERROR, "Wire protocol for qunit is broken!\n");
-                        return -EINVAL;
-                }
-                if (is_req == QUOTA_REQUEST)
-                        new = lustre_swab_reqbuf(req, REQ_REC_OFF,
-                                                 sizeof(struct qunit_data),
-                                                 lustre_swab_qdata);
-                else
-                        new = lustre_swab_repbuf(req, REPLY_REC_OFF,
-                                                 sizeof(struct qunit_data),
-                                                 lustre_swab_qdata);
-                if (new == NULL)
-                        GOTO(out, rc = -EPROTO);
-                *qdata = *new;
-                QDATA_SET_CHANGE_QS(qdata);
-                return 0;
-        } else {
-                QDATA_CLR_CHANGE_QS(qdata);
-        }
-
-out:
-        return rc;
+        /* support for quota64 */
+        LASSERT(flags & OBD_CONNECT_QUOTA64);
+        /* support for change_qs */
+        LASSERT(flags & OBD_CONNECT_CHANGE_QS);
+
+        if (is_req == QUOTA_REQUEST)
+                qdata = lustre_swab_reqbuf(req, REQ_REC_OFF,
+                                           sizeof(struct qunit_data),
+                                           lustre_swab_qdata);
+        else
+                qdata = lustre_swab_repbuf(req, REPLY_REC_OFF,
+                                           sizeof(struct qunit_data),
+                                           lustre_swab_qdata);
+        if (qdata == NULL)
+                return ERR_PTR(-EPROTO);
+
+        QDATA_SET_CHANGE_QS(qdata);
+        return qdata;
 }
 EXPORT_SYMBOL(quota_get_qdata);
 
@@ -2187,31 +2176,25 @@ int quota_copy_qdata(void *request, struct qunit_data *qdata,
         void *target;
         __u64  flags = is_exp ? req->rq_export->exp_connect_flags :
                 req->rq_import->imp_connect_data.ocd_connect_flags;
-        int rc = 0;
 
         LASSERT(req);
         LASSERT(qdata);
-
-        /* support for quota64 and change_qs */
-        if (flags & OBD_CONNECT_CHANGE_QS) {
-                if (!(flags & OBD_CONNECT_QUOTA64)) {
-                        CERROR("Wire protocol for qunit is broken!\n");
-                        return -EINVAL;
-                }
-                if (is_req == QUOTA_REQUEST)
-                        target = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF,
-                                                sizeof(struct qunit_data));
-                else
-                        target = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
-                                                sizeof(struct qunit_data));
-                if (!target)
-                        GOTO(out, rc = -EPROTO);
-                memcpy(target, qdata, sizeof(*qdata));
-                return 0;
-        }
-
-out:
-        return rc;
+        /* support for quota64 */
+        LASSERT(flags & OBD_CONNECT_QUOTA64);
+        /* support for change_qs */
+        LASSERT(flags & OBD_CONNECT_CHANGE_QS);
+
+        if (is_req == QUOTA_REQUEST)
+                target = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF,
+                                        sizeof(struct qunit_data));
+        else
+                target = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
+                                        sizeof(struct qunit_data));
+        if (target == NULL)
+                return -EPROTO;
+
+        memcpy(target, qdata, sizeof(*qdata));
+        return 0;
 }
 EXPORT_SYMBOL(quota_copy_qdata);
 #endif /* __KERNEL__ */
index 1a23830..82a5762 100644 (file)
@@ -113,15 +113,17 @@ struct lustre_qunit {
         spinlock_t lq_lock;                /** Protect the whole structure */
         enum qunit_state lq_state;         /** Present the status of qunit */
         int lq_rc;                         /** The rc of lq_data */
+        pid_t lq_owner;
 };
 
 #define QUNIT_SET_STATE(qunit, state)                                   \
 do {                                                                    \
         spin_lock(&qunit->lq_lock);                                     \
         QDATA_DEBUG((&qunit->lq_data), "qunit(%p) lq_state(%s->%s), "   \
-                    "lq_rc(%d)\n",                                      \
+                    "lq_rc(%d), lq_owner(%d)\n",                        \
                     qunit, qunit_state_names[qunit->lq_state],          \
-                    qunit_state_names[state], qunit->lq_rc);            \
+                    qunit_state_names[state], qunit->lq_rc,             \
+                    qunit->lq_owner);                                   \
         qunit->lq_state = state;                                        \
         spin_unlock(&qunit->lq_lock);                                   \
 } while(0)
@@ -131,9 +133,10 @@ do {                                                                    \
         spin_lock(&qunit->lq_lock);                                     \
         qunit->lq_rc = rc;                                              \
         QDATA_DEBUG((&qunit->lq_data), "qunit(%p) lq_state(%s->%s), "   \
-                    "lq_rc(%d)\n",                                      \
+                    "lq_rc(%d), lq_owner(%d)\n",                        \
                     qunit, qunit_state_names[qunit->lq_state],          \
-                    qunit_state_names[state], qunit->lq_rc);            \
+                    qunit_state_names[state], qunit->lq_rc,             \
+                    qunit->lq_owner);                                   \
         qunit->lq_state = state;                                        \
         spin_unlock(&qunit->lq_lock);                                   \
 } while(0)
@@ -435,6 +438,7 @@ static struct lustre_qunit *alloc_qunit(struct lustre_quota_ctxt *qctxt,
         qunit->lq_opc = opc;
         qunit->lq_lock = SPIN_LOCK_UNLOCKED;
         QUNIT_SET_STATE_AND_RC(qunit, QUNIT_CREATED, 0);
+        qunit->lq_owner = cfs_curproc_pid();
         RETURN(qunit);
 }
 
@@ -703,30 +707,24 @@ static int dqacq_interpret(const struct lu_env *env,
         struct lustre_qunit *qunit = aa->aa_qunit;
         struct obd_device *obd = req->rq_import->imp_obd;
         struct qunit_data *qdata = NULL;
-        int rc1 = 0;
         ENTRY;
 
         LASSERT(req);
         LASSERT(req->rq_import);
 
-        /* there are several forms of qunit(historic causes), so we need to
-         * adjust qunit from slaves to the same form here */
-        OBD_ALLOC(qdata, sizeof(struct qunit_data));
-        if (!qdata)
-                RETURN(-ENOMEM);
-
         down_read(&obt->obt_rwsem);
         /* if a quota req timeouts or is dropped, we should update quota
          * statistics which will be handled in dqacq_completion. And in
          * this situation we should get qdata from request instead of
          * reply */
-        rc1 = quota_get_qdata(req, qdata,
-                              (rc != 0) ? QUOTA_REQUEST : QUOTA_REPLY,
-                              QUOTA_IMPORT);
-        if (rc1 < 0) {
+        qdata = quota_get_qdata(req, (rc != 0) ? QUOTA_REQUEST : QUOTA_REPLY,
+                                QUOTA_IMPORT);
+        if (IS_ERR(qdata)) {
+                rc = PTR_ERR(qdata);
                 DEBUG_REQ(D_ERROR, req,
-                          "error unpacking qunit_data(rc: %d)\n", rc1);
-                GOTO(exit, rc = rc1);
+                          "error unpacking qunit_data(rc: %ld)\n",
+                          PTR_ERR(qdata));
+                RETURN(PTR_ERR(qdata));
         }
 
         QDATA_DEBUG(qdata, "qdata: interpret rc(%d).\n", rc);
@@ -760,10 +758,7 @@ static int dqacq_interpret(const struct lu_env *env,
         rc = dqacq_completion(obd, qctxt, qdata, rc,
                               lustre_msg_get_opc(req->rq_reqmsg));
 
-exit:
         up_read(&obt->obt_rwsem);
-        OBD_FREE(qdata, sizeof(struct qunit_data));
-
         RETURN(rc);
 }
 
@@ -1002,8 +997,9 @@ wait_completion:
                 spin_lock(&qunit->lq_lock);
                 rc = qunit->lq_rc;
                 spin_unlock(&qunit->lq_lock);
-                CDEBUG(D_QUOTA, "qunit(%p) finishes waiting. (rc:%d)\n",
-                       qunit, rc);
+                CDEBUG(D_QUOTA, "qunit(%p) finishes waiting: id(%u) flag(%u) "
+                       "rc(%d) owner(%d)\n", qunit, qunit->lq_data.qd_id,
+                       qunit->lq_data.qd_flags, rc, qunit->lq_owner);
         }
 
         qunit_put(qunit);
@@ -1092,8 +1088,8 @@ qctxt_wait_pending_dqacq(struct lustre_quota_ctxt *qctxt, unsigned int id,
 
                 QDATA_DEBUG(p, "qunit(%p) is waiting for dqacq.\n", qunit);
                 l_wait_event(qunit->lq_waitq, got_qunit(qunit), &lwi);
-                CDEBUG(D_QUOTA, "qunit(%p) finishes waiting. (rc:%d)\n",
-                       qunit, qunit->lq_rc);
+                CDEBUG(D_QUOTA, "qunit(%p) finishes waiting: rc(%d) "
+                       "owner(%d)\n", qunit, qunit->lq_rc, qunit->lq_owner);
                 /* keep same as schedule_dqacq() b=17030 */
                 spin_lock(&qunit->lq_lock);
                 rc = qunit->lq_rc;
@@ -1257,11 +1253,15 @@ static int qslave_recovery_main(void *arg)
 
         complete(&data->comp);
 
+        spin_lock(&qctxt->lqc_lock);
         if (qctxt->lqc_recovery) {
+                spin_unlock(&qctxt->lqc_lock);
                 class_decref(obd, "qslave_recovd_filter", obd);
                 RETURN(0);
+        } else {
+                qctxt->lqc_recovery = 1;
+                spin_unlock(&qctxt->lqc_lock);
         }
-        qctxt->lqc_recovery = 1;
 
         for (type = USRQUOTA; type < MAXQUOTAS; type++) {
                 struct qunit_data qdata;
@@ -1321,8 +1321,10 @@ free:
                 }
         }
 
-        class_decref(obd, "qslave_recovd_filter", obd);
+        spin_lock(&qctxt->lqc_lock);
         qctxt->lqc_recovery = 0;
+        spin_unlock(&qctxt->lqc_lock);
+        class_decref(obd, "qslave_recovd_filter", obd);
         RETURN(rc);
 }
 
index 2ebc692..ad4f9a4 100644 (file)
@@ -165,7 +165,7 @@ int filter_quota_ctl(struct obd_device *unused, struct obd_export *exp,
                  * dqacq/dqrel done then return the correct limits to master */
                 if (oqctl->qc_stat == QUOTA_RECOVERING)
                         qctxt_wait_pending_dqacq(&obd->u.obt.obt_qctxt,
-                                                 oqctl->qc_id, oqctl->qc_type, 
+                                                 oqctl->qc_id, oqctl->qc_type,
                                                  1);
 
                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
index 16ed64e..60d7038 100644 (file)
@@ -97,50 +97,65 @@ static int filter_quota_setinfo(struct obd_device *obd, void *data)
 {
         struct obd_export *exp = data;
         struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
-        struct obd_import *imp;
+        struct obd_import *imp = exp->exp_imp_reverse;
         ENTRY;
 
+        LASSERT(imp != NULL);
+
         /* setup the quota context import */
         spin_lock(&qctxt->lqc_lock);
-        qctxt->lqc_import = exp->exp_imp_reverse;
-        spin_unlock(&qctxt->lqc_lock);
-        CDEBUG(D_QUOTA, "%s: lqc_import(%p) of obd(%p) is reactivated now, \n",
-               obd->obd_name,exp->exp_imp_reverse, obd);
-
-        /* make imp's connect flags equal relative exp's connect flags
-         * adding it to avoid the scan export list
-         */
-        imp = qctxt->lqc_import;
-        if (likely(imp))
+        if (qctxt->lqc_import != NULL) {
+                spin_unlock(&qctxt->lqc_lock);
+                if (qctxt->lqc_import == imp)
+                        CDEBUG(D_WARNING, "%s: lqc_import(%p) of obd(%p) was "
+                               "activated already.\n", obd->obd_name, imp, obd);
+                else
+                        CDEBUG(D_ERROR, "%s: lqc_import(%p:%p) of obd(%p) was "
+                               "activated by others.\n", obd->obd_name,
+                               qctxt->lqc_import, imp, obd);
+        } else {
+                qctxt->lqc_import = imp;
+                /* make imp's connect flags equal relative exp's connect flags
+                 * adding it to avoid the scan export list */
                 imp->imp_connect_data.ocd_connect_flags |=
-                        (exp->exp_connect_flags &
-                         (OBD_CONNECT_QUOTA64 | OBD_CONNECT_CHANGE_QS));
+                                (exp->exp_connect_flags &
+                                 (OBD_CONNECT_QUOTA64 | OBD_CONNECT_CHANGE_QS));
+                spin_unlock(&qctxt->lqc_lock);
+                CDEBUG(D_QUOTA, "%s: lqc_import(%p) of obd(%p) is reactivated "
+                       "now.\n", obd->obd_name, imp, obd);
 
-        cfs_waitq_signal(&qctxt->lqc_wait_for_qmaster);
-        /* start quota slave recovery thread. (release high limits) */
-        qslave_start_recovery(obd, qctxt);
+                cfs_waitq_signal(&qctxt->lqc_wait_for_qmaster);
+                /* start quota slave recovery thread. (release high limits) */
+                qslave_start_recovery(obd, qctxt);
+        }
         RETURN(0);
 }
 
 static int filter_quota_clearinfo(struct obd_export *exp, struct obd_device *obd)
 {
         struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
+        struct obd_import *imp = exp->exp_imp_reverse;
         ENTRY;
 
         /* lquota may be not set up before destroying export, b=14896 */
         if (!obd->obd_set_up)
                 RETURN(0);
 
+        if (unlikely(imp == NULL))
+                RETURN(0);
+
         /* when exp->exp_imp_reverse is destroyed, the corresponding lqc_import
          * should be invalid b=12374 */
-        if (qctxt->lqc_import && qctxt->lqc_import == exp->exp_imp_reverse) {
-                spin_lock(&qctxt->lqc_lock);
+        spin_lock(&qctxt->lqc_lock);
+        if (qctxt->lqc_import == imp) {
                 qctxt->lqc_import = NULL;
                 spin_unlock(&qctxt->lqc_lock);
-                ptlrpc_cleanup_imp(exp->exp_imp_reverse);
+                CDEBUG(D_QUOTA, "%s: lqc_import(%p) of obd(%p) is invalid now.\n",
+                       obd->obd_name, imp, obd);
+                ptlrpc_cleanup_imp(imp);
                 dqacq_interrupt(qctxt);
-                CDEBUG(D_QUOTA, "%s: lqc_import of obd(%p) is invalid now.\n",
-                       obd->obd_name, obd);
+        } else {
+                spin_unlock(&qctxt->lqc_lock);
         }
         RETURN(0);
 }