Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
index d7a9969..0cf918a 100644 (file)
@@ -54,6 +54,7 @@
 /* fid_be_cpu(), fid_cpu_to_be(). */
 #include <lustre_fid.h>
 
+#include <lustre_param.h>
 #include <linux/ldiskfs_fs.h>
 #include <lustre_mds.h>
 #include <lustre/lustre_idl.h>
 
 static const struct lu_object_operations mdd_lu_obj_ops;
 
+static int mdd_xattr_get(const struct lu_env *env,
+                         struct md_object *obj, struct lu_buf *buf,
+                         const char *name);
+
+int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
+                 void **data)
+{
+        LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
+                 PFID(mdd_object_fid(obj)));
+        mdo_data_get(env, obj, data);
+        return 0;
+}
+
 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
                struct lu_attr *la, struct lustre_capa *capa)
 {
@@ -81,6 +95,15 @@ static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
                 obj->mod_flags |= IMMUTE_OBJ;
 }
 
+struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
+{
+        struct mdd_thread_info *info;
+
+        info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
+        LASSERT(info != NULL);
+        return info;
+}
+
 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
 {
         struct lu_buf *buf;
@@ -91,6 +114,90 @@ struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
         return buf;
 }
 
+void mdd_buf_put(struct lu_buf *buf)
+{
+        if (buf == NULL || buf->lb_buf == NULL)
+                return;
+        if (buf->lb_vmalloc)
+                OBD_VFREE(buf->lb_buf, buf->lb_len);
+        else
+                OBD_FREE(buf->lb_buf, buf->lb_len);
+        buf->lb_buf = NULL;
+}
+
+const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
+                                       const void *area, ssize_t len)
+{
+        struct lu_buf *buf;
+
+        buf = &mdd_env_info(env)->mti_buf;
+        buf->lb_buf = (void *)area;
+        buf->lb_len = len;
+        return buf;
+}
+
+#define BUF_VMALLOC_SIZE (CFS_PAGE_SIZE<<2) /* 16k */
+struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
+{
+        struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
+
+        if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
+                if (buf->lb_vmalloc)
+                        OBD_VFREE(buf->lb_buf, buf->lb_len);
+                else
+                        OBD_FREE(buf->lb_buf, buf->lb_len);
+                buf->lb_buf = NULL;
+        }
+        if (buf->lb_buf == NULL) {
+                buf->lb_len = len;
+                if (buf->lb_len <= BUF_VMALLOC_SIZE) {
+                        OBD_ALLOC(buf->lb_buf, buf->lb_len);
+                        buf->lb_vmalloc = 0;
+                }
+                if (buf->lb_buf == NULL) {
+                        OBD_VMALLOC(buf->lb_buf, buf->lb_len);
+                        buf->lb_vmalloc = 1;
+                }
+                if (buf->lb_buf == NULL)
+                        buf->lb_len = 0;
+        }
+        return buf;
+}
+
+/** Increase the size of the \a mti_big_buf.
+ * preserves old data in buffer
+ * old buffer remains unchanged on error
+ * \retval 0 or -ENOMEM
+ */
+int mdd_buf_grow(const struct lu_env *env, ssize_t len)
+{
+        struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
+        struct lu_buf buf;
+
+        LASSERT(len >= oldbuf->lb_len);
+        if (len > BUF_VMALLOC_SIZE) {
+                OBD_VMALLOC(buf.lb_buf, len);
+                buf.lb_vmalloc = 1;
+        } else {
+                OBD_ALLOC(buf.lb_buf, len);
+                buf.lb_vmalloc = 0;
+        }
+        if (buf.lb_buf == NULL)
+                return -ENOMEM;
+
+        buf.lb_len = len;
+        memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
+
+        if (oldbuf->lb_vmalloc)
+                OBD_VFREE(oldbuf->lb_buf, oldbuf->lb_len);
+        else
+                OBD_FREE(oldbuf->lb_buf, oldbuf->lb_len);
+
+        memcpy(oldbuf, &buf, sizeof(buf));
+
+        return 0;
+}
+
 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
                                        struct mdd_device *mdd)
 {
@@ -135,26 +242,6 @@ struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
         return mti->mti_max_lmm;
 }
 
-const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
-                                       const void *area, ssize_t len)
-{
-        struct lu_buf *buf;
-
-        buf = &mdd_env_info(env)->mti_buf;
-        buf->lb_buf = (void *)area;
-        buf->lb_len = len;
-        return buf;
-}
-
-struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
-{
-        struct mdd_thread_info *info;
-
-        info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
-        LASSERT(info != NULL);
-        return info;
-}
-
 struct lu_object *mdd_object_alloc(const struct lu_env *env,
                                    const struct lu_object_header *hdr,
                                    struct lu_device *d)
@@ -181,17 +268,20 @@ static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
                            const struct lu_object_conf *_)
 {
         struct mdd_device *d = lu2mdd_dev(o->lo_dev);
+        struct mdd_object *mdd_obj = lu2mdd_obj(o);
         struct lu_object  *below;
         struct lu_device  *under;
         ENTRY;
 
+        mdd_obj->mod_cltime = 0;
         under = &d->mdd_child->dd_lu_dev;
         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
-        mdd_pdlock_init(lu2mdd_obj(o));
+        mdd_pdlock_init(mdd_obj);
         if (below == NULL)
                 RETURN(-ENOMEM);
 
         lu_object_add(o, below);
+
         RETURN(0);
 }
 
@@ -218,10 +308,10 @@ static int mdd_object_print(const struct lu_env *env, void *cookie,
 }
 
 static const struct lu_object_operations mdd_lu_obj_ops = {
-       .loo_object_init    = mdd_object_init,
-       .loo_object_start   = mdd_object_start,
-       .loo_object_free    = mdd_object_free,
-       .loo_object_print   = mdd_object_print,
+        .loo_object_init    = mdd_object_init,
+        .loo_object_start   = mdd_object_start,
+        .loo_object_free    = mdd_object_free,
+        .loo_object_print   = mdd_object_print,
 };
 
 struct mdd_object *mdd_object_find(const struct lu_env *env,
@@ -231,6 +321,235 @@ struct mdd_object *mdd_object_find(const struct lu_env *env,
         return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
 }
 
+static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
+                        const char *path, struct lu_fid *fid)
+{
+        struct lu_buf *buf;
+        struct lu_fid *f = &mdd_env_info(env)->mti_fid;
+        struct mdd_object *obj;
+        struct lu_name *lname = &mdd_env_info(env)->mti_name;
+        char *name;
+        int rc = 0;
+        ENTRY;
+
+        /* temp buffer for path element */
+        buf = mdd_buf_alloc(env, PATH_MAX);
+        if (buf->lb_buf == NULL)
+                RETURN(-ENOMEM);
+
+        lname->ln_name = name = buf->lb_buf;
+        lname->ln_namelen = 0;
+        *f = mdd->mdd_root_fid;
+
+        while(1) {
+                while (*path == '/')
+                        path++;
+                if (*path == '\0')
+                        break;
+                while (*path != '/' && *path != '\0') {
+                        *name = *path;
+                        path++;
+                        name++;
+                        lname->ln_namelen++;
+                }
+
+                *name = '\0';
+                /* find obj corresponding to fid */
+                obj = mdd_object_find(env, mdd, f);
+                if (obj == NULL)
+                        GOTO(out, rc = -EREMOTE);
+                if (IS_ERR(obj))
+                        GOTO(out, rc = -PTR_ERR(obj));
+                /* get child fid from parent and name */
+                rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
+                mdd_object_put(env, obj);
+                if (rc)
+                        break;
+
+                name = buf->lb_buf;
+                lname->ln_namelen = 0;
+        }
+
+        if (!rc)
+                *fid = *f;
+out:
+        RETURN(rc);
+}
+
+/** The maximum depth that fid2path() will search.
+ * This is limited only because we want to store the fids for
+ * historical path lookup purposes.
+ */
+#define MAX_PATH_DEPTH 100
+
+/** mdd_path() lookup structure. */
+struct path_lookup_info {
+        __u64                pli_recno;        /**< history point */
+        struct lu_fid        pli_fid;
+        struct lu_fid        pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
+        struct mdd_object   *pli_mdd_obj;
+        char                *pli_path;         /**< full path */
+        int                  pli_pathlen;
+        int                  pli_linkno;       /**< which hardlink to follow */
+        int                  pli_fidcount;     /**< number of \a pli_fids */
+};
+
+static int mdd_path_current(const struct lu_env *env,
+                            struct path_lookup_info *pli)
+{
+        struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
+        struct mdd_object *mdd_obj;
+        struct lu_buf     *buf = NULL;
+        struct link_ea_header *leh;
+        struct link_ea_entry  *lee;
+        struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
+        struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
+        char *ptr;
+        int reclen;
+        int rc;
+        ENTRY;
+
+        ptr = pli->pli_path + pli->pli_pathlen - 1;
+        *ptr = 0;
+        --ptr;
+        pli->pli_fidcount = 0;
+        pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
+
+        while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
+                mdd_obj = mdd_object_find(env, mdd,
+                                          &pli->pli_fids[pli->pli_fidcount]);
+                if (mdd_obj == NULL)
+                        GOTO(out, rc = -EREMOTE);
+                if (IS_ERR(mdd_obj))
+                        GOTO(out, rc = -PTR_ERR(mdd_obj));
+                rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
+                if (rc <= 0) {
+                        mdd_object_put(env, mdd_obj);
+                        if (rc == -1)
+                                rc = -EREMOTE;
+                        else if (rc == 0)
+                                /* Do I need to error out here? */
+                                rc = -ENOENT;
+                        GOTO(out, rc);
+                }
+
+                /* Get parent fid and object name */
+                mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
+                buf = mdd_links_get(env, mdd_obj);
+                mdd_read_unlock(env, mdd_obj);
+                mdd_object_put(env, mdd_obj);
+                if (IS_ERR(buf))
+                        GOTO(out, rc = PTR_ERR(buf));
+
+                leh = buf->lb_buf;
+                lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
+                mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
+
+                /* If set, use link #linkno for path lookup, otherwise use
+                   link #0.  Only do this for the final path element. */
+                if ((pli->pli_fidcount == 0) &&
+                    (pli->pli_linkno < leh->leh_reccount)) {
+                        int count;
+                        for (count = 0; count < pli->pli_linkno; count++) {
+                                lee = (struct link_ea_entry *)
+                                     ((char *)lee + reclen);
+                                mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
+                        }
+                        if (pli->pli_linkno < leh->leh_reccount - 1)
+                                /* indicate to user there are more links */
+                                pli->pli_linkno++;
+                }
+
+                /* Pack the name in the end of the buffer */
+                ptr -= tmpname->ln_namelen;
+                if (ptr - 1 <= pli->pli_path)
+                        GOTO(out, rc = -EOVERFLOW);
+                strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
+                *(--ptr) = '/';
+
+                /* Store the parent fid for historic lookup */
+                if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
+                        GOTO(out, rc = -EOVERFLOW);
+                pli->pli_fids[pli->pli_fidcount] = *tmpfid;
+        }
+
+        /* Verify that our path hasn't changed since we started the lookup */
+        rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
+        if (rc) {
+                CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
+                GOTO (out, rc = -EAGAIN);
+        }
+        if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
+                CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
+                       " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
+                       PFID(&pli->pli_fid));
+                GOTO(out, rc = -EAGAIN);
+        }
+
+        memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
+
+        EXIT;
+out:
+        if (buf && !IS_ERR(buf) && buf->lb_vmalloc)
+                /* if we vmalloced a large buffer drop it */
+                mdd_buf_put(buf);
+
+        return rc;
+}
+
+/* Returns the full path to this fid, as of changelog record recno. */
+static int mdd_path(const struct lu_env *env, struct md_object *obj,
+                    char *path, int pathlen, __u64 recno, int *linkno)
+{
+        struct path_lookup_info *pli;
+        int tries = 3;
+        int rc = -EAGAIN;
+        ENTRY;
+
+        if (pathlen < 3)
+                RETURN(-EOVERFLOW);
+
+        if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
+                path[0] = '/';
+                path[1] = '\0';
+                RETURN(0);
+        }
+
+        OBD_ALLOC_PTR(pli);
+        if (pli == NULL)
+                RETURN(-ENOMEM);
+
+        pli->pli_mdd_obj = md2mdd_obj(obj);
+        pli->pli_recno = recno;
+        pli->pli_path = path;
+        pli->pli_pathlen = pathlen;
+        pli->pli_linkno = *linkno;
+
+        /* Retry multiple times in case file is being moved */
+        while (tries-- && rc == -EAGAIN)
+                rc = mdd_path_current(env, pli);
+
+#if 0   /* We need old path names only for replication */
+        /* For historical path lookup, the current links may not have existed
+         * at "recno" time.  We must switch over to earlier links/parents
+         * by using the changelog records.  If the earlier parent doesn't
+         * exist, we must search back through the changelog to reconstruct
+         * its parents, then check if it exists, etc.
+         * We may ignore this problem for the initial implementation and
+         * state that an "original" hardlink must still exist for us to find
+         * historic path name. */
+        if (pli->pli_recno != -1)
+                rc = mdd_path_historic(env, pli);
+#endif
+
+        /* return next link index to caller */
+        *linkno = pli->pli_linkno;
+
+        OBD_FREE_PTR(pli);
+
+        RETURN (rc);
+}
+
 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
 {
         struct lu_attr *la = &mdd_env_info(env)->mti_la;
@@ -297,7 +616,7 @@ static int __mdd_lmm_get(const struct lu_env *env,
                 RETURN(0);
 
         rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
-                        MDS_LOV_MD_NAME);
+                        XATTR_NAME_LOV);
 
         if (rc == 0 && (ma->ma_need & MA_LOV_DEF)) {
                 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm,
@@ -334,7 +653,7 @@ static int __mdd_lmv_get(const struct lu_env *env,
                 RETURN(0);
 
         rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
-                        MDS_LMV_MD_NAME);
+                        XATTR_NAME_LMV);
         if (rc > 0) {
                 ma->ma_valid |= MA_LMV;
                 rc = 0;
@@ -595,9 +914,9 @@ int mdd_attr_check_set_internal_locked(const struct lu_env *env,
         RETURN(rc);
 }
 
-static int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
-                           const struct lu_buf *buf, const char *name,
-                           int fl, struct thandle *handle)
+int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
+                    const struct lu_buf *buf, const char *name,
+                    int fl, struct thandle *handle)
 {
         struct lustre_capa *capa = mdd_object_capa(env, obj);
         int rc = -EINVAL;
@@ -819,6 +1138,60 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
         RETURN(0);
 }
 
+/** Store a data change changelog record
+ * If this fails, we must fail the whole transaction; we don't
+ * want the change to commit without the log entry.
+ * \param mdd_obj - mdd_object of change
+ * \param handle - transacion handle
+ */
+static int mdd_changelog_data_store(const struct lu_env     *env,
+                                    struct mdd_device       *mdd,
+                                    enum changelog_rec_type type,
+                                    struct mdd_object       *mdd_obj,
+                                    struct thandle          *handle)
+{
+        const struct lu_fid *tfid = mdo2fid(mdd_obj);
+        struct llog_changelog_rec *rec;
+        struct lu_buf *buf;
+        int reclen;
+        int rc;
+
+        if (!(mdd->mdd_cl.mc_flags & CLM_ON))
+                RETURN(0);
+
+        LASSERT(handle != NULL);
+        LASSERT(mdd_obj != NULL);
+
+        if ((type == CL_SETATTR) &&
+            cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
+                /* Don't need multiple updates in this log */
+                /* Don't check under lock - no big deal if we get an extra
+                   entry */
+                RETURN(0);
+        }
+
+        reclen = llog_data_len(sizeof(*rec));
+        buf = mdd_buf_alloc(env, reclen);
+        if (buf->lb_buf == NULL)
+                RETURN(-ENOMEM);
+        rec = (struct llog_changelog_rec *)buf->lb_buf;
+
+        rec->cr_flags = CLF_VERSION;
+        rec->cr_type = (__u32)type;
+        rec->cr_tfid = *tfid;
+        rec->cr_namelen = 0;
+        mdd_obj->mod_cltime = cfs_time_current_64();
+
+        rc = mdd_changelog_llog_write(mdd, rec, handle);
+        if (rc < 0) {
+                CERROR("changelog failed: rc=%d op%d t"DFID"\n",
+                       rc, type, PFID(tfid));
+                return -EFAULT;
+        }
+
+        return 0;
+}
+
 /* set attr and LOV EA at once, return updated attr */
 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
                         const struct md_attr *ma)
@@ -855,13 +1228,13 @@ static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
                         GOTO(cleanup, rc = -ENOMEM);
 
                 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
-                                MDS_LOV_MD_NAME);
+                                XATTR_NAME_LOV);
 
                 if (rc < 0)
                         GOTO(cleanup, rc);
         }
 
-        if (ma->ma_attr.la_valid & (ATTR_MTIME | ATTR_CTIME))
+        if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
                        ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
 
@@ -882,15 +1255,19 @@ static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
                         /* get file quota for new owner */
                         lquota_chkquota(mds_quota_interface_ref, obd,
                                         qnids[USRQUOTA], qnids[GRPQUOTA], 1,
-                                        &inode_pending, NULL, 0);
+                                        &inode_pending, NULL, 0, NULL, 0);
                         block_count = (la_tmp->la_blocks + 7) >> 3;
-                        if (block_count)
+                        if (block_count) {
+                                void *data = NULL;
+                                mdd_data_get(env, mdd_obj, &data);
                                 /* get block quota for new owner */
                                 lquota_chkquota(mds_quota_interface_ref, obd,
                                                 qnids[USRQUOTA],
                                                 qnids[GRPQUOTA],
                                                 block_count, &block_pending,
-                                                NULL, LQUOTA_FLAGS_BLK);
+                                                NULL, LQUOTA_FLAGS_BLK,
+                                                data, 1);
+                        }
                 }
         }
 #endif
@@ -931,6 +1308,9 @@ static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
 
         }
 cleanup:
+        if ((rc == 0) && (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME)))
+                rc = mdd_changelog_data_store(env, mdd, CL_SETATTR, mdd_obj,
+                                              handle);
         mdd_trans_stop(env, mdd, rc, handle);
         if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
                 /*set obd attr, if needed*/
@@ -942,11 +1322,11 @@ cleanup:
                 if (inode_pending)
                         lquota_pending_commit(mds_quota_interface_ref, obd,
                                               qnids[USRQUOTA], qnids[GRPQUOTA],
-                                              1, 0);
+                                              inode_pending, 0);
                 if (block_pending)
                         lquota_pending_commit(mds_quota_interface_ref, obd,
                                               qnids[USRQUOTA], qnids[GRPQUOTA],
-                                              block_count, 1);
+                                              block_pending, 1);
                 /* Trigger dqrel/dqacq for original owner and new owner.
                  * If failed, the next call for lquota_chkquota will
                  * process it. */
@@ -1017,6 +1397,12 @@ static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
                 RETURN(PTR_ERR(handle));
 
         rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
+
+        /* Only record user xattr changes */
+        if ((rc == 0) && (mdd->mdd_cl.mc_flags & CLM_ON) &&
+            (strncmp("user.", name, 5) == 0))
+                rc = mdd_changelog_data_store(env, mdd, CL_XATTR, mdd_obj,
+                                              handle);
         mdd_trans_stop(env, mdd, rc, handle);
 
         RETURN(rc);
@@ -1048,6 +1434,13 @@ int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
         rc = mdo_xattr_del(env, mdd_obj, name, handle,
                            mdd_object_capa(env, mdd_obj));
         mdd_write_unlock(env, mdd_obj);
+
+        /* Only record user xattr changes */
+        if ((rc == 0) && (mdd->mdd_cl.mc_flags & CLM_ON) &&
+            (strncmp("user.", name, 5) != 0))
+                rc = mdd_changelog_data_store(env, mdd, CL_XATTR, mdd_obj,
+                                              handle);
+
         mdd_trans_stop(env, mdd, rc, handle);
 
         RETURN(rc);
@@ -1183,7 +1576,8 @@ static int mdd_object_create(const struct lu_env *env,
                 mdd_quota_wrapper(&ma->ma_attr, qids);
                 /* get file quota for child */
                 lquota_chkquota(mds_quota_interface_ref, obd, qids[USRQUOTA],
-                                qids[GRPQUOTA], 1, &inode_pending, NULL, 0);
+                                qids[GRPQUOTA], 1, &inode_pending, NULL, 0,
+                                NULL, 0);
                 switch (ma->ma_attr.la_mode & S_IFMT) {
                 case S_IFLNK:
                 case S_IFDIR:
@@ -1198,7 +1592,7 @@ static int mdd_object_create(const struct lu_env *env,
                         lquota_chkquota(mds_quota_interface_ref, obd,
                                         qids[USRQUOTA], qids[GRPQUOTA],
                                         block_count, &block_pending, NULL,
-                                        LQUOTA_FLAGS_BLK);
+                                        LQUOTA_FLAGS_BLK, NULL, 0);
         }
 #endif
 
@@ -1226,7 +1620,7 @@ static int mdd_object_create(const struct lu_env *env,
 
                 rc = __mdd_xattr_set(env, mdd_obj,
                                      mdd_buf_get_const(env, lmv, lmv_size),
-                                     MDS_LMV_MD_NAME, 0, handle);
+                                     XATTR_NAME_LMV, 0, handle);
                 if (rc)
                         GOTO(unlock, rc);
 
@@ -1252,7 +1646,8 @@ static int mdd_object_create(const struct lu_env *env,
                         pfid = spec->u.sp_ea.fid;
                 }
 #endif
-                rc = mdd_object_initialize(env, pfid, mdd_obj, ma, handle, spec);
+                rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
+                                           spec);
         }
         EXIT;
 unlock:
@@ -1267,11 +1662,11 @@ out_pending:
                 if (inode_pending)
                         lquota_pending_commit(mds_quota_interface_ref, obd,
                                               qids[USRQUOTA], qids[GRPQUOTA],
-                                              1, 0);
+                                              inode_pending, 0);
                 if (block_pending)
                         lquota_pending_commit(mds_quota_interface_ref, obd,
                                               qids[USRQUOTA], qids[GRPQUOTA],
-                                              block_count, 1);
+                                              block_pending, 1);
                 /* Trigger dqacq on the owner of child. If failed,
                  * the next call for lquota_chkquota will process it. */
                 lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
@@ -1760,4 +2155,5 @@ const struct md_object_operations mdd_obj_ops = {
         .moo_readlink      = mdd_readlink,
         .moo_capa_get      = mdd_capa_get,
         .moo_object_sync   = mdd_object_sync,
+        .moo_path          = mdd_path,
 };