Whamcloud - gitweb
LU-1842 quota: ldiskfs local enforcement
author Niu Yawei <niu@whamcloud.com>
Mon, 10 Sep 2012 06:30:34 +0000 (02:30 -0400)
committer Oleg Drokin <green@whamcloud.com>
Mon, 17 Sep 2012 22:53:49 +0000 (18:53 -0400)
Quota local enforcement for ldiskfs osd

Signed-off-by: Niu Yawei <niu@whamcloud.com>
Change-Id: Ica4c1ae74c4af5a913691e60c080cb431e273c3c
Reviewed-on: http://review.whamcloud.com/3915
Tested-by: Hudson
Reviewed-by: Johann Lombardi <johann@whamcloud.com>
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Fan Yong <yong.fan@whamcloud.com>
lustre/include/lquota.h
lustre/include/lustre/lustre_idl.h
lustre/ofd/ofd_io.c
lustre/osd-ldiskfs/osd_handler.c
lustre/osd-ldiskfs/osd_internal.h
lustre/osd-ldiskfs/osd_io.c
lustre/osd-ldiskfs/osd_quota.c

index 9f2e3e5..947e2bc 100644 (file)
@@ -48,6 +48,50 @@ union lquota_rec {
 #define QUOTA_DATAPOOL_NAME   "ost="
 
 /*
+ * Quota information attached to a transaction
+ */
+
+struct lquota_entry;
+
+struct lquota_id_info {
+       /* quota identifier: a uid or a gid, as selected by lqi_type */
+       union lquota_id          lqi_id;
+
+       /* USRQUOTA or GRPQUOTA for now, could be expanded for
+        * directory quota or other types later.  */
+       int                      lqi_type;
+
+       /* inodes or kbytes to be consumed or released, it could
+        * be negative when releasing space.  Whether this is an inode
+        * count or a kbytes count is told by lqi_is_blk below.  */
+       long long                lqi_space;
+
+       /* quota slave entry structure associated with this ID */
+       struct lquota_entry     *lqi_qentry;
+
+       /* true when lqi_space is a block (kbytes) count, false when it is
+        * an inode count */
+       bool                     lqi_is_blk;
+};
+
+/* Since we enforce only inode quota in meta pool (MDTs), and block quota in
+ * data pool (OSTs), there are at most 4 quota ids being enforced in a single
+ * transaction, which is chown transaction:
+ * original uid and gid, new uid and gid.
+ *
+ * This value might need to be revised when directory quota is added.  */
+#define QUOTA_MAX_TRANSIDS    4
+
+/* all qids involved in a single transaction.
+ * NOTE(review): lqt_id_cnt is presumably the number of lqt_ids[] slots in
+ * use; nothing visible in this change touches it apart from the memset()
+ * in osd_trans_create() -- confirm against the quota slave code. */
+struct lquota_trans {
+        unsigned short         lqt_id_cnt;
+        struct lquota_id_info  lqt_ids[QUOTA_MAX_TRANSIDS];
+};
+
+/* flags for quota local enforcement */
+#define QUOTA_FL_OVER_USRQUOTA  0x01
+#define QUOTA_FL_OVER_GRPQUOTA  0x02
+#define QUOTA_FL_SYNC           0x04
+
+/*
  * Quota enforcement support on slaves
  */
 
@@ -79,6 +123,23 @@ void qsd_fini(const struct lu_env *, struct qsd_instance *);
 int lquotactl_slv(const struct lu_env *, struct dt_device *,
                  struct obd_quotactl *);
 
+/* XXX: dummy qsd_op_begin() & qsd_op_end(), to be replaced with the real
+ *      ones once all the enforcement code has landed.
+ *
+ *      For now qsd_op_begin() ignores all of its arguments, never writes
+ *      to @flags and always returns 0, and qsd_op_end() does nothing, so
+ *      no quota is actually enforced through these two hooks yet. */
+static inline int qsd_op_begin(const struct lu_env *env,
+                              struct qsd_instance *qsd,
+                              struct lquota_trans *trans,
+                              struct lquota_id_info *qi,
+                              int *flags)
+{
+       return 0;
+}
+
+static inline void qsd_op_end(const struct lu_env *env,
+                             struct qsd_instance *qsd,
+                             struct lquota_trans *trans)
+{
+}
+
 #ifdef LPROCFS
 /* dumb procfs handler which always report success, for backward compatibility
  * purpose */
index 0622b44..cd4ed5f 100644 (file)
@@ -1527,6 +1527,8 @@ extern void lustre_swab_obd_statfs (struct obd_statfs *os);
 #define OBD_BRW_SRVLOCK        0x200 /* Client holds no lock over this page */
 #define OBD_BRW_ASYNC          0x400 /* Server may delay commit to disk */
 #define OBD_BRW_MEMALLOC       0x800 /* Client runs in the "kswapd" context */
+#define OBD_BRW_OVER_USRQUOTA 0x1000 /* Running out of user quota */
+#define OBD_BRW_OVER_GRPQUOTA 0x2000 /* Running out of group quota */
 
 #define OBD_OBJECT_EOF 0xffffffffffffffffULL
 
index fa74959..320deab 100644 (file)
@@ -167,6 +167,9 @@ static int ofd_preprw_write(const struct lu_env *env, struct obd_export *exp,
                                lnb[j+k].lnb_rc = -ENOSPC;
                        if (!(rnb[i].rnb_flags & OBD_BRW_ASYNC))
                                oti->oti_sync_write = 1;
+                       /* remote client can't break through quota */
+                       if (exp_connect_rmtclient(exp))
+                               lnb[j+k].lnb_flags &= ~OBD_BRW_NOQUOTA;
                }
                j += rc;
                LASSERT(j <= PTLRPC_MAX_BRW_PAGES);
@@ -508,12 +511,27 @@ int ofd_commitrw(const struct lu_env *env, int cmd, struct obd_export *exp,
                else
                        obdo_from_la(oa, &info->fti_attr, LA_GID | LA_UID);
 
-               if (ofd_grant_prohibit(exp, ofd))
-                       /* Trick to prevent clients from waiting for bulk write
-                        * in flight since they won't get any grant in the reply
-                        * anyway so they had better firing the sync write RPC
-                        * straight away */
+               /* don't report overquota flag if we failed before reaching
+                * commit */
+               if (old_rc == 0 && (rc == 0 || rc == -EDQUOT)) {
+                       /* return the overquota flags to client */
+                       if (lnb[0].lnb_flags & OBD_BRW_OVER_USRQUOTA) {
+                               if (oa->o_valid & OBD_MD_FLFLAGS)
+                                       oa->o_flags |= OBD_FL_NO_USRQUOTA;
+                               else
+                                       oa->o_flags = OBD_FL_NO_USRQUOTA;
+                       }
+
+                       if (lnb[0].lnb_flags & OBD_BRW_OVER_GRPQUOTA) {
+                               if (oa->o_valid & OBD_MD_FLFLAGS)
+                                       oa->o_flags |= OBD_FL_NO_GRPQUOTA;
+                               else
+                                       oa->o_flags = OBD_FL_NO_GRPQUOTA;
+                       }
+
+                       oa->o_valid |= OBD_MD_FLFLAGS;
                        oa->o_valid |= OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA;
+               }
        } else if (cmd == OBD_BRW_READ) {
                struct ldlm_namespace *ns = ofd->ofd_namespace;
 
index 76a2d19..aee20d7 100644 (file)
@@ -176,51 +176,6 @@ static int osd_root_get(const struct lu_env *env,
         return 0;
 }
 
-static inline int osd_qid_type(struct osd_thandle *oh, int i)
-{
-        return (oh->ot_id_type & (1 << i)) ? GRPQUOTA : USRQUOTA;
-}
-
-static inline void osd_qid_set_type(struct osd_thandle *oh, int i, int type)
-{
-        oh->ot_id_type |= ((type == GRPQUOTA) ? (1 << i) : 0);
-}
-
-void osd_declare_qid(struct dt_object *dt, struct osd_thandle *oh,
-                     int type, uid_t id, struct inode *inode)
-{
-#ifdef CONFIG_QUOTA
-        int i, allocated = 0;
-        struct osd_object *obj;
-
-        LASSERT(dt != NULL);
-        LASSERT(oh != NULL);
-        LASSERTF(oh->ot_id_cnt <= OSD_MAX_UGID_CNT, "count=%u",
-                 oh->ot_id_cnt);
-
-        /* id entry is allocated in the quota file */
-        if (inode && inode->i_dquot[type] && inode->i_dquot[type]->dq_off)
-                allocated = 1;
-
-        for (i = 0; i < oh->ot_id_cnt; i++) {
-                if (oh->ot_id_array[i] == id && osd_qid_type(oh, i) == type)
-                        return;
-        }
-
-        if (unlikely(i >= OSD_MAX_UGID_CNT)) {
-                CERROR("more than %d uid/gids for a transaction?\n", i);
-                return;
-        }
-
-        oh->ot_id_array[i] = id;
-        osd_qid_set_type(oh, i, type);
-        oh->ot_id_cnt++;
-        obj = osd_dt_obj(dt);
-        oh->ot_credits += (allocated || id == 0) ?
-                1 : LDISKFS_QUOTA_INIT_BLOCKS(osd_sb(osd_obj2dev(obj)));
-#endif
-}
-
 /*
  * OSD object methods.
  */
@@ -745,6 +700,8 @@ static struct thandle *osd_trans_create(const struct lu_env *env,
         th = ERR_PTR(-ENOMEM);
         OBD_ALLOC_GFP(oh, sizeof *oh, CFS_ALLOC_IO);
         if (oh != NULL) {
+               oh->ot_quota_trans = &oti->oti_quota_trans;
+               memset(oh->ot_quota_trans, 0, sizeof(*oh->ot_quota_trans));
                 th = &oh->ot_super;
                 th->th_dev = d;
                 th->th_result = 0;
@@ -855,11 +812,17 @@ static int osd_trans_stop(const struct lu_env *env, struct thandle *th)
         struct osd_thandle     *oh;
         struct osd_thread_info *oti = osd_oti_get(env);
         struct osd_iobuf       *iobuf = &oti->oti_iobuf;
-
+       struct qsd_instance    *qsd = oti->oti_dev->od_quota_slave;
         ENTRY;
 
         oh = container_of0(th, struct osd_thandle, ot_super);
 
+       if (qsd != NULL)
+               /* inform the quota slave device that the transaction is
+                * stopping */
+               qsd_op_end(env, qsd, oh->ot_quota_trans);
+       oh->ot_quota_trans = NULL;
+
         if (oh->ot_handle != NULL) {
                 handle_t *hdl = oh->ot_handle;
 
@@ -1453,35 +1416,139 @@ static int osd_declare_attr_set(const struct lu_env *env,
                                 const struct lu_attr *attr,
                                 struct thandle *handle)
 {
-        struct osd_thandle *oh;
-        struct osd_object *obj;
+       struct osd_thandle     *oh;
+       struct osd_object      *obj;
+       struct osd_thread_info *info = osd_oti_get(env);
+       struct lquota_id_info  *qi = &info->oti_qi;
+       long long               bspace;
+       int                     rc = 0;
+       bool                    allocated;
+       ENTRY;
 
-        LASSERT(dt != NULL);
-        LASSERT(handle != NULL);
+       LASSERT(dt != NULL);
+       LASSERT(handle != NULL);
 
-        obj = osd_dt_obj(dt);
-        LASSERT(osd_invariant(obj));
+       obj = osd_dt_obj(dt);
+       LASSERT(osd_invariant(obj));
 
-        oh = container_of0(handle, struct osd_thandle, ot_super);
-        LASSERT(oh->ot_handle == NULL);
+       oh = container_of0(handle, struct osd_thandle, ot_super);
+       LASSERT(oh->ot_handle == NULL);
 
-        OSD_DECLARE_OP(oh, attr_set);
-        oh->ot_credits += osd_dto_credits_noquota[DTO_ATTR_SET_BASE];
+       OSD_DECLARE_OP(oh, attr_set);
+       oh->ot_credits += osd_dto_credits_noquota[DTO_ATTR_SET_BASE];
 
-        if (attr && attr->la_valid & LA_UID) {
-                if (obj->oo_inode)
-                        osd_declare_qid(dt, oh, USRQUOTA, obj->oo_inode->i_uid,
-                                        obj->oo_inode);
-                osd_declare_qid(dt, oh, USRQUOTA, attr->la_uid, NULL);
-        }
-        if (attr && attr->la_valid & LA_GID) {
-                if (obj->oo_inode)
-                        osd_declare_qid(dt, oh, GRPQUOTA, obj->oo_inode->i_gid,
-                                        obj->oo_inode);
-                osd_declare_qid(dt, oh, GRPQUOTA, attr->la_gid, NULL);
-        }
+       if (attr == NULL || obj->oo_inode == NULL)
+               RETURN(rc);
 
-        return 0;
+       bspace   = obj->oo_inode->i_blocks;
+       bspace <<= obj->oo_inode->i_sb->s_blocksize_bits;
+       bspace   = toqb(bspace);
+
+       /* Changing ownership is always performed by super user, it should not
+        * fail with EDQUOT.
+        *
+        * We still need to call the osd_declare_qid() to calculate the journal
+        * credits for updating quota accounting files and to trigger quota
+        * space adjustment once the operation is completed.*/
+       if ((attr->la_valid & LA_UID) != 0 &&
+            attr->la_uid != obj->oo_inode->i_uid) {
+               qi->lqi_type = USRQUOTA;
+
+               /* inode accounting */
+               qi->lqi_is_blk = false;
+
+               /* one more inode for the new owner ... */
+               qi->lqi_id.qid_uid = attr->la_uid;
+               qi->lqi_space      = 1;
+               allocated = (attr->la_uid == 0) ? true : false;
+               rc = osd_declare_qid(env, oh, qi, allocated, NULL);
+               if (rc == -EDQUOT || rc == -EINPROGRESS)
+                       rc = 0;
+               if (rc)
+                       RETURN(rc);
+
+               /* and one less inode for the current uid */
+               qi->lqi_id.qid_uid = obj->oo_inode->i_uid;
+               qi->lqi_space      = -1;
+               rc = osd_declare_qid(env, oh, qi, true, NULL);
+               if (rc == -EDQUOT || rc == -EINPROGRESS)
+                       rc = 0;
+               if (rc)
+                       RETURN(rc);
+
+               /* block accounting */
+               qi->lqi_is_blk = true;
+
+               /* more blocks for the new owner ... */
+               qi->lqi_id.qid_uid = attr->la_uid;
+               qi->lqi_space      = bspace;
+               allocated = (attr->la_uid == 0) ? true : false;
+               rc = osd_declare_qid(env, oh, qi, allocated, NULL);
+               if (rc == -EDQUOT || rc == -EINPROGRESS)
+                       rc = 0;
+               if (rc)
+                       RETURN(rc);
+
+               /* and finally less blocks for the current owner */
+               qi->lqi_id.qid_uid = obj->oo_inode->i_uid;
+               qi->lqi_space      = -bspace;
+               rc = osd_declare_qid(env, oh, qi, true, NULL);
+               if (rc == -EDQUOT || rc == -EINPROGRESS)
+                       rc = 0;
+               if (rc)
+                       RETURN(rc);
+       }
+
+       if (attr->la_valid & LA_GID &&
+           attr->la_gid != obj->oo_inode->i_gid) {
+               qi->lqi_type = GRPQUOTA;
+
+               /* inode accounting */
+               qi->lqi_is_blk = false;
+
+               /* one more inode for the new group owner ... */
+               qi->lqi_id.qid_gid = attr->la_gid;
+               qi->lqi_space      = 1;
+               allocated = (attr->la_gid == 0) ? true : false;
+               rc = osd_declare_qid(env, oh, qi, allocated, NULL);
+               if (rc == -EDQUOT || rc == -EINPROGRESS)
+                       rc = 0;
+               if (rc)
+                       RETURN(rc);
+
+               /* and one less inode for the current gid */
+               qi->lqi_id.qid_gid = obj->oo_inode->i_gid;
+               qi->lqi_space      = -1;
+               rc = osd_declare_qid(env, oh, qi, true, NULL);
+               if (rc == -EDQUOT || rc == -EINPROGRESS)
+                       rc = 0;
+               if (rc)
+                       RETURN(rc);
+
+               /* block accounting */
+               qi->lqi_is_blk = true;
+
+               /* more blocks for the new owner ... */
+               qi->lqi_id.qid_gid = attr->la_gid;
+               qi->lqi_space      = bspace;
+               allocated = (attr->la_gid == 0) ? true : false;
+               rc = osd_declare_qid(env, oh, qi, allocated, NULL);
+               if (rc == -EDQUOT || rc == -EINPROGRESS)
+                       rc = 0;
+               if (rc)
+                       RETURN(rc);
+
+               /* and finally less blocks for the current owner */
+               qi->lqi_id.qid_gid = obj->oo_inode->i_gid;
+               qi->lqi_space      = -bspace;
+               rc = osd_declare_qid(env, oh, qi, true, NULL);
+               if (rc == -EDQUOT || rc == -EINPROGRESS)
+                       rc = 0;
+               if (rc)
+                       RETURN(rc);
+       }
+
+       RETURN(rc);
 }
 
 static int osd_inode_setattr(const struct lu_env *env,
@@ -1952,7 +2019,9 @@ static int osd_declare_object_create(const struct lu_env *env,
                                      struct dt_object_format *dof,
                                      struct thandle *handle)
 {
-        struct osd_thandle *oh;
+       struct osd_thandle      *oh;
+       int                      rc;
+       ENTRY;
 
         LASSERT(handle != NULL);
 
@@ -1981,11 +2050,12 @@ static int osd_declare_object_create(const struct lu_env *env,
                 oh->ot_credits += osd_dto_credits_noquota[DTO_WRITE_BASE];
         }
 
-        if (attr) {
-                osd_declare_qid(dt, oh, USRQUOTA, attr->la_uid, NULL);
-                osd_declare_qid(dt, oh, GRPQUOTA, attr->la_gid, NULL);
-        }
-        return 0;
+       if (!attr)
+               RETURN(0);
+
+       rc = osd_declare_inode_qid(env, attr->la_uid, attr->la_gid, 1, oh,
+                                  false, false, NULL, false);
+       RETURN(rc);
 }
 
 static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
@@ -2034,7 +2104,7 @@ static int osd_declare_object_destroy(const struct lu_env *env,
         struct osd_object  *obj = osd_dt_obj(dt);
         struct inode       *inode = obj->oo_inode;
         struct osd_thandle *oh;
-
+       int                 rc;
         ENTRY;
 
         oh = container_of0(th, struct osd_thandle, ot_super);
@@ -2053,10 +2123,15 @@ static int osd_declare_object_destroy(const struct lu_env *env,
                oh->ot_credits += 3;
         }
 
-        osd_declare_qid(dt, oh, USRQUOTA, inode->i_uid, inode);
-        osd_declare_qid(dt, oh, GRPQUOTA, inode->i_gid, inode);
-
-        RETURN(0);
+       /* one less inode */
+        rc = osd_declare_inode_qid(env, inode->i_uid, inode->i_gid, -1, oh,
+                                  false, true, NULL, false);
+       if (rc)
+               RETURN(rc);
+       /* data to be truncated */
+       rc = osd_declare_inode_qid(env, inode->i_uid, inode->i_gid, 0, oh, true,
+                                  true, NULL, false);
+        RETURN(rc);
 }
 
 static int osd_object_destroy(const struct lu_env *env,
@@ -2932,6 +3007,9 @@ static int osd_index_declare_ea_delete(const struct lu_env *env,
                                        struct thandle *handle)
 {
         struct osd_thandle *oh;
+       struct inode       *inode;
+       int                 rc;
+       ENTRY;
 
         LASSERT(dt_object_exists(dt));
         LASSERT(handle != NULL);
@@ -2942,13 +3020,12 @@ static int osd_index_declare_ea_delete(const struct lu_env *env,
         OSD_DECLARE_OP(oh, delete);
         oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_DELETE];
 
-        LASSERT(osd_dt_obj(dt)->oo_inode);
-        osd_declare_qid(dt, oh, USRQUOTA, osd_dt_obj(dt)->oo_inode->i_uid,
-                        osd_dt_obj(dt)->oo_inode);
-        osd_declare_qid(dt, oh, GRPQUOTA, osd_dt_obj(dt)->oo_inode->i_gid,
-                        osd_dt_obj(dt)->oo_inode);
+       inode = osd_dt_obj(dt)->oo_inode;
+       LASSERT(inode);
 
-        return 0;
+       rc = osd_declare_inode_qid(env, inode->i_uid, inode->i_gid, 0, oh,
+                                  true, true, NULL, false);
+       RETURN(rc);
 }
 
 static inline int osd_get_fid_from_dentry(struct ldiskfs_dir_entry_2 *de,
@@ -3524,6 +3601,9 @@ static int osd_index_declare_ea_insert(const struct lu_env *env,
                                        struct thandle *handle)
 {
         struct osd_thandle *oh;
+       struct inode       *inode;
+       int                 rc;
+       ENTRY;
 
         LASSERT(dt_object_exists(dt));
         LASSERT(handle != NULL);
@@ -3534,13 +3614,15 @@ static int osd_index_declare_ea_insert(const struct lu_env *env,
         OSD_DECLARE_OP(oh, insert);
         oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_INSERT];
 
-        LASSERT(osd_dt_obj(dt)->oo_inode);
-        osd_declare_qid(dt, oh, USRQUOTA, osd_dt_obj(dt)->oo_inode->i_uid,
-                        osd_dt_obj(dt)->oo_inode);
-        osd_declare_qid(dt, oh, GRPQUOTA, osd_dt_obj(dt)->oo_inode->i_gid,
-                        osd_dt_obj(dt)->oo_inode);
+       inode = osd_dt_obj(dt)->oo_inode;
+       LASSERT(inode);
 
-        return 0;
+       /* We ignore block quota on meta pool (MDTs), so needn't
+        * calculate how many blocks will be consumed by this index
+        * insert */
+       rc = osd_declare_inode_qid(env, inode->i_uid, inode->i_gid, 0, oh,
+                                  true, true, NULL, false);
+       RETURN(rc);
 }
 
 /**
index add8dd4..898ee70 100644 (file)
@@ -355,6 +355,7 @@ struct osd_thandle {
         unsigned short          ot_id_cnt;
         unsigned short          ot_id_type;
         uid_t                   ot_id_array[OSD_MAX_UGID_CNT];
+       struct lquota_trans    *ot_quota_trans;
 
 #ifdef OSD_TRACK_DECLARES
         unsigned char           ot_declare_attr_set;
@@ -613,6 +614,8 @@ struct osd_thread_info {
                struct if_dqblk         oti_dqblk;
                struct if_dqinfo        oti_dqinfo;
        };
+       struct lquota_id_info  oti_qi;
+       struct lquota_trans    oti_quota_trans;
 };
 
 extern int ldiskfs_pdo;
@@ -632,8 +635,6 @@ int osd_statfs(const struct lu_env *env, struct dt_device *dev,
                struct obd_statfs *sfs);
 int osd_object_auth(const struct lu_env *env, struct dt_object *dt,
                     struct lustre_capa *capa, __u64 opc);
-void osd_declare_qid(struct dt_object *dt, struct osd_thandle *oh,
-                     int type, uid_t id, struct inode *inode);
 struct inode *osd_iget(struct osd_thread_info *info, struct osd_device *dev,
                       struct osd_inode_id *id);
 struct inode *osd_iget_fid(struct osd_thread_info *info, struct osd_device *dev,
@@ -681,6 +682,13 @@ loff_t find_tree_dqentry(const struct lu_env *env,
                          struct osd_object *obj, int type,
                          qid_t dqid, uint blk, int depth,
                          struct osd_it_quota *it);
+/* osd_quota.c */
+int osd_declare_qid(const struct lu_env *env, struct osd_thandle *oh,
+                   struct lquota_id_info *qi, bool allocated, int *flags);
+int osd_declare_inode_qid(const struct lu_env *env, qid_t uid, qid_t gid,
+                         long long space, struct osd_thandle *oh,
+                         bool is_blk, bool allocated, int *flags, bool force);
+
 /*
  * Invariants, assertions.
  */
index af3d6af..11181b6 100644 (file)
@@ -594,6 +594,31 @@ static int osd_write_prep(const struct lu_env *env, struct dt_object *dt,
         RETURN(rc);
 }
 
+/* Check if a block is allocated or not.
+ *
+ * Returns 1 when the ->bmap() method of the inode's mapping reports a
+ * non-zero block for the block containing @offset, and 0 in every other
+ * case: no ->bmap() method, empty file, @offset lying in a block past the
+ * one that holds the last byte of the file, or ->bmap() returning 0.
+ *
+ * osd_declare_write_commit() charges CFS_PAGE_SIZE of quota space for each
+ * page for which this returns 0. */
+static int osd_is_mapped(struct inode *inode, obd_size offset)
+{
+       sector_t (*fs_bmap)(struct address_space *, sector_t);
+
+       fs_bmap = inode->i_mapping->a_ops->bmap;
+
+       /* We can't know if we are overwriting or not, so report the block
+        * as not mapped */
+       if (unlikely(fs_bmap == NULL))
+               return 0;
+
+       if (i_size_read(inode) == 0)
+               return 0;
+
+       /* Beyond EOF, must not be mapped: compare the index of the block
+        * holding the last byte (i_size - 1) with the index of the block
+        * holding @offset */
+       if (((i_size_read(inode) - 1) >> inode->i_blkbits) <
+           (offset >> inode->i_blkbits))
+               return 0;
+
+       if (fs_bmap(inode->i_mapping, offset >> inode->i_blkbits) == 0)
+               return 0;
+
+       return 1;
+}
+
 static int osd_declare_write_commit(const struct lu_env *env,
                                     struct dt_object *dt,
                                     struct niobuf_local *lnb, int npages,
@@ -606,20 +631,36 @@ static int osd_declare_write_commit(const struct lu_env *env,
         int                      depth;
         int                      i;
         int                      newblocks;
-        int                      old;
+       int                      rc = 0;
+       int                      flags = 0;
+       bool                     ignore_quota = false;
+       long long                quota_space = 0;
+       ENTRY;
 
         LASSERT(handle != NULL);
         oh = container_of0(handle, struct osd_thandle, ot_super);
         LASSERT(oh->ot_handle == NULL);
 
-        old = oh->ot_credits;
         newblocks = npages;
 
         /* calculate number of extents (probably better to pass nb) */
-        for (i = 1; i < npages; i++)
-                if (lnb[i].offset !=
-                    lnb[i - 1].offset + lnb[i - 1].len)
-                        extents++;
+       for (i = 0; i < npages; i++) {
+               if (i && lnb[i].offset !=
+                   lnb[i - 1].offset + lnb[i - 1].len)
+                       extents++;
+
+               if (!osd_is_mapped(inode, lnb[i].offset))
+                       quota_space += CFS_PAGE_SIZE;
+
+               /* ignore quota for the whole request if any page is from
+                * client cache or written by root.
+                *
+                * XXX we could handle this on per-lnb basis as done by
+                * grant. */
+               if ((lnb[i].flags & OBD_BRW_NOQUOTA) ||
+                   !(lnb[i].flags & OBD_BRW_SYNC))
+                       ignore_quota = true;
+       }
 
         /*
          * each extent can go into new leaf causing a split
@@ -643,6 +684,12 @@ static int osd_declare_write_commit(const struct lu_env *env,
                 oh->ot_credits += depth * extents;
         }
 
+       /* quota space for metadata blocks */
+       quota_space += depth * extents * LDISKFS_BLOCK_SIZE(osd_sb(osd));
+
+       /* quota space should be reported in 1K blocks */
+       quota_space = toqb(quota_space);
+
         /* each new block can go in different group (bitmap + gd) */
 
         /* we can't dirty more bitmap blocks than exist */
@@ -657,26 +704,25 @@ static int osd_declare_write_commit(const struct lu_env *env,
         else
                 oh->ot_credits += newblocks;
 
-        RETURN(0);
-}
+       /* make sure the over quota flags were not set */
+       lnb[0].flags &= ~(OBD_BRW_OVER_USRQUOTA | OBD_BRW_OVER_GRPQUOTA);
 
-/* Check if a block is allocated or not */
-static int osd_is_mapped(struct inode *inode, obd_size offset)
-{
-        sector_t (*fs_bmap)(struct address_space *, sector_t);
-
-        fs_bmap = inode->i_mapping->a_ops->bmap;
-
-        /* We can't know if we are overwriting or not */
-        if (fs_bmap == NULL)
-                return 0;
+       rc = osd_declare_inode_qid(env, inode->i_uid, inode->i_gid,
+                                  quota_space, oh, true, true, &flags,
+                                  ignore_quota);
 
-        if (fs_bmap(inode->i_mapping, offset >> inode->i_blkbits) == 0)
-                return 0;
+       /* we only need to store the overquota flags in the first lnb for
+        * now, once we support multiple objects BRW, this code needs to be
+        * revised. */
+       if (flags & QUOTA_FL_OVER_USRQUOTA)
+               lnb[0].flags |= OBD_BRW_OVER_USRQUOTA;
+       if (flags & QUOTA_FL_OVER_GRPQUOTA)
+               lnb[0].flags |= OBD_BRW_OVER_GRPQUOTA;
 
-        return 1;
+       RETURN(rc);
 }
 
+/* Check if a block is allocated or not */
 static int osd_write_commit(const struct lu_env *env, struct dt_object *dt,
                             struct niobuf_local *lnb, int npages,
                             struct thandle *thandle)
@@ -931,6 +977,9 @@ static ssize_t osd_declare_write(const struct lu_env *env, struct dt_object *dt,
 {
         struct osd_thandle *oh;
         int                 credits;
+       struct inode       *inode;
+       int                 rc;
+       ENTRY;
 
         LASSERT(handle != NULL);
 
@@ -952,14 +1001,18 @@ static ssize_t osd_declare_write(const struct lu_env *env, struct dt_object *dt,
         OSD_DECLARE_OP(oh, write);
         oh->ot_credits += credits;
 
-        if (osd_dt_obj(dt)->oo_inode == NULL)
-                return 0;
+       inode = osd_dt_obj(dt)->oo_inode;
 
-        osd_declare_qid(dt, oh, USRQUOTA, osd_dt_obj(dt)->oo_inode->i_uid,
-                        osd_dt_obj(dt)->oo_inode);
-        osd_declare_qid(dt, oh, GRPQUOTA, osd_dt_obj(dt)->oo_inode->i_gid,
-                        osd_dt_obj(dt)->oo_inode);
-        return 0;
+       /* we may declare write to a non-existent llog */
+       if (inode == NULL)
+               RETURN(0);
+
+       /* dt_declare_write() is usually called for system objects, such
+        * as llog or last_rcvd files. We needn't enforce quota on those
+        * objects, so always set the lqi_space as 0. */
+       rc = osd_declare_inode_qid(env, inode->i_uid, inode->i_gid, 0, oh,
+                                  true, true, NULL, false);
+       RETURN(rc);
 }
 
 static int osd_ldiskfs_writelink(struct inode *inode, char *buffer, int buflen)
@@ -1109,6 +1162,8 @@ static int osd_declare_punch(const struct lu_env *env, struct dt_object *dt,
                              __u64 start, __u64 end, struct thandle *th)
 {
         struct osd_thandle *oh;
+       struct inode       *inode;
+       int                 rc;
         ENTRY;
 
         LASSERT(th);
@@ -1127,7 +1182,12 @@ static int osd_declare_punch(const struct lu_env *env, struct dt_object *dt,
         oh->ot_credits += osd_dto_credits_noquota[DTO_ATTR_SET_BASE];
         oh->ot_credits += 3;
 
-        RETURN(0);
+       inode = osd_dt_obj(dt)->oo_inode;
+       LASSERT(inode);
+
+       rc = osd_declare_inode_qid(env, inode->i_uid, inode->i_gid, 0, oh,
+                                  true, true, NULL, false);
+       RETURN(rc);
 }
 
 static int osd_punch(const struct lu_env *env, struct dt_object *dt,
index 1d26b48..5abfafa 100644 (file)
@@ -415,3 +415,130 @@ const struct dt_index_operations osd_acct_index_ops = {
        }
 };
 
+/* Return the quota type recorded for slot @i of the handle's id array:
+ * GRPQUOTA when bit @i of ot_id_type is set, USRQUOTA otherwise. */
+static inline int osd_qid_type(struct osd_thandle *oh, int i)
+{
+       return (oh->ot_id_type & (1 << i)) ? GRPQUOTA : USRQUOTA;
+}
+
+/* Record @type for slot @i of the handle's id array: bit @i of ot_id_type
+ * is set for GRPQUOTA.  For any other type nothing is changed, i.e. the
+ * bit is only ever set here, never cleared. */
+static inline void osd_qid_set_type(struct osd_thandle *oh, int i, int type)
+{
+       oh->ot_id_type |= ((type == GRPQUOTA) ? (1 << i) : 0);
+}
+
+/**
+ * Reserve journal credits for quota files update first, then call
+ * ->op_begin() to perform quota enforcement.
+ *
+ * Credits are reserved only the first time a given (id, type) pair is seen
+ * on this handle: 1 credit when \a allocated is true or the id is 0,
+ * LDISKFS_QUOTA_INIT_BLOCKS() credits otherwise.  The pair is then recorded
+ * in oh->ot_id_array / oh->ot_id_type so later calls for the same pair add
+ * no more credits.  qsd_op_begin() is called on every invocation, whether
+ * or not the pair was already known.
+ *
+ * NOTE(review): the id is always read through qi->lqi_id.qid_uid, even for
+ * GRPQUOTA where callers store it via qid_gid; this relies on both members
+ * aliasing inside union lquota_id -- confirm against its definition.
+ *
+ * \param  env    - the environment passed by the caller
+ * \param  oh     - osd transaction handle
+ * \param  qi     - quota id & space required for this operation
+ * \param  allocated - dquot entry in quota accounting file has been allocated
+ * \param  flags  - if the operation is write, return no user quota, no
+ *                  group quota, or sync commit flags to the caller
+ *
+ * \retval 0      - success, also returned without calling qsd_op_begin()
+ *                  when the device has no quota slave instance yet
+ * \retval -EOVERFLOW - a new id would exceed OSD_MAX_UGID_CNT tracked ids
+ * \retval -ve    - failure, as returned by qsd_op_begin()
+ */
+int osd_declare_qid(const struct lu_env *env, struct osd_thandle *oh,
+                    struct lquota_id_info *qi, bool allocated, int *flags)
+{
+       struct osd_thread_info  *info = osd_oti_get(env);
+       struct osd_device       *dev = info->oti_dev;
+       struct qsd_instance     *qsd = dev->od_quota_slave;
+       int                      i, rc;
+       bool                     found = false;
+       ENTRY;
+
+       LASSERT(oh != NULL);
+       LASSERTF(oh->ot_id_cnt <= OSD_MAX_UGID_CNT, "count=%d\n",
+                oh->ot_id_cnt);
+
+       for (i = 0; i < oh->ot_id_cnt; i++) {
+               if (oh->ot_id_array[i] == qi->lqi_id.qid_uid &&
+                   osd_qid_type(oh, i) == qi->lqi_type) {
+                       found = true;
+                       break;
+               }
+       }
+
+       if (!found) {
+               /* we need to account for credits for this new ID; at this
+                * point i == oh->ot_id_cnt, the first free slot */
+               if (i >= OSD_MAX_UGID_CNT) {
+                       CERROR("Too many(%d) trans qids!\n", i + 1);
+                       RETURN(-EOVERFLOW);
+               }
+
+               oh->ot_credits += (allocated || qi->lqi_id.qid_uid == 0) ?
+                       1 : LDISKFS_QUOTA_INIT_BLOCKS(osd_sb(dev));
+
+               oh->ot_id_array[i] = qi->lqi_id.qid_uid;
+               osd_qid_set_type(oh, i, qi->lqi_type);
+               oh->ot_id_cnt++;
+       }
+
+       if (unlikely(qsd == NULL))
+               /* quota slave instance hasn't been allocated yet, credits
+                * have been reserved above but no enforcement is done */
+               RETURN(0);
+
+       /* check quota */
+       rc = qsd_op_begin(env, qsd, oh->ot_quota_trans, qi, flags);
+       RETURN(rc);
+}
+
+/**
+ * Wrapper for osd_declare_qid()
+ *
+ * Declares the same \a space, \a is_blk and \a allocated first for the user
+ * id and then for the group id, using the per-thread oti_qi buffer.
+ *
+ * \param  env    - the environment passed by the caller
+ * \param  uid    - user id of the inode
+ * \param  gid    - group id of the inode
+ * \param  space  - how many blocks/inodes will be consumed/released
+ * \param  oh     - osd transaction handle
+ * \param  is_blk - block quota or inode quota?
+ * \param  allocated - dquot entry in quota accounting file has been allocated
+ * \param  flags  - if the operation is write, return no user quota, no
+ *                  group quota, or sync commit flags to the caller
+ * \param force   - set to true when changes are performed by root user and
+ *                  thus can't fail with EDQUOT; -EDQUOT and -EINPROGRESS
+ *                  from either id are then turned into 0
+ *
+ * \retval 0      - success
+ * \retval -ve    - failure: the user id error if there is one, otherwise
+ *                  the group id error
+ */
+int osd_declare_inode_qid(const struct lu_env *env, qid_t uid, qid_t gid,
+                         long long space, struct osd_thandle *oh,
+                         bool is_blk, bool allocated, int *flags, bool force)
+{
+       struct osd_thread_info  *info = osd_oti_get(env);
+       struct lquota_id_info   *qi = &info->oti_qi;
+       int                      rcu, rcg; /* user & group rc */
+       ENTRY;
+
+       /* let's start with user quota */
+       qi->lqi_id.qid_uid = uid;
+       qi->lqi_type       = USRQUOTA;
+       qi->lqi_space      = space;
+       qi->lqi_is_blk     = is_blk;
+       rcu = osd_declare_qid(env, oh, qi, allocated, flags);
+
+       if (force && (rcu == -EDQUOT || rcu == -EINPROGRESS))
+               /* ignore EDQUOT & EINPROGRESS when changes are done by root */
+               rcu = 0;
+
+       /* For non-fatal error, we want to continue to get the noquota flags
+        * for group id. This is only for commit write, which has @flags passed
+        * in. See osd_declare_write_commit().
+        * When force is set to true, we also want to proceed with the gid
+        * (rcu has already been reset to 0 above in that case).
+        * Any other error, including -EINPROGRESS without force, is returned
+        * right away without declaring the gid. */
+       if (rcu && (rcu != -EDQUOT || flags == NULL))
+               RETURN(rcu);
+
+       /* and now group quota; lqi_space and lqi_is_blk set above are
+        * reused unchanged */
+       qi->lqi_id.qid_gid = gid;
+       qi->lqi_type       = GRPQUOTA;
+       rcg = osd_declare_qid(env, oh, qi, allocated, flags);
+
+       if (force && (rcg == -EDQUOT || rcg == -EINPROGRESS))
+               /* as before, ignore EDQUOT & EINPROGRESS for root */
+               rcg = 0;
+
+       RETURN(rcu ? rcu : rcg);
+}
+