Whamcloud - gitweb
LU-1842 quota: zfs local enforcement
authorNiu Yawei <niu@whamcloud.com>
Tue, 11 Sep 2012 07:35:32 +0000 (03:35 -0400)
committerOleg Drokin <green@whamcloud.com>
Mon, 17 Sep 2012 22:54:06 +0000 (18:54 -0400)
Quota local enforcement for zfs osd.

Signed-off-by: Niu Yawei <niu@whamcloud.com>
Change-Id: I91d6698cc5a3f1eb42dd7fc9120f83b70d2a7a6f
Reviewed-on: http://review.whamcloud.com/3933
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Alex Zhuravlev <bzzz@whamcloud.com>
Reviewed-by: Fan Yong <yong.fan@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/osd-zfs/osd_handler.c
lustre/osd-zfs/osd_internal.h
lustre/osd-zfs/osd_io.c
lustre/osd-zfs/osd_object.c
lustre/osd-zfs/osd_quota.c

index c34ab99..6bd4bf4 100644 (file)
@@ -131,6 +131,7 @@ static void osd_trans_commit_cb(void *cb_data, int error)
 {
        struct osd_thandle      *oh = cb_data;
        struct thandle          *th = &oh->ot_super;
 {
        struct osd_thandle      *oh = cb_data;
        struct thandle          *th = &oh->ot_super;
+       struct osd_device       *osd = osd_dt_dev(th->th_dev);
        struct lu_device        *lud = &th->th_dev->dd_lu_dev;
        struct dt_txn_commit_cb *dcb, *tmp;
 
        struct lu_device        *lud = &th->th_dev->dd_lu_dev;
        struct dt_txn_commit_cb *dcb, *tmp;
 
@@ -151,6 +152,14 @@ static void osd_trans_commit_cb(void *cb_data, int error)
        cfs_list_for_each_entry_safe(dcb, tmp, &oh->ot_dcb_list, dcb_linkage)
                dcb->dcb_func(NULL, th, dcb, error);
 
        cfs_list_for_each_entry_safe(dcb, tmp, &oh->ot_dcb_list, dcb_linkage)
                dcb->dcb_func(NULL, th, dcb, error);
 
+       /* Unlike ldiskfs, zfs updates space accounting at commit time.
+        * As a consequence, op_end is called only now to inform the quota slave
+        * component that reserved quota space is now accounted in usage and
+        * should be released. Quota space won't be adjusted at this point since
+        * we can't provide a suitable environment. It will be performed
+        * asynchronously by a lquota thread. */
+       qsd_op_end(NULL, osd->od_quota_slave, &oh->ot_quota_trans);
+
        lu_device_put(lud);
        th->th_dev = NULL;
        lu_context_exit(&th->th_ctx);
        lu_device_put(lud);
        th->th_dev = NULL;
        lu_context_exit(&th->th_ctx);
@@ -234,10 +243,23 @@ static int osd_trans_stop(const struct lu_env *env, struct thandle *th)
                LASSERT(oh->ot_tx);
                dmu_tx_abort(oh->ot_tx);
                osd_object_sa_dirty_rele(oh);
                LASSERT(oh->ot_tx);
                dmu_tx_abort(oh->ot_tx);
                osd_object_sa_dirty_rele(oh);
+               /* there won't be any commit, release reserved quota space now,
+                * if any */
+               qsd_op_end(env, osd->od_quota_slave, &oh->ot_quota_trans);
                OBD_FREE_PTR(oh);
                RETURN(0);
        }
 
                OBD_FREE_PTR(oh);
                RETURN(0);
        }
 
+       /* When doing our own inode accounting, the ZAPs storing per-uid/gid
+        * usage are updated at operation execution time, so we should call
+        * qsd_op_end() straight away. Otherwise (for blk accounting maintained
+        * by ZFS and when #inode is estimated from #blks) accounting is updated
+        * at commit time and the call to qsd_op_end() must be delayed */
+       if (oh->ot_quota_trans.lqt_id_cnt > 0 &&
+                       !oh->ot_quota_trans.lqt_ids[0].lqi_is_blk &&
+                       !osd->od_quota_iused_est)
+               qsd_op_end(env, osd->od_quota_slave, &oh->ot_quota_trans);
+
        rc = dt_txn_hook_stop(env, th);
        if (rc != 0)
                CDEBUG(D_OTHER, "%s: transaction hook failed: rc = %d\n",
        rc = dt_txn_hook_stop(env, th);
        if (rc != 0)
                CDEBUG(D_OTHER, "%s: transaction hook failed: rc = %d\n",
@@ -279,6 +301,7 @@ static struct thandle *osd_trans_create(const struct lu_env *env,
        CFS_INIT_LIST_HEAD(&oh->ot_dcb_list);
        CFS_INIT_LIST_HEAD(&oh->ot_sa_list);
        cfs_sema_init(&oh->ot_sa_lock, 1);
        CFS_INIT_LIST_HEAD(&oh->ot_dcb_list);
        CFS_INIT_LIST_HEAD(&oh->ot_sa_list);
        cfs_sema_init(&oh->ot_sa_lock, 1);
+       memset(&oh->ot_quota_trans, 0, sizeof(oh->ot_quota_trans));
        th = &oh->ot_super;
        th->th_dev = dt;
        th->th_result = 0;
        th = &oh->ot_super;
        th->th_dev = dt;
        th->th_result = 0;
index 540f2ec..cb9b8a5 100644 (file)
@@ -153,6 +153,8 @@ struct osd_thread_info {
        zap_attribute_t          oti_za;
        dmu_object_info_t        oti_doi;
        struct luz_direntry      oti_zde;
        zap_attribute_t          oti_za;
        dmu_object_info_t        oti_doi;
        struct luz_direntry      oti_zde;
+
+       struct lquota_id_info    oti_qi;
 };
 
 extern struct lu_context_key osd_key;
 };
 
 extern struct lu_context_key osd_key;
@@ -168,6 +170,7 @@ struct osd_thandle {
        cfs_list_t               ot_sa_list;
        cfs_semaphore_t          ot_sa_lock;
        dmu_tx_t                *ot_tx;
        cfs_list_t               ot_sa_list;
        cfs_semaphore_t          ot_sa_lock;
        dmu_tx_t                *ot_tx;
+       struct lquota_trans      ot_quota_trans;
        __u32                    ot_write_commit:1,
                                 ot_assigned:1;
 };
        __u32                    ot_write_commit:1,
                                 ot_assigned:1;
 };
@@ -268,6 +271,10 @@ int osd_statfs(const struct lu_env *, struct dt_device *, struct obd_statfs *);
 extern const struct dt_index_operations osd_acct_index_ops;
 uint64_t osd_quota_fid2dmu(const struct lu_fid *fid);
 extern struct lu_device_operations  osd_lu_ops;
 extern const struct dt_index_operations osd_acct_index_ops;
 uint64_t osd_quota_fid2dmu(const struct lu_fid *fid);
 extern struct lu_device_operations  osd_lu_ops;
+int osd_declare_quota(const struct lu_env *env, struct osd_device *osd,
+                     qid_t uid, qid_t gid, long long space,
+                     struct osd_thandle *oh, bool is_blk, int *flags,
+                     bool force);
 
 /*
  * Helpers.
 
 /*
  * Helpers.
index 4dafcfb..3cc3c62 100644 (file)
@@ -116,6 +116,7 @@ static ssize_t osd_declare_write(const struct lu_env *env, struct dt_object *dt,
                                struct thandle *th)
 {
        struct osd_object  *obj  = osd_dt_obj(dt);
                                struct thandle *th)
 {
        struct osd_object  *obj  = osd_dt_obj(dt);
+       struct osd_device  *osd = osd_obj2dev(obj);
        struct osd_thandle *oh;
        uint64_t            oid;
        ENTRY;
        struct osd_thandle *oh;
        uint64_t            oid;
        ENTRY;
@@ -142,7 +143,12 @@ static ssize_t osd_declare_write(const struct lu_env *env, struct dt_object *dt,
 
        dmu_tx_hold_write(oh->ot_tx, oid, pos, size);
 
 
        dmu_tx_hold_write(oh->ot_tx, oid, pos, size);
 
-       RETURN(0);
+       /* dt_declare_write() is usually called for system objects, such
+        * as llog or last_rcvd files. We needn't enforce quota on those
+        * objects, so always set the lqi_space as 0. */
+       RETURN(osd_declare_quota(env, osd, obj->oo_attr.la_uid,
+                                obj->oo_attr.la_gid, 0, oh, true, NULL,
+                                false));
 }
 
 static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
 }
 
 static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
@@ -452,16 +458,86 @@ static int osd_write_prep(const struct lu_env *env, struct dt_object *dt,
        return 0;
 }
 
        return 0;
 }
 
+/* Return number of blocks that aren't mapped in the [start, start + size]
+ * region */
+static int osd_count_not_mapped(struct osd_object *obj, uint64_t start,
+                               uint32_t size)
+{
+       dmu_buf_impl_t  *dbi = (dmu_buf_impl_t *)obj->oo_db;
+       dmu_buf_impl_t  *db;
+       dnode_t         *dn;
+       uint32_t         blkshift;
+       uint64_t         end, blkid;
+       int              rc;
+       ENTRY;
+
+       DB_DNODE_ENTER(dbi);
+       dn = DB_DNODE(dbi);
+
+       if (dn->dn_maxblkid == 0) {
+               if (start + size <= dn->dn_datablksz)
+                       GOTO(out, size = 0);
+               if (start < dn->dn_datablksz)
+                       start = dn->dn_datablksz;
+               /* assume largest block size */
+               blkshift = SPA_MAXBLOCKSHIFT;
+       } else {
+               /* blocksize can't change */
+               blkshift = dn->dn_datablkshift;
+       }
+
+       /* compute address of last block */
+       end = (start + size - 1) >> blkshift;
+       /* align start on block boundaries */
+       start >>= blkshift;
+
+       /* size is null, can't be mapped */
+       if (obj->oo_attr.la_size == 0 || dn->dn_maxblkid == 0)
+               GOTO(out, size = (end - start + 1) << blkshift);
+
+       /* beyond EOF, can't be mapped */
+       if (start > dn->dn_maxblkid)
+               GOTO(out, size = (end - start + 1) << blkshift);
+
+       size = 0;
+       for (blkid = start; blkid <= end; blkid++) {
+               if (blkid == dn->dn_maxblkid)
+                       /* this one is mapped for sure */
+                       continue;
+               if (blkid > dn->dn_maxblkid) {
+                       size += (end - blkid + 1) << blkshift;
+                       GOTO(out, size);
+               }
+
+               rc = dbuf_hold_impl(dn, 0, blkid, TRUE, FTAG, &db);
+               if (rc) {
+                       /* for ENOENT (block not mapped) and any other errors,
+                        * assume the block isn't mapped */
+                       size += 1 << blkshift;
+                       continue;
+               }
+               dbuf_rele(db, FTAG);
+       }
+
+       GOTO(out, size);
+out:
+       DB_DNODE_EXIT(dbi);
+       return size;
+}
+
 static int osd_declare_write_commit(const struct lu_env *env,
                                struct dt_object *dt,
                                struct niobuf_local *lnb, int npages,
                                struct thandle *th)
 {
        struct osd_object  *obj = osd_dt_obj(dt);
 static int osd_declare_write_commit(const struct lu_env *env,
                                struct dt_object *dt,
                                struct niobuf_local *lnb, int npages,
                                struct thandle *th)
 {
        struct osd_object  *obj = osd_dt_obj(dt);
+       struct osd_device  *osd = osd_obj2dev(obj);
        struct osd_thandle *oh;
        uint64_t            offset = 0;
        uint32_t            size = 0;
        struct osd_thandle *oh;
        uint64_t            offset = 0;
        uint32_t            size = 0;
-       int                 i;
+       int                 i, rc, flags = 0;
+       bool                ignore_quota = false, synced = false;
+       long long           space = 0;
        ENTRY;
 
        LASSERT(dt_object_exists(dt));
        ENTRY;
 
        LASSERT(dt_object_exists(dt));
@@ -479,6 +555,14 @@ static int osd_declare_write_commit(const struct lu_env *env,
                         * skipped in osd_write_commit(). Hence we skip pages
                         * with lnb_rc != 0 here too */
                        continue;
                         * skipped in osd_write_commit(). Hence we skip pages
                         * with lnb_rc != 0 here too */
                        continue;
+               /* ignore quota for the whole request if any page is from
+                * client cache or written by root.
+                *
+                * XXX we could handle this on per-lnb basis as done by
+                * grant. */
+               if ((lnb[i].flags & OBD_BRW_NOQUOTA) ||
+                   !(lnb[i].flags & OBD_BRW_SYNC))
+                       ignore_quota = true;
                if (size == 0) {
                        /* first valid lnb */
                        offset = lnb[i].lnb_file_offset;
                if (size == 0) {
                        /* first valid lnb */
                        offset = lnb[i].lnb_file_offset;
@@ -493,18 +577,55 @@ static int osd_declare_write_commit(const struct lu_env *env,
 
                dmu_tx_hold_write(oh->ot_tx, obj->oo_db->db_object, offset,size);
 
 
                dmu_tx_hold_write(oh->ot_tx, obj->oo_db->db_object, offset,size);
 
+               /* estimating space that will be consumed by a write is rather
+                * complicated with ZFS. As a consequence, we don't account for
+                * indirect blocks and quota overrun will be adjusted once the
+                * operation is committed, if required. */
+               space += osd_count_not_mapped(obj, offset, size);
+
                offset = lnb->lnb_file_offset;
                size = lnb->len;
        }
 
                offset = lnb->lnb_file_offset;
                size = lnb->len;
        }
 
-       if (size)
+       if (size) {
                dmu_tx_hold_write(oh->ot_tx, obj->oo_db->db_object, offset,size);
                dmu_tx_hold_write(oh->ot_tx, obj->oo_db->db_object, offset,size);
+               space += osd_count_not_mapped(obj, offset, size);
+       }
 
        dmu_tx_hold_sa(oh->ot_tx, obj->oo_sa_hdl, 0);
 
        oh->ot_write_commit = 1; /* used in osd_trans_start() for fail_loc */
 
 
        dmu_tx_hold_sa(oh->ot_tx, obj->oo_sa_hdl, 0);
 
        oh->ot_write_commit = 1; /* used in osd_trans_start() for fail_loc */
 
-       RETURN(0);
+       /* backend zfs filesystem might be configured to store multiple data
+        * copies */
+       space  *= osd->od_objset.os->os_copies;
+       space   = toqb(space);
+       CDEBUG(D_QUOTA, "writting %d pages, reserving "LPD64"K of quota "
+              "space\n", npages, space);
+
+retry:
+       /* acquire quota space if needed */
+       rc = osd_declare_quota(env, osd, obj->oo_attr.la_uid,
+                              obj->oo_attr.la_gid, space, oh, true, &flags,
+                              ignore_quota);
+
+       if (!synced && rc == -EDQUOT && (flags & QUOTA_FL_SYNC) != 0) {
+               dt_sync(env, th->th_dev);
+               synced = true;
+               CDEBUG(D_QUOTA, "retry after sync\n");
+               flags = 0;
+               goto retry;
+       }
+
+       /* we need only to store the overquota flags in the first lnb for
+        * now, once we support multiple objects BRW, this code needs be
+        * revised. */
+       if (flags & QUOTA_FL_OVER_USRQUOTA)
+               lnb[0].flags |= OBD_BRW_OVER_USRQUOTA;
+       if (flags & QUOTA_FL_OVER_GRPQUOTA)
+               lnb[0].flags |= OBD_BRW_OVER_GRPQUOTA;
+
+       RETURN(rc);
 }
 
 static int osd_write_commit(const struct lu_env *env, struct dt_object *dt,
 }
 
 static int osd_write_commit(const struct lu_env *env, struct dt_object *dt,
@@ -697,6 +818,7 @@ static int osd_declare_punch(const struct lu_env *env, struct dt_object *dt,
                        __u64 start, __u64 end, struct thandle *handle)
 {
        struct osd_object  *obj = osd_dt_obj(dt);
                        __u64 start, __u64 end, struct thandle *handle)
 {
        struct osd_object  *obj = osd_dt_obj(dt);
+       struct osd_device  *osd = osd_obj2dev(obj);
        struct osd_thandle *oh;
        __u64               len;
        ENTRY;
        struct osd_thandle *oh;
        __u64               len;
        ENTRY;
@@ -720,7 +842,9 @@ static int osd_declare_punch(const struct lu_env *env, struct dt_object *dt,
        /* ... and we'll modify size attribute */
        dmu_tx_hold_sa(oh->ot_tx, obj->oo_sa_hdl, 0);
 
        /* ... and we'll modify size attribute */
        dmu_tx_hold_sa(oh->ot_tx, obj->oo_sa_hdl, 0);
 
-       RETURN(0);
+       RETURN(osd_declare_quota(env, osd, obj->oo_attr.la_uid,
+                                obj->oo_attr.la_gid, 0, oh, true, NULL,
+                                false));
 }
 
 
 }
 
 
index bd12851..6b1373d 100644 (file)
@@ -458,6 +458,7 @@ static int osd_declare_object_destroy(const struct lu_env *env,
        struct osd_device       *osd = osd_obj2dev(obj);
        struct osd_thandle      *oh;
        uint64_t                 zapid;
        struct osd_device       *osd = osd_obj2dev(obj);
        struct osd_thandle      *oh;
        uint64_t                 zapid;
+       int                      rc;
        ENTRY;
 
        LASSERT(th != NULL);
        ENTRY;
 
        LASSERT(th != NULL);
@@ -480,7 +481,16 @@ static int osd_declare_object_destroy(const struct lu_env *env,
        dmu_tx_hold_bonus(oh->ot_tx, osd->od_igrp_oid);
        dmu_tx_hold_zap(oh->ot_tx, osd->od_igrp_oid, 0, buf);
 
        dmu_tx_hold_bonus(oh->ot_tx, osd->od_igrp_oid);
        dmu_tx_hold_zap(oh->ot_tx, osd->od_igrp_oid, 0, buf);
 
-       RETURN(0);
+       /* one less inode */
+       rc = osd_declare_quota(env, osd, obj->oo_attr.la_uid,
+                              obj->oo_attr.la_gid, -1, oh, false, NULL, false);
+       if (rc)
+               RETURN(rc);
+
+       /* data to be truncated */
+       rc = osd_declare_quota(env, osd, obj->oo_attr.la_uid,
+                              obj->oo_attr.la_gid, 0, oh, true, NULL, false);
+       RETURN(rc);
 }
 
 int __osd_object_free(udmu_objset_t *uos, uint64_t oid, dmu_tx_t *tx)
 }
 
 int __osd_object_free(udmu_objset_t *uos, uint64_t oid, dmu_tx_t *tx)
@@ -732,15 +742,78 @@ static int osd_attr_get(const struct lu_env *env,
        return 0;
 }
 
        return 0;
 }
 
+/* Simple wrapper on top of qsd API which implement quota transfer for osd
+ * setattr needs. As a reminder, only the root user can change ownership of
+ * a file, that's why EDQUOT & EINPROGRESS errors are discarded */
+static inline int qsd_transfer(const struct lu_env *env,
+                              struct qsd_instance *qsd,
+                              struct lquota_trans *trans, int qtype,
+                              __u64 orig_id, __u64 new_id, __u64 bspace,
+                              struct lquota_id_info *qi)
+{
+       int     rc;
+
+       if (unlikely(qsd == NULL))
+               return 0;
+
+       LASSERT(qtype >= 0 && qtype < MAXQUOTAS);
+       qi->lqi_type = qtype;
+
+       /* inode accounting */
+       qi->lqi_is_blk = false;
+
+       /* one more inode for the new owner ... */
+       qi->lqi_id.qid_uid = new_id;
+       qi->lqi_space      = 1;
+       rc = qsd_op_begin(env, qsd, trans, qi, NULL);
+       if (rc == -EDQUOT || rc == -EINPROGRESS)
+               rc = 0;
+       if (rc)
+               return rc;
+
+       /* and one less inode for the current id */
+       qi->lqi_id.qid_uid = orig_id;;
+       qi->lqi_space      = -1;
+       rc = qsd_op_begin(env, qsd, trans, qi, NULL);
+       if (rc == -EDQUOT || rc == -EINPROGRESS)
+               rc = 0;
+       if (rc)
+               return rc;
+
+       /* block accounting */
+       qi->lqi_is_blk = true;
+
+       /* more blocks for the new owner ... */
+       qi->lqi_id.qid_uid = new_id;
+       qi->lqi_space      = bspace;
+       rc = qsd_op_begin(env, qsd, trans, qi, NULL);
+       if (rc == -EDQUOT || rc == -EINPROGRESS)
+               rc = 0;
+       if (rc)
+               return rc;
+
+       /* and finally less blocks for the current owner */
+       qi->lqi_id.qid_uid = orig_id;
+       qi->lqi_space      = -bspace;
+       rc = qsd_op_begin(env, qsd, trans, qi, NULL);
+       if (rc == -EDQUOT || rc == -EINPROGRESS)
+               rc = 0;
+       return rc;
+}
+
 static int osd_declare_attr_set(const struct lu_env *env,
                                struct dt_object *dt,
                                const struct lu_attr *attr,
                                struct thandle *handle)
 {
 static int osd_declare_attr_set(const struct lu_env *env,
                                struct dt_object *dt,
                                const struct lu_attr *attr,
                                struct thandle *handle)
 {
+       struct osd_thread_info  *info = osd_oti_get(env);
        char                    *buf = osd_oti_get(env)->oti_str;
        struct osd_object       *obj = osd_dt_obj(dt);
        struct osd_device       *osd = osd_obj2dev(obj);
        struct osd_thandle      *oh;
        char                    *buf = osd_oti_get(env)->oti_str;
        struct osd_object       *obj = osd_dt_obj(dt);
        struct osd_device       *osd = osd_obj2dev(obj);
        struct osd_thandle      *oh;
+       uint64_t                 bspace;
+       uint32_t                 blksize;
+       int                      rc;
        ENTRY;
 
        if (!dt_object_exists(dt)) {
        ENTRY;
 
        if (!dt_object_exists(dt)) {
@@ -756,15 +829,38 @@ static int osd_declare_attr_set(const struct lu_env *env,
        LASSERT(obj->oo_sa_hdl != NULL);
        dmu_tx_hold_sa(oh->ot_tx, obj->oo_sa_hdl, 0);
 
        LASSERT(obj->oo_sa_hdl != NULL);
        dmu_tx_hold_sa(oh->ot_tx, obj->oo_sa_hdl, 0);
 
+       sa_object_size(obj->oo_sa_hdl, &blksize, &bspace);
+       bspace = toqb(bspace * blksize);
+
        if (attr && attr->la_valid & LA_UID) {
                /* account for user inode tracking ZAP update */
                dmu_tx_hold_bonus(oh->ot_tx, osd->od_iusr_oid);
                dmu_tx_hold_zap(oh->ot_tx, osd->od_iusr_oid, TRUE, buf);
        if (attr && attr->la_valid & LA_UID) {
                /* account for user inode tracking ZAP update */
                dmu_tx_hold_bonus(oh->ot_tx, osd->od_iusr_oid);
                dmu_tx_hold_zap(oh->ot_tx, osd->od_iusr_oid, TRUE, buf);
+
+               /* quota enforcement for user */
+               if (attr->la_uid != obj->oo_attr.la_uid) {
+                       rc = qsd_transfer(env, osd->od_quota_slave,
+                                         &oh->ot_quota_trans, USRQUOTA,
+                                         obj->oo_attr.la_uid, attr->la_uid,
+                                         bspace, &info->oti_qi);
+                       if (rc)
+                               RETURN(rc);
+               }
        }
        if (attr && attr->la_valid & LA_GID) {
                /* account for user inode tracking ZAP update */
                dmu_tx_hold_bonus(oh->ot_tx, osd->od_igrp_oid);
                dmu_tx_hold_zap(oh->ot_tx, osd->od_igrp_oid, TRUE, buf);
        }
        if (attr && attr->la_valid & LA_GID) {
                /* account for user inode tracking ZAP update */
                dmu_tx_hold_bonus(oh->ot_tx, osd->od_igrp_oid);
                dmu_tx_hold_zap(oh->ot_tx, osd->od_igrp_oid, TRUE, buf);
+
+               /* quota enforcement for group */
+               if (attr->la_gid != obj->oo_attr.la_gid) {
+                       rc = qsd_transfer(env, osd->od_quota_slave,
+                                         &oh->ot_quota_trans, GRPQUOTA,
+                                         obj->oo_attr.la_gid, attr->la_gid,
+                                         bspace, &info->oti_qi);
+                       if (rc)
+                               RETURN(rc);
+               }
        }
 
        RETURN(0);
        }
 
        RETURN(0);
@@ -982,7 +1078,8 @@ static int osd_declare_object_create(const struct lu_env *env,
 
        dmu_tx_hold_sa_create(oh->ot_tx, ZFS_SA_BASE_ATTR_SIZE);
 
 
        dmu_tx_hold_sa_create(oh->ot_tx, ZFS_SA_BASE_ATTR_SIZE);
 
-       RETURN(0);
+       RETURN(osd_declare_quota(env, osd, attr->la_uid, attr->la_gid, 1, oh,
+                                false, NULL, false));
 }
 
 int __osd_attr_init(const struct lu_env *env, udmu_objset_t *uos,
 }
 
 int __osd_attr_init(const struct lu_env *env, udmu_objset_t *uos,
index 1f9e2d2..3c2fa2b 100644 (file)
@@ -395,5 +395,67 @@ const struct dt_index_operations osd_acct_index_ops = {
 
 /**
  * Quota Enforcement Management
 
 /**
  * Quota Enforcement Management
- * TODO
  */
  */
+
+/*
+ * Wrapper for qsd_op_begin().
+ *
+ * \param env    - the environment passed by the caller
+ * \param osd    - is the osd_device
+ * \param uid    - user id of the inode
+ * \param gid    - group id of the inode
+ * \param space  - how many blocks/inodes will be consumed/released
+ * \param oh     - osd transaction handle
+ * \param is_blk - block quota or inode quota?
+ * \param flags  - if the operation is write, return no user quota, no
+ *                  group quota, or sync commit flags to the caller
+ * \param force  - set to 1 when changes are performed by root user and thus
+ *                  can't failed with EDQUOT
+ *
+ * \retval 0      - success
+ * \retval -ve    - failure
+ */
+int osd_declare_quota(const struct lu_env *env, struct osd_device *osd,
+                     qid_t uid, qid_t gid, long long space,
+                     struct osd_thandle *oh, bool is_blk, int *flags,
+                     bool force)
+{
+       struct osd_thread_info  *info = osd_oti_get(env);
+       struct lquota_id_info   *qi = &info->oti_qi;
+       struct qsd_instance     *qsd = osd->od_quota_slave;
+       int                      rcu, rcg; /* user & group rc */
+       ENTRY;
+
+       if (unlikely(qsd == NULL))
+               /* quota slave instance hasn't been allocated yet */
+               RETURN(0);
+
+       /* let's start with user quota */
+       qi->lqi_id.qid_uid = uid;
+       qi->lqi_type       = USRQUOTA;
+       qi->lqi_space      = space;
+       qi->lqi_is_blk     = is_blk;
+       rcu = qsd_op_begin(env, qsd, &oh->ot_quota_trans, qi, flags);
+
+       if (force && (rcu == -EDQUOT || rcu == -EINPROGRESS))
+               /* ignore EDQUOT & EINPROGRESS when changes are done by root */
+               rcu = 0;
+
+       /* For non-fatal error, we want to continue to get the noquota flags
+        * for group id. This is only for commit write, which has @flags passed
+        * in. See osd_declare_write_commit().
+        * When force is set to true, we also want to proceed with the gid */
+       if (rcu && (rcu != -EDQUOT || flags == NULL))
+               RETURN(rcu);
+
+       /* and now group quota */
+       qi->lqi_id.qid_gid = gid;
+       qi->lqi_type       = GRPQUOTA;
+       rcg = qsd_op_begin(env, qsd, &oh->ot_quota_trans, qi, flags);
+
+       if (force && (rcg == -EDQUOT || rcg == -EINPROGRESS))
+               /* as before, ignore EDQUOT & EINPROGRESS for root */
+               rcg = 0;
+
+       RETURN(rcu ? rcu : rcg);
+}