Whamcloud - gitweb
b=16098
[fs/lustre-release.git] / lustre / quota / quota_context.c
index 595f5e4..83f1171 100644 (file)
@@ -1,17 +1,45 @@
 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
- *  lustre/quota/quota_context.c
- *  Lustre Quota Context
+ * GPL HEADER START
  *
- *  Copyright (c) 2001-2005 Cluster File Systems, Inc.
- *   Author: Niu YaWei <niu@clusterfs.com>
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
- *   This file is part of Lustre, http://www.lustre.org.
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
  *
- *   No redistribution or use is permitted outside of Cluster File Systems, Inc.
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
  *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see [sun.com URL with a
+ * copy of GPLv2].
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright  2008 Sun Microsystems, Inc. All rights reserved
+ * Use is subject to license terms.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
+ *
+ * lustre/quota/quota_context.c
+ *
+ * Lustre Quota Context
+ *
+ * Author: Niu YaWei <niu@clusterfs.com>
  */
+
 #ifndef EXPORT_SYMTAB
 # define EXPORT_SYMTAB
 #endif
@@ -87,14 +115,14 @@ int qunit_cache_init(void)
 
         LASSERT(qunit_cachep == NULL);
         qunit_cachep = cfs_mem_cache_create("ll_qunit_cache",
-                                         sizeof(struct lustre_qunit),
-                                         0, 0);
+                                            sizeof(struct lustre_qunit),
+                                            0, 0);
         if (!qunit_cachep)
                 RETURN(-ENOMEM);
 
         spin_lock(&qunit_hash_lock);
         for (i = 0; i < NR_DQHASH; i++)
-                INIT_LIST_HEAD(qunit_hash + i);
+                CFS_INIT_LIST_HEAD(qunit_hash + i);
         spin_unlock(&qunit_hash_lock);
         RETURN(0);
 }
@@ -210,10 +238,6 @@ check_cur_qunit(struct obd_device *obd,
         if (!sb_any_quota_enabled(sb))
                 RETURN(0);
 
-        /* ignore root user */
-        if (qdata->qd_id == 0 && qdata_type == USRQUOTA)
-                RETURN(0);
-
         OBD_ALLOC_PTR(qctl);
         if (qctl == NULL)
                 RETURN(-ENOMEM);
@@ -294,8 +318,8 @@ static struct lustre_qunit *alloc_qunit(struct lustre_quota_ctxt *qctxt,
         if (qunit == NULL)
                 RETURN(NULL);
 
-        INIT_LIST_HEAD(&qunit->lq_hash);
-        INIT_LIST_HEAD(&qunit->lq_waiters);
+        CFS_INIT_LIST_HEAD(&qunit->lq_hash);
+        CFS_INIT_LIST_HEAD(&qunit->lq_waiters);
         atomic_set(&qunit->lq_refcnt, 1);
         qunit->lq_ctxt = qctxt;
         memcpy(&qunit->lq_data, qdata, sizeof(*qdata));
@@ -362,27 +386,31 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
 static int split_before_schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
                                        struct qunit_data *qdata, int opc, int wait)
 {
-        int rc = 0, ret;
+        int rc = 0;
+        unsigned long factor;
         struct qunit_data tmp_qdata;
         ENTRY;
 
-        LASSERT(qdata);
-        if (qctxt->lqc_import)
-                while (should_translate_quota(qctxt->lqc_import) &&
-                       qdata->qd_count > MAX_QUOTA_COUNT32) {
+        LASSERT(qdata && qdata->qd_count);
+        QDATA_DEBUG(qdata, "%s quota split.\n",
+                    (qdata->qd_flags & QUOTA_IS_BLOCK) ? "block" : "inode");
+        if (qdata->qd_flags & QUOTA_IS_BLOCK)
+                factor = MAX_QUOTA_COUNT32 / qctxt->lqc_bunit_sz * 
+                        qctxt->lqc_bunit_sz;
+        else
+                factor = MAX_QUOTA_COUNT32 / qctxt->lqc_iunit_sz * 
+                        qctxt->lqc_iunit_sz;
 
+        if (qctxt->lqc_import && should_translate_quota(qctxt->lqc_import) &&
+            qdata->qd_count > factor) {
                         tmp_qdata = *qdata;
-                        tmp_qdata.qd_count = MAX_QUOTA_COUNT32;
+                tmp_qdata.qd_count = factor;
                         qdata->qd_count -= tmp_qdata.qd_count;
-                        ret = schedule_dqacq(obd, qctxt, &tmp_qdata, opc, wait);
-                        if (!rc)
-                                rc = ret;
-                }
-
-        if (qdata->qd_count){
-                ret = schedule_dqacq(obd, qctxt, qdata, opc, wait);
-                if (!rc)
-                        rc = ret;
+                QDATA_DEBUG((&tmp_qdata), "be split.\n");
+                rc = schedule_dqacq(obd, qctxt, &tmp_qdata, opc, wait);
+        } else{
+                QDATA_DEBUG(qdata, "don't be split.\n");
+                rc = schedule_dqacq(obd, qctxt, qdata, opc, wait);
         }
 
         RETURN(rc);
@@ -407,7 +435,8 @@ dqacq_completion(struct obd_device *obd,
         LASSERT(qdata);
         qunit_sz = is_blk ? qctxt->lqc_bunit_sz : qctxt->lqc_iunit_sz;
         div_r = do_div(qd_tmp, qunit_sz);
-        LASSERT(!div_r);
+        LASSERTF(!div_r, "qunit_sz: %lu, return qunit_sz: "LPU64"\n",
+                 qunit_sz, qd_tmp);
 
         /* update local operational quota file */
         if (rc == 0) {
@@ -441,10 +470,18 @@ dqacq_completion(struct obd_device *obd,
 
                 switch (opc) {
                 case QUOTA_DQACQ:
+                        CDEBUG(D_QUOTA, "%s(acq):count: %d, hardlimt: "LPU64 
+                               ",type: %s.\n", obd->obd_name, count, *hardlimit, 
+                               qdata_type ? "grp": "usr");
                         INC_QLIMIT(*hardlimit, count);
                         break;
                 case QUOTA_DQREL:
-                        LASSERT(count < *hardlimit);
+                        CDEBUG(D_QUOTA, "%s(rel):count: %d, hardlimt: "LPU64 
+                               ",type: %s.\n", obd->obd_name, count, *hardlimit, 
+                               qdata_type ? "grp": "usr");
+                        LASSERTF(count < *hardlimit, 
+                                 "count: %d, hardlimit: "LPU64".\n", 
+                                 count, *hardlimit);
                         *hardlimit -= count;
                         break;
                 default:
@@ -534,14 +571,21 @@ static int dqacq_interpret(struct ptlrpc_request *req, void *data, int rc)
 
         LASSERT(req);
         LASSERT(req->rq_import);
-        if ((req->rq_import->imp_connect_data.ocd_connect_flags & OBD_CONNECT_QUOTA64)  &&
+
+        if ((req->rq_import->imp_connect_data.ocd_connect_flags &
+             OBD_CONNECT_QUOTA64) &&
             !OBD_FAIL_CHECK(OBD_FAIL_QUOTA_QD_COUNT_32BIT)) {
                 CDEBUG(D_QUOTA, "qd_count is 64bit!\n");
-                qdata = lustre_swab_reqbuf(req, REPLY_REC_OFF, sizeof(*qdata), lustre_swab_qdata);
+
+                qdata = req_capsule_server_swab_get(&req->rq_pill,
+                                                    &RMF_QUNIT_DATA,
+                                          (void*)lustre_swab_qdata);
         } else {
                 CDEBUG(D_QUOTA, "qd_count is 32bit!\n");
-                qdata_old = lustre_swab_reqbuf(req, REPLY_REC_OFF, sizeof(struct qunit_data_old),
-                                               lustre_swab_qdata_old);
+
+                qdata = req_capsule_server_swab_get(&req->rq_pill,
+                                                    &RMF_QUNIT_DATA,
+                                       (void*)lustre_swab_qdata_old);
                 qdata = lustre_quota_old_to_new(qdata_old);
         }
         if (qdata == NULL) {
@@ -550,7 +594,8 @@ static int dqacq_interpret(struct ptlrpc_request *req, void *data, int rc)
         }
 
         LASSERT(qdata->qd_id == qunit->lq_data.qd_id &&
-                (qdata->qd_flags & QUOTA_IS_GRP) == (qunit->lq_data.qd_flags & QUOTA_IS_GRP) &&
+                (qdata->qd_flags & QUOTA_IS_GRP) ==
+                 (qunit->lq_data.qd_flags & QUOTA_IS_GRP) &&
                 (qdata->qd_count == qunit->lq_data.qd_count ||
                  qdata->qd_count == 0));
 
@@ -585,11 +630,11 @@ schedule_dqacq(struct obd_device *obd,
         struct ptlrpc_request *req;
         struct qunit_data *reqdata;
         struct dqacq_async_args *aa;
-        int size[2] = { sizeof(struct ptlrpc_body), sizeof(*reqdata) };
+       unsigned long factor;   
         int rc = 0;
         ENTRY;
 
-        INIT_LIST_HEAD(&qw.qw_entry);
+        CFS_INIT_LIST_HEAD(&qw.qw_entry);
         init_waitqueue_head(&qw.qw_waitq);
         qw.qw_rc = 0;
 
@@ -616,7 +661,8 @@ schedule_dqacq(struct obd_device *obd,
         LASSERT(qunit);
 
         /* master is going to dqacq/dqrel from itself */
-        if (is_master(obd, qctxt, qdata->qd_id, qdata->qd_flags & QUOTA_IS_GRP)) {
+        if (is_master(obd, qctxt, qdata->qd_id, qdata->qd_flags & QUOTA_IS_GRP))
+        {
                 int rc2;
                 QDATA_DEBUG(qdata, "local %s.\n",
                             opc == QUOTA_DQACQ ? "DQACQ" : "DQREL");
@@ -627,33 +673,45 @@ schedule_dqacq(struct obd_device *obd,
 
         /* build dqacq/dqrel request */
         LASSERT(qctxt->lqc_import);
-        req = ptlrpc_prep_req(qctxt->lqc_import, LUSTRE_MDS_VERSION, opc, 2,
-                              size, NULL);
-        if (!req) {
+
+        req = ptlrpc_request_alloc_pack(qctxt->lqc_import, &RQF_MDS_QUOTA_DQACQ,
+                                        LUSTRE_MDS_VERSION, opc);
+        if (req == NULL) {
                 dqacq_completion(obd, qctxt, qdata, -ENOMEM, opc);
                 RETURN(-ENOMEM);
         }
 
+       if (qdata->qd_flags & QUOTA_IS_BLOCK)
+               factor = MAX_QUOTA_COUNT32 / qctxt->lqc_bunit_sz * 
+                         qctxt->lqc_bunit_sz;
+        else
+                factor = MAX_QUOTA_COUNT32 / qctxt->lqc_iunit_sz * 
+                         qctxt->lqc_iunit_sz;
+
         LASSERT(!should_translate_quota(qctxt->lqc_import) || 
-                qdata->qd_count <= MAX_QUOTA_COUNT32);
+                qdata->qd_count <= factor);
         if (should_translate_quota(qctxt->lqc_import))
         {
                 struct qunit_data_old *reqdata_old, *tmp;
                         
-                reqdata_old = lustre_msg_buf(req->rq_reqmsg, REPLY_REC_OFF, 
-                                             sizeof(*reqdata_old));
+                reqdata_old = req_capsule_client_get(&req->rq_pill,
+                                                     &RMF_QUNIT_DATA);
+
                 tmp = lustre_quota_new_to_old(qdata);
                 *reqdata_old = *tmp;
-                size[1] = sizeof(*reqdata_old);
+                req_capsule_set_size(&req->rq_pill, &RMF_QUNIT_DATA, RCL_SERVER,
+                                     sizeof(*reqdata_old));
                 CDEBUG(D_QUOTA, "qd_count is 32bit!\n");
         } else {
-                reqdata = lustre_msg_buf(req->rq_reqmsg, REPLY_REC_OFF,
-                                         sizeof(*reqdata));
+                reqdata = req_capsule_client_get(&req->rq_pill,
+                                                 &RMF_QUNIT_DATA);
+
                 *reqdata = *qdata;
-                size[1] = sizeof(*reqdata);
+                req_capsule_set_size(&req->rq_pill, &RMF_QUNIT_DATA, RCL_SERVER,
+                                     sizeof(*reqdata));
                 CDEBUG(D_QUOTA, "qd_count is 64bit!\n");
         }
-        ptlrpc_req_set_repsize(req, 2, size);
+        ptlrpc_request_set_replen(req);
 
         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
         aa = (struct dqacq_async_args *)&req->rq_async_args;
@@ -724,7 +782,7 @@ qctxt_wait_pending_dqacq(struct lustre_quota_ctxt *qctxt, unsigned int id,
         struct l_wait_info lwi = { 0 };
         ENTRY;
 
-        INIT_LIST_HEAD(&qw.qw_entry);
+        CFS_INIT_LIST_HEAD(&qw.qw_entry);
         init_waitqueue_head(&qw.qw_waitq);
         qw.qw_rc = 0;
 
@@ -847,7 +905,7 @@ static int qslave_recovery_main(void *arg)
                 }
 
                 LASSERT(dqopt->files[type] != NULL);
-                INIT_LIST_HEAD(&id_list);
+                CFS_INIT_LIST_HEAD(&id_list);
 #ifndef KERNEL_SUPPORTS_QUOTA_READ 
                 rc = fsfilt_qids(obd, dqopt->files[type], NULL, type, &id_list);
 #else
@@ -915,4 +973,3 @@ qslave_start_recovery(struct obd_device *obd, struct lustre_quota_ctxt *qctxt)
 exit:
         EXIT;
 }
-