Whamcloud - gitweb
b=14149
[fs/lustre-release.git] / lustre / quota / quota_context.c
index e932307..6c5de20 100644 (file)
@@ -54,7 +54,8 @@ int should_translate_quota (struct obd_import *imp)
         ENTRY;
 
         LASSERT(imp);
-        if (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_QUOTA64)
+        if ((imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_QUOTA64) && 
+            !OBD_FAIL_CHECK(OBD_FAIL_QUOTA_QD_COUNT_32BIT))
                 RETURN(0);
         else
                 RETURN(1);
@@ -86,8 +87,8 @@ int qunit_cache_init(void)
 
         LASSERT(qunit_cachep == NULL);
         qunit_cachep = cfs_mem_cache_create("ll_qunit_cache",
-                                         sizeof(struct lustre_qunit),
-                                         0, 0);
+                                            sizeof(struct lustre_qunit),
+                                            0, 0);
         if (!qunit_cachep)
                 RETURN(-ENOMEM);
 
@@ -209,10 +210,6 @@ check_cur_qunit(struct obd_device *obd,
         if (!sb_any_quota_enabled(sb))
                 RETURN(0);
 
-        /* ignore root user */
-        if (qdata->qd_id == 0 && qdata_type == USRQUOTA)
-                RETURN(0);
-
         OBD_ALLOC_PTR(qctl);
         if (qctl == NULL)
                 RETURN(-ENOMEM);
@@ -361,27 +358,31 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
 static int split_before_schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
                                        struct qunit_data *qdata, int opc, int wait)
 {
-        int rc = 0, ret;
+        int rc = 0;
+        unsigned long factor;
         struct qunit_data tmp_qdata;
         ENTRY;
 
-        LASSERT(qdata);
-        if (qctxt->lqc_import)
-                while (should_translate_quota(qctxt->lqc_import) &&
-                       qdata->qd_count > MAX_QUOTA_COUNT32) {
+        LASSERT(qdata && qdata->qd_count);
+        QDATA_DEBUG(qdata, "%s quota split.\n",
+                    (qdata->qd_flags & QUOTA_IS_BLOCK) ? "block" : "inode");
+        if (qdata->qd_flags & QUOTA_IS_BLOCK)
+                factor = MAX_QUOTA_COUNT32 / qctxt->lqc_bunit_sz * 
+                        qctxt->lqc_bunit_sz;
+        else
+                factor = MAX_QUOTA_COUNT32 / qctxt->lqc_iunit_sz * 
+                        qctxt->lqc_iunit_sz;
 
+        if (qctxt->lqc_import && should_translate_quota(qctxt->lqc_import) &&
+            qdata->qd_count > factor) {
                         tmp_qdata = *qdata;
-                        tmp_qdata.qd_count = MAX_QUOTA_COUNT32;
+                tmp_qdata.qd_count = factor;
                         qdata->qd_count -= tmp_qdata.qd_count;
-                        ret = schedule_dqacq(obd, qctxt, &tmp_qdata, opc, wait);
-                        if (!rc)
-                                rc = ret;
-                }
-
-        if (qdata->qd_count){
-                ret = schedule_dqacq(obd, qctxt, qdata, opc, wait);
-                if (!rc)
-                        rc = ret;
+                QDATA_DEBUG((&tmp_qdata), "be split.\n");
+                rc = schedule_dqacq(obd, qctxt, &tmp_qdata, opc, wait);
+        } else{
+                QDATA_DEBUG(qdata, "don't be split.\n");
+                rc = schedule_dqacq(obd, qctxt, qdata, opc, wait);
         }
 
         RETURN(rc);
@@ -406,7 +407,8 @@ dqacq_completion(struct obd_device *obd,
         LASSERT(qdata);
         qunit_sz = is_blk ? qctxt->lqc_bunit_sz : qctxt->lqc_iunit_sz;
         div_r = do_div(qd_tmp, qunit_sz);
-        LASSERT(!div_r);
+        LASSERTF(!div_r, "qunit_sz: %lu, return qunit_sz: "LPU64"\n",
+                 qunit_sz, qd_tmp);
 
         /* update local operational quota file */
         if (rc == 0) {
@@ -440,10 +442,18 @@ dqacq_completion(struct obd_device *obd,
 
                 switch (opc) {
                 case QUOTA_DQACQ:
+                        CDEBUG(D_QUOTA, "%s(acq):count: %d, hardlimt: "LPU64 
+                               ",type: %s.\n", obd->obd_name, count, *hardlimit, 
+                               qdata_type ? "grp": "usr");
                         INC_QLIMIT(*hardlimit, count);
                         break;
                 case QUOTA_DQREL:
-                        LASSERT(count < *hardlimit);
+                        CDEBUG(D_QUOTA, "%s(rel):count: %d, hardlimt: "LPU64 
+                               ",type: %s.\n", obd->obd_name, count, *hardlimit, 
+                               qdata_type ? "grp": "usr");
+                        LASSERTF(count < *hardlimit, 
+                                 "count: %d, hardlimit: "LPU64".\n", 
+                                 count, *hardlimit);
                         *hardlimit -= count;
                         break;
                 default:
@@ -533,14 +543,21 @@ static int dqacq_interpret(struct ptlrpc_request *req, void *data, int rc)
 
         LASSERT(req);
         LASSERT(req->rq_import);
-        if ((req->rq_import->imp_connect_data.ocd_connect_flags & OBD_CONNECT_QUOTA64)  &&
+
+        if ((req->rq_import->imp_connect_data.ocd_connect_flags &
+             OBD_CONNECT_QUOTA64) &&
             !OBD_FAIL_CHECK(OBD_FAIL_QUOTA_QD_COUNT_32BIT)) {
                 CDEBUG(D_QUOTA, "qd_count is 64bit!\n");
-                qdata = lustre_swab_reqbuf(req, REPLY_REC_OFF, sizeof(*qdata), lustre_swab_qdata);
+
+                qdata = req_capsule_server_swab_get(&req->rq_pill,
+                                                    &RMF_QUNIT_DATA,
+                                          (void*)lustre_swab_qdata);
         } else {
                 CDEBUG(D_QUOTA, "qd_count is 32bit!\n");
-                qdata_old = lustre_swab_reqbuf(req, REPLY_REC_OFF, sizeof(struct qunit_data_old),
-                                               lustre_swab_qdata_old);
+
+                qdata = req_capsule_server_swab_get(&req->rq_pill,
+                                                    &RMF_QUNIT_DATA,
+                                       (void*)lustre_swab_qdata_old);
                 qdata = lustre_quota_old_to_new(qdata_old);
         }
         if (qdata == NULL) {
@@ -549,7 +566,8 @@ static int dqacq_interpret(struct ptlrpc_request *req, void *data, int rc)
         }
 
         LASSERT(qdata->qd_id == qunit->lq_data.qd_id &&
-                (qdata->qd_flags & QUOTA_IS_GRP) == (qunit->lq_data.qd_flags & QUOTA_IS_GRP) &&
+                (qdata->qd_flags & QUOTA_IS_GRP) ==
+                 (qunit->lq_data.qd_flags & QUOTA_IS_GRP) &&
                 (qdata->qd_count == qunit->lq_data.qd_count ||
                  qdata->qd_count == 0));
 
@@ -584,7 +602,7 @@ schedule_dqacq(struct obd_device *obd,
         struct ptlrpc_request *req;
         struct qunit_data *reqdata;
         struct dqacq_async_args *aa;
-        int size[2] = { sizeof(struct ptlrpc_body), sizeof(*reqdata) };
+       unsigned long factor;   
         int rc = 0;
         ENTRY;
 
@@ -615,7 +633,8 @@ schedule_dqacq(struct obd_device *obd,
         LASSERT(qunit);
 
         /* master is going to dqacq/dqrel from itself */
-        if (is_master(obd, qctxt, qdata->qd_id, qdata->qd_flags & QUOTA_IS_GRP)) {
+        if (is_master(obd, qctxt, qdata->qd_id, qdata->qd_flags & QUOTA_IS_GRP))
+        {
                 int rc2;
                 QDATA_DEBUG(qdata, "local %s.\n",
                             opc == QUOTA_DQACQ ? "DQACQ" : "DQREL");
@@ -626,34 +645,45 @@ schedule_dqacq(struct obd_device *obd,
 
         /* build dqacq/dqrel request */
         LASSERT(qctxt->lqc_import);
-        req = ptlrpc_prep_req(qctxt->lqc_import, LUSTRE_MDS_VERSION, opc, 2,
-                              size, NULL);
-        if (!req) {
+
+        req = ptlrpc_request_alloc_pack(qctxt->lqc_import, &RQF_MDS_QUOTA_DQACQ,
+                                        LUSTRE_MDS_VERSION, opc);
+        if (req == NULL) {
                 dqacq_completion(obd, qctxt, qdata, -ENOMEM, opc);
                 RETURN(-ENOMEM);
         }
 
+       if (qdata->qd_flags & QUOTA_IS_BLOCK)
+               factor = MAX_QUOTA_COUNT32 / qctxt->lqc_bunit_sz * 
+                         qctxt->lqc_bunit_sz;
+        else
+                factor = MAX_QUOTA_COUNT32 / qctxt->lqc_iunit_sz * 
+                         qctxt->lqc_iunit_sz;
+
         LASSERT(!should_translate_quota(qctxt->lqc_import) || 
-                qdata->qd_count <= MAX_QUOTA_COUNT32);
-        if (should_translate_quota(qctxt->lqc_import) ||
-            OBD_FAIL_CHECK(OBD_FAIL_QUOTA_QD_COUNT_32BIT))
+                qdata->qd_count <= factor);
+        if (should_translate_quota(qctxt->lqc_import))
         {
                 struct qunit_data_old *reqdata_old, *tmp;
                         
-                reqdata_old = lustre_msg_buf(req->rq_reqmsg, REPLY_REC_OFF, 
-                                             sizeof(*reqdata_old));
+                reqdata_old = req_capsule_client_get(&req->rq_pill,
+                                                     &RMF_QUNIT_DATA);
+
                 tmp = lustre_quota_new_to_old(qdata);
                 *reqdata_old = *tmp;
-                size[1] = sizeof(*reqdata_old);
+                req_capsule_set_size(&req->rq_pill, &RMF_QUNIT_DATA, RCL_SERVER,
+                                     sizeof(*reqdata_old));
                 CDEBUG(D_QUOTA, "qd_count is 32bit!\n");
         } else {
-                reqdata = lustre_msg_buf(req->rq_reqmsg, REPLY_REC_OFF,
-                                         sizeof(*reqdata));
+                reqdata = req_capsule_client_get(&req->rq_pill,
+                                                 &RMF_QUNIT_DATA);
+
                 *reqdata = *qdata;
-                size[1] = sizeof(*reqdata);
+                req_capsule_set_size(&req->rq_pill, &RMF_QUNIT_DATA, RCL_SERVER,
+                                     sizeof(*reqdata));
                 CDEBUG(D_QUOTA, "qd_count is 64bit!\n");
         }
-        ptlrpc_req_set_repsize(req, 2, size);
+        ptlrpc_request_set_replen(req);
 
         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
         aa = (struct dqacq_async_args *)&req->rq_async_args;