Whamcloud - gitweb
Branch b_release_1_6_7
authortianzy <tianzy>
Mon, 15 Dec 2008 07:58:59 +0000 (07:58 +0000)
committertianzy <tianzy>
Mon, 15 Dec 2008 07:58:59 +0000 (07:58 +0000)
meta blocks ignored can lead to occasional -EDQUOT occasionally
b=16542
i=johann
i=alex.zhuravlev

lustre/include/linux/lustre_fsfilt.h
lustre/include/lustre/lustre_idl.h
lustre/include/lustre_quota.h
lustre/lvfs/fsfilt_ext3.c
lustre/mds/mds_open.c
lustre/mds/mds_reint.c
lustre/obdfilter/filter_io_26.c
lustre/quota/quota_context.c
lustre/quota/quota_interface.c

index 96ba77d..0e9f801 100644 (file)
@@ -124,6 +124,8 @@ struct fsfilt_operations {
         int     (* fs_qids)(struct file *file, struct inode *inode, int type,
                             struct list_head *list);
         int     (* fs_dquot)(struct lustre_dquot *dquot, int cmd);
+        int     (* fs_get_mblk)(struct super_block *sb, int *count,
+                                struct inode *inode, int frags);
         lvfs_sbdev_type (* fs_journal_sbdev)(struct super_block *sb);
 };
 
@@ -442,6 +444,15 @@ static inline int fsfilt_dquot(struct obd_device *obd,
         return -ENOTSUPP;
 }
 
+static inline int fsfilt_get_mblk(struct obd_device *obd,
+                                  struct super_block *sb, int *count,
+                                  struct inode *inode, int frags)
+{
+        if (obd->obd_fsops->fs_get_mblk)
+                return obd->obd_fsops->fs_get_mblk(sb, count, inode, frags);
+        return -ENOTSUPP;
+}
+
 static inline int fsfilt_map_inode_pages(struct obd_device *obd,
                                          struct inode *inode,
                                          struct page **page, int pages,
index 5fc9206..61681f1 100644 (file)
@@ -1661,7 +1661,6 @@ typedef enum {
 #define QUOTA_RET_NOQUOTA      1 /* not support quota */
 #define QUOTA_RET_NOLIMIT      2 /* quota limit isn't set */
 #define QUOTA_RET_ACQUOTA      4 /* need to acquire extra quota */
-#define QUOTA_RET_INC_PENDING  8 /* pending value is increased */
 
 extern int quota_get_qunit_data_size(__u64 flag);
 #endif
index 8f01499..43a0d31 100644 (file)
@@ -367,7 +367,7 @@ typedef struct {
 
         /* For quota slave, check whether specified uid/gid is over quota */
         int (*quota_getflag) (struct obd_device *, struct obdo *);
-
+#ifdef __KERNEL__
         /* For quota slave, acquire/release quota from master if needed */
         int (*quota_acquire) (struct obd_device *, unsigned int, unsigned int,
                               struct obd_trans_info *);
@@ -377,18 +377,18 @@ typedef struct {
          * record of block and inode, acquires quota if necessary */
         int (*quota_chkquota) (struct obd_device *, unsigned int, unsigned int,
                                int, int *, quota_acquire,
-                               struct obd_trans_info *);
+                               struct obd_trans_info *, struct inode *, int);
 
+        /* For quota client, the actions after the pending write is committed */
+        int (*quota_pending_commit) (struct obd_device *, unsigned int,
+                                     unsigned int, int);
+#endif
         /* For quota client, poll if the quota check done */
         int (*quota_poll_check) (struct obd_export *, struct if_quotacheck *);
 
         /* For quota client, check whether specified uid/gid is over quota */
         int (*quota_chkdq) (struct client_obd *, unsigned int, unsigned int);
 
-        /* For quota client, the actions after the pending write is committed */
-        int (*quota_pending_commit) (struct obd_device *, unsigned int,
-                                     unsigned int, int);
-
         /* For quota client, set over quota flag for specifed uid/gid */
         int (*quota_setdq) (struct client_obd *, unsigned int, unsigned int,
                             obd_flag, obd_flag);
@@ -576,6 +576,7 @@ static inline int lquota_getflag(quota_interface_t *interface,
         RETURN(rc);
 }
 
+#ifdef __KERNEL__
 static inline int lquota_acquire(quota_interface_t *interface,
                                  struct obd_device *obd,
                                  unsigned int uid, unsigned int gid,
@@ -592,7 +593,8 @@ static inline int lquota_acquire(quota_interface_t *interface,
 static inline int lquota_chkquota(quota_interface_t *interface,
                                   struct obd_device *obd,
                                   unsigned int uid, unsigned int gid, int count,
-                                  int *flag, struct obd_trans_info *oti)
+                                  int *flag, struct obd_trans_info *oti,
+                                  struct inode *inode, int frags)
 {
         int rc;
         ENTRY;
@@ -600,22 +602,24 @@ static inline int lquota_chkquota(quota_interface_t *interface,
         QUOTA_CHECK_OP(interface, chkquota);
         QUOTA_CHECK_OP(interface, acquire);
         rc = QUOTA_OP(interface, chkquota)(obd, uid, gid, count, flag,
-                                           QUOTA_OP(interface, acquire), oti);
+                                           QUOTA_OP(interface, acquire), oti,
+                                           inode, frags);
         RETURN(rc);
 }
 
 static inline int lquota_pending_commit(quota_interface_t *interface,
                                         struct obd_device *obd,
                                         unsigned int uid, unsigned int gid,
-                                        int npage)
+                                        int pending)
 {
         int rc;
         ENTRY;
 
         QUOTA_CHECK_OP(interface, pending_commit);
-        rc = QUOTA_OP(interface, pending_commit)(obd, uid, gid, npage);
+        rc = QUOTA_OP(interface, pending_commit)(obd, uid, gid, pending);
         RETURN(rc);
 }
+#endif
 
 #ifndef __KERNEL__
 extern quota_interface_t osc_quota_interface;
index 825acbc..40281ae 100644 (file)
@@ -2125,6 +2125,16 @@ static int fsfilt_ext3_dquot(struct lustre_dquot *dquot, int cmd)
         }
         RETURN(rc);
 }
+
+static int fsfilt_ext3_get_mblk(struct super_block *sb, int *count,
+                                struct inode *inode, int frags)
+{
+        /* for an ost_write request, it needs <#fragments> * <tree depth + 1>
+         * metablocks at maxium b=16542 */
+        *count = frags * (EXT_DEPTH(inode) + 1) * EXT3_BLOCK_SIZE(sb);
+        return 0;
+}
+
 #endif
 
 static lvfs_sbdev_type fsfilt_ext3_journal_sbdev(struct super_block *sb)
@@ -2168,6 +2178,7 @@ static struct fsfilt_operations fsfilt_ext3_ops = {
         .fs_quotainfo           = fsfilt_ext3_quotainfo,
         .fs_qids                = fsfilt_ext3_qids,
         .fs_dquot               = fsfilt_ext3_dquot,
+        .fs_get_mblk            = fsfilt_ext3_get_mblk,
 #endif
         .fs_journal_sbdev       = fsfilt_ext3_journal_sbdev,
 };
index b471468..e943c02 100644 (file)
@@ -1100,7 +1100,8 @@ int mds_open(struct mds_update_record *rec, int offset,
                  * FIXME: after CMD is used, pointer to obd_trans_info* couldn't
                  * be NULL, b=14840 */
                 lquota_chkquota(mds_quota_interface_ref, obd,
-                                current->fsuid, gid, 1, &rec_pending, NULL);
+                                current->fsuid, gid, 1, &rec_pending,
+                                NULL, NULL, 0);
 
                 intent_set_disposition(rep, DISP_OPEN_CREATE);
                 handle = fsfilt_start(obd, dparent->d_inode, FSFILT_OP_CREATE,
@@ -1274,7 +1275,7 @@ found_child:
  cleanup_no_trans:
         if (rec_pending)
                 lquota_pending_commit(mds_quota_interface_ref, obd,
-                                      current->fsuid, gid, 1);
+                                      current->fsuid, gid, rec_pending);
         switch (cleanup_phase) {
         case 2:
                 if (rc && created) {
index 9d1e347..5268880 100644 (file)
@@ -901,7 +901,7 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
          * FIXME: after CMD is used, pointer to obd_trans_info* couldn't
          * be NULL, b=14840 */
         lquota_chkquota(mds_quota_interface_ref, obd,
-                        current->fsuid, gid, 1, &rec_pending, NULL);
+                        current->fsuid, gid, 1, &rec_pending, NULL, NULL, 0);
 
         switch (type) {
         case S_IFREG:{
@@ -1029,7 +1029,7 @@ cleanup:
         err = mds_finish_transno(mds, dir, handle, req, rc, 0, 0);
         if (rec_pending)
                 lquota_pending_commit(mds_quota_interface_ref, obd,
-                                      current->fsuid, gid, 1);
+                                      current->fsuid, gid, rec_pending);
 
         if (rc && created) {
                 /* Destroy the file we just created.  This should not need
index a98d4fc..eea1c8e 100644 (file)
@@ -642,7 +642,7 @@ int filter_commitrw_write(struct obd_export *exp, struct obdo *oa,
         struct lvfs_run_ctxt saved;
         struct fsfilt_objinfo fso;
         struct iattr iattr = { 0 };
-        struct inode *inode = NULL;
+        struct inode *inode = res->dentry->d_inode;
         unsigned long now = jiffies;
         int i, err, cleanup_phase = 0;
         struct obd_device *obd = exp->exp_obd;
@@ -661,8 +661,8 @@ int filter_commitrw_write(struct obd_export *exp, struct obdo *oa,
 
         /* we try to get enough quota to write here, and let ldiskfs
          * decide if it is out of quota or not b=14783 */
-        lquota_chkquota(filter_quota_interface_ref, obd, oa->o_uid,
-                        oa->o_gid, niocount, &rec_pending, oti);
+        lquota_chkquota(filter_quota_interface_ref, obd, oa->o_uid, oa->o_gid,
+                        niocount, &rec_pending, oti, inode, obj->ioo_bufcnt);
 
         iobuf = filter_iobuf_get(&obd->u.filter, oti);
         if (IS_ERR(iobuf))
@@ -671,7 +671,6 @@ int filter_commitrw_write(struct obd_export *exp, struct obdo *oa,
 
         fso.fso_dentry = res->dentry;
         fso.fso_bufcnt = obj->ioo_bufcnt;
-        inode = res->dentry->d_inode;
 
         iobuf->dr_ignore_quota = 0;
         for (i = 0, lnb = res; i < obj->ioo_bufcnt; i++, lnb++) {
@@ -795,7 +794,7 @@ int filter_commitrw_write(struct obd_export *exp, struct obdo *oa,
 cleanup:
         if (rec_pending)
                 lquota_pending_commit(filter_quota_interface_ref, obd, oa->o_uid,
-                                      oa->o_gid, niocount);
+                                      oa->o_gid, rec_pending);
 
         filter_grant_commit(exp, niocount, res);
 
index a238fab..daff303 100644 (file)
@@ -297,7 +297,7 @@ check_cur_qunit(struct obd_device *obd,
         if (QDATA_IS_BLK(qdata)) {
                 qunit_sz = lqs->lqs_bunit_sz;
                 tune_sz  = lqs->lqs_btune_sz;
-                pending_write = lqs->lqs_bwrite_pending * CFS_PAGE_SIZE;
+                pending_write = lqs->lqs_bwrite_pending;
                 record   = lqs->lqs_blk_rec;
                 LASSERT(!(qunit_sz % QUOTABLOCK_SIZE));
         } else {
index 4ede64d..7aa638c 100644 (file)
@@ -247,12 +247,14 @@ static int filter_quota_acquire(struct obd_device *obd, unsigned int uid,
 /* check whether the left quota of certain uid and gid can satisfy a block_write
  * or inode_create rpc. When need to acquire quota, return QUOTA_RET_ACQUOTA */
 static int quota_check_common(struct obd_device *obd, unsigned int uid,
-                              unsigned int gid, int count, int cycle, int isblk)
+                              unsigned int gid, int count, int cycle, int isblk,
+                              struct inode *inode, int frags, int *pending)
 {
         struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
         int i;
         __u32 id[MAXQUOTAS] = { uid, gid };
         struct qunit_data qdata[MAXQUOTAS];
+        int mb = 0;
         int rc = 0, rc2[2] = { 0, 0 };
         ENTRY;
 
@@ -287,20 +289,32 @@ static int quota_check_common(struct obd_device *obd, unsigned int uid,
                 rc2[i] = compute_remquota(obd, qctxt, &qdata[i], isblk);
                 spin_lock(&lqs->lqs_lock);
                 if (!cycle) {
-                        rc = QUOTA_RET_INC_PENDING;
-                        if (isblk)
-                                lqs->lqs_bwrite_pending += count;
-                        else
-                                lqs->lqs_iwrite_pending += count;
+                        if (isblk) {
+                                *pending = count * CFS_PAGE_SIZE;
+                                /* in order to complete this write, we need extra
+                                 * meta blocks. This function can get it through
+                                 * data needed to be written b=16542 */
+                                mb = *pending;
+                                LASSERT(inode && frags > 0);
+                                if (fsfilt_get_mblk(obd, qctxt->lqc_sb, &mb,
+                                                    inode, frags) < 0)
+                                        CDEBUG(D_ERROR,
+                                               "can't get extra meta blocks.\n");
+                                else
+                                        *pending += mb;
+                                lqs->lqs_bwrite_pending += *pending;
+                        } else {
+                                *pending = count;
+                                lqs->lqs_iwrite_pending += *pending;
+                        }
                 }
 
-                CDEBUG(D_QUOTA, "count: %d, write pending: %lu, qd_count: "LPU64
-                       ".\n", count,
+                CDEBUG(D_QUOTA, "count: %d, lqs pending: %lu, qd_count: "LPU64
+                       ", metablocks: %d, isblk: %d, pending: %d.\n", count,
                        isblk ? lqs->lqs_bwrite_pending : lqs->lqs_iwrite_pending,
-                       qdata[i].qd_count);
+                       qdata[i].qd_count, mb, isblk, *pending);
                 if (rc2[i] == QUOTA_RET_OK) {
-                        if (isblk && qdata[i].qd_count <
-                            lqs->lqs_bwrite_pending * CFS_PAGE_SIZE)
+                        if (isblk && qdata[i].qd_count < lqs->lqs_bwrite_pending)
                                 rc2[i] = QUOTA_RET_ACQUOTA;
                         if (!isblk && qdata[i].qd_count <
                             lqs->lqs_iwrite_pending)
@@ -320,7 +334,7 @@ static int quota_check_common(struct obd_device *obd, unsigned int uid,
         }
 
         if (rc2[0] == QUOTA_RET_ACQUOTA || rc2[1] == QUOTA_RET_ACQUOTA)
-                RETURN(rc | QUOTA_RET_ACQUOTA);
+                RETURN(QUOTA_RET_ACQUOTA);
         else
                 RETURN(rc);
 }
@@ -328,7 +342,8 @@ static int quota_check_common(struct obd_device *obd, unsigned int uid,
 static int quota_chk_acq_common(struct obd_device *obd, unsigned int uid,
                                 unsigned int gid, int count, int *pending,
                                 int isblk, quota_acquire acquire,
-                                struct obd_trans_info *oti)
+                                struct obd_trans_info *oti, struct inode *inode,
+                                int frags)
 {
         struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
         struct timeval work_start;
@@ -344,8 +359,8 @@ static int quota_chk_acq_common(struct obd_device *obd, unsigned int uid,
          * have to wait for the completion of in flight dqacq/dqrel,
          * in order to get enough quota for write b=12588 */
         do_gettimeofday(&work_start);
-        while ((rc = quota_check_common(obd, uid, gid, count, cycle, isblk)) &
-               QUOTA_RET_ACQUOTA) {
+        while ((rc = quota_check_common(obd, uid, gid, count, cycle, isblk,
+                                        inode, frags, pending)) & QUOTA_RET_ACQUOTA) {
 
                 spin_lock(&qctxt->lqc_lock);
                 if (!qctxt->lqc_import && oti) {
@@ -364,9 +379,6 @@ static int quota_chk_acq_common(struct obd_device *obd, unsigned int uid,
                         spin_unlock(&qctxt->lqc_lock);
                 }
 
-                if (rc & QUOTA_RET_INC_PENDING)
-                        *pending = 1;
-
                 cycle++;
                 if (isblk)
                         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_HOLD_WRITE_RPC, 90);
@@ -421,9 +433,6 @@ static int quota_chk_acq_common(struct obd_device *obd, unsigned int uid,
                        cycle);
         }
 
-        if (!cycle && rc & QUOTA_RET_INC_PENDING)
-                *pending = 1;
-
         do_gettimeofday(&work_end);
         timediff = cfs_timeval_sub(&work_end, &work_start, NULL);
         lprocfs_counter_add(qctxt->lqc_stats,
@@ -435,17 +444,18 @@ static int quota_chk_acq_common(struct obd_device *obd, unsigned int uid,
 }
 
 static int filter_quota_check(struct obd_device *obd, unsigned int uid,
-                              unsigned int gid, int npage, int *flag,
-                              quota_acquire acquire, struct obd_trans_info *oti)
+                              unsigned int gid, int npage, int *pending,
+                              quota_acquire acquire, struct obd_trans_info *oti,
+                              struct inode *inode, int frags)
 {
-        return quota_chk_acq_common(obd, uid, gid, npage, flag, LQUOTA_FLAGS_BLK,
-                                    acquire, oti);
+        return quota_chk_acq_common(obd, uid, gid, npage, pending, LQUOTA_FLAGS_BLK,
+                                    acquire, oti, inode, frags);
 }
 
 /* when a block_write or inode_create rpc is finished, adjust the record for
  * pending blocks and inodes*/
 static int quota_pending_commit(struct obd_device *obd, unsigned int uid,
-                                unsigned int gid, int count, int isblk)
+                                unsigned int gid, int pending, int isblk)
 {
         struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
         struct timeval work_start;
@@ -478,27 +488,27 @@ static int quota_pending_commit(struct obd_device *obd, unsigned int uid,
                 if (lqs) {
                         int flag = 0;
                         spin_lock(&lqs->lqs_lock);
-                        CDEBUG(D_QUOTA, "pending: %lu, count: %d.\n",
-                               isblk ? lqs->lqs_bwrite_pending :
-                               lqs->lqs_iwrite_pending, count);
-
                         if (isblk) {
-                                if (lqs->lqs_bwrite_pending >= count) {
-                                        lqs->lqs_bwrite_pending -= count;
+                                if (lqs->lqs_bwrite_pending >= pending) {
+                                        lqs->lqs_bwrite_pending -= pending;
                                         flag = 1;
                                 } else {
                                         CDEBUG(D_ERROR,
                                                "there are too many blocks!\n");
                                 }
                         } else {
-                                if (lqs->lqs_iwrite_pending >= count) {
-                                        lqs->lqs_iwrite_pending -= count;
+                                if (lqs->lqs_iwrite_pending >= pending) {
+                                        lqs->lqs_iwrite_pending -= pending;
                                         flag = 1;
                                 } else {
                                         CDEBUG(D_ERROR,
                                                "there are too many files!\n");
                                 }
                         }
+                        CDEBUG(D_QUOTA, "lqs pending: %lu, pending: %d, "
+                               "isblk: %d.\n",
+                               isblk ? lqs->lqs_bwrite_pending :
+                               lqs->lqs_iwrite_pending, pending, isblk);
 
                         spin_unlock(&lqs->lqs_lock);
                         lqs_putref(lqs);
@@ -519,9 +529,9 @@ static int quota_pending_commit(struct obd_device *obd, unsigned int uid,
 }
 
 static int filter_quota_pending_commit(struct obd_device *obd, unsigned int uid,
-                                       unsigned int gid, int npage)
+                                       unsigned int gid, int blocks)
 {
-        return quota_pending_commit(obd, uid, gid, npage, LQUOTA_FLAGS_BLK);
+        return quota_pending_commit(obd, uid, gid, blocks, LQUOTA_FLAGS_BLK);
 }
 
 static int mds_quota_init(void)
@@ -581,11 +591,12 @@ static int mds_quota_fs_cleanup(struct obd_device *obd)
 }
 
 static int mds_quota_check(struct obd_device *obd, unsigned int uid,
-                           unsigned int gid, int inodes, int *flag,
-                           quota_acquire acquire, struct obd_trans_info *oti)
+                           unsigned int gid, int inodes, int *pending,
+                           quota_acquire acquire, struct obd_trans_info *oti,
+                           struct inode *inode, int frags)
 {
-        return quota_chk_acq_common(obd, uid, gid, inodes, flag, 0,
-                                    acquire, oti);
+        return quota_chk_acq_common(obd, uid, gid, inodes, pending, 0,
+                                    acquire, oti, inode, frags);
 }
 
 static int mds_quota_acquire(struct obd_device *obd, unsigned int uid,