Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / mdd / mdd_object.c
index 2503006..0cf918a 100644 (file)
@@ -54,6 +54,7 @@
 /* fid_be_cpu(), fid_cpu_to_be(). */
 #include <lustre_fid.h>
 
+#include <lustre_param.h>
 #include <linux/ldiskfs_fs.h>
 #include <lustre_mds.h>
 #include <lustre/lustre_idl.h>
 
 static const struct lu_object_operations mdd_lu_obj_ops;
 
+static int mdd_xattr_get(const struct lu_env *env,
+                         struct md_object *obj, struct lu_buf *buf,
+                         const char *name);
+
+int mdd_data_get(const struct lu_env *env, struct mdd_object *obj,
+                 void **data)
+{
+        LASSERTF(mdd_object_exists(obj), "FID is "DFID"\n",
+                 PFID(mdd_object_fid(obj)));
+        mdo_data_get(env, obj, data);
+        return 0;
+}
+
 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
                struct lu_attr *la, struct lustre_capa *capa)
 {
@@ -81,6 +95,15 @@ static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
                 obj->mod_flags |= IMMUTE_OBJ;
 }
 
+struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
+{
+        struct mdd_thread_info *info;
+
+        info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
+        LASSERT(info != NULL);
+        return info;
+}
+
 struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
 {
         struct lu_buf *buf;
@@ -91,6 +114,90 @@ struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
         return buf;
 }
 
+void mdd_buf_put(struct lu_buf *buf)
+{
+        if (buf == NULL || buf->lb_buf == NULL)
+                return;
+        if (buf->lb_vmalloc)
+                OBD_VFREE(buf->lb_buf, buf->lb_len);
+        else
+                OBD_FREE(buf->lb_buf, buf->lb_len);
+        buf->lb_buf = NULL;
+}
+
+const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
+                                       const void *area, ssize_t len)
+{
+        struct lu_buf *buf;
+
+        buf = &mdd_env_info(env)->mti_buf;
+        buf->lb_buf = (void *)area;
+        buf->lb_len = len;
+        return buf;
+}
+
+#define BUF_VMALLOC_SIZE (CFS_PAGE_SIZE<<2) /* 16k */
+struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
+{
+        struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
+
+        if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
+                if (buf->lb_vmalloc)
+                        OBD_VFREE(buf->lb_buf, buf->lb_len);
+                else
+                        OBD_FREE(buf->lb_buf, buf->lb_len);
+                buf->lb_buf = NULL;
+        }
+        if (buf->lb_buf == NULL) {
+                buf->lb_len = len;
+                if (buf->lb_len <= BUF_VMALLOC_SIZE) {
+                        OBD_ALLOC(buf->lb_buf, buf->lb_len);
+                        buf->lb_vmalloc = 0;
+                }
+                if (buf->lb_buf == NULL) {
+                        OBD_VMALLOC(buf->lb_buf, buf->lb_len);
+                        buf->lb_vmalloc = 1;
+                }
+                if (buf->lb_buf == NULL)
+                        buf->lb_len = 0;
+        }
+        return buf;
+}
+
+/** Increase the size of the \a mti_big_buf.
+ * preserves old data in buffer
+ * old buffer remains unchanged on error
+ * \retval 0 or -ENOMEM
+ */
+int mdd_buf_grow(const struct lu_env *env, ssize_t len)
+{
+        struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
+        struct lu_buf buf;
+
+        LASSERT(len >= oldbuf->lb_len);
+        if (len > BUF_VMALLOC_SIZE) {
+                OBD_VMALLOC(buf.lb_buf, len);
+                buf.lb_vmalloc = 1;
+        } else {
+                OBD_ALLOC(buf.lb_buf, len);
+                buf.lb_vmalloc = 0;
+        }
+        if (buf.lb_buf == NULL)
+                return -ENOMEM;
+
+        buf.lb_len = len;
+        memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
+
+        if (oldbuf->lb_vmalloc)
+                OBD_VFREE(oldbuf->lb_buf, oldbuf->lb_len);
+        else
+                OBD_FREE(oldbuf->lb_buf, oldbuf->lb_len);
+
+        memcpy(oldbuf, &buf, sizeof(buf));
+
+        return 0;
+}
+
 struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
                                        struct mdd_device *mdd)
 {
@@ -106,9 +213,11 @@ struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
         }
         if (unlikely(mti->mti_max_cookie == NULL)) {
                 OBD_ALLOC(mti->mti_max_cookie, max_cookie_size);
-                if (unlikely(mti->mti_max_cookie != NULL))
+                if (likely(mti->mti_max_cookie != NULL))
                         mti->mti_max_cookie_size = max_cookie_size;
         }
+        if (likely(mti->mti_max_cookie != NULL))
+                memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
         return mti->mti_max_cookie;
 }
 
@@ -133,26 +242,6 @@ struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
         return mti->mti_max_lmm;
 }
 
-const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
-                                       const void *area, ssize_t len)
-{
-        struct lu_buf *buf;
-
-        buf = &mdd_env_info(env)->mti_buf;
-        buf->lb_buf = (void *)area;
-        buf->lb_len = len;
-        return buf;
-}
-
-struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
-{
-        struct mdd_thread_info *info;
-
-        info = lu_context_key_get(&env->le_ctx, &mdd_thread_key);
-        LASSERT(info != NULL);
-        return info;
-}
-
 struct lu_object *mdd_object_alloc(const struct lu_env *env,
                                    const struct lu_object_header *hdr,
                                    struct lu_device *d)
@@ -178,18 +267,21 @@ struct lu_object *mdd_object_alloc(const struct lu_env *env,
 static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
                            const struct lu_object_conf *_)
 {
-       struct mdd_device *d = lu2mdd_dev(o->lo_dev);
-       struct lu_object  *below;
+        struct mdd_device *d = lu2mdd_dev(o->lo_dev);
+        struct mdd_object *mdd_obj = lu2mdd_obj(o);
+        struct lu_object  *below;
         struct lu_device  *under;
         ENTRY;
 
-       under = &d->mdd_child->dd_lu_dev;
-       below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
-        mdd_pdlock_init(lu2mdd_obj(o));
+        mdd_obj->mod_cltime = 0;
+        under = &d->mdd_child->dd_lu_dev;
+        below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
+        mdd_pdlock_init(mdd_obj);
         if (below == NULL)
-               RETURN(-ENOMEM);
+                RETURN(-ENOMEM);
 
         lu_object_add(o, below);
+
         RETURN(0);
 }
 
@@ -204,43 +296,22 @@ static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
 static void mdd_object_free(const struct lu_env *env, struct lu_object *o)
 {
         struct mdd_object *mdd = lu2mdd_obj(o);
-       
+
         lu_object_fini(o);
         OBD_FREE_PTR(mdd);
 }
 
-/* orphan handling is here */
-static void mdd_object_delete(const struct lu_env *env, struct lu_object *o)
+static int mdd_object_print(const struct lu_env *env, void *cookie,
+                            lu_printer_t p, const struct lu_object *o)
 {
-        struct mdd_object *mdd_obj = lu2mdd_obj(o);
-        struct thandle *handle = NULL;
-        ENTRY;
-
-        if (lu2mdd_dev(o->lo_dev)->mdd_orphans == NULL)
-                return;
-
-        if (mdd_obj->mod_flags & ORPHAN_OBJ) {
-                mdd_txn_param_build(env, lu2mdd_dev(o->lo_dev),
-                                    MDD_TXN_INDEX_DELETE_OP);
-                handle = mdd_trans_start(env, lu2mdd_dev(o->lo_dev));
-                if (IS_ERR(handle))
-                        CERROR("Cannot get thandle\n");
-                else {
-                        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
-                        /* let's remove obj from the orphan list */
-                        __mdd_orphan_del(env, mdd_obj, handle);
-                        mdd_write_unlock(env, mdd_obj);
-                        mdd_trans_stop(env, lu2mdd_dev(o->lo_dev),
-                                       0, handle);
-                }
-        }
+        return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p", o);
 }
 
 static const struct lu_object_operations mdd_lu_obj_ops = {
-       .loo_object_init    = mdd_object_init,
-       .loo_object_start   = mdd_object_start,
-       .loo_object_free    = mdd_object_free,
-        .loo_object_delete  = mdd_object_delete
+        .loo_object_init    = mdd_object_init,
+        .loo_object_start   = mdd_object_start,
+        .loo_object_free    = mdd_object_free,
+        .loo_object_print   = mdd_object_print,
 };
 
 struct mdd_object *mdd_object_find(const struct lu_env *env,
@@ -250,6 +321,235 @@ struct mdd_object *mdd_object_find(const struct lu_env *env,
         return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
 }
 
+static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
+                        const char *path, struct lu_fid *fid)
+{
+        struct lu_buf *buf;
+        struct lu_fid *f = &mdd_env_info(env)->mti_fid;
+        struct mdd_object *obj;
+        struct lu_name *lname = &mdd_env_info(env)->mti_name;
+        char *name;
+        int rc = 0;
+        ENTRY;
+
+        /* temp buffer for path element */
+        buf = mdd_buf_alloc(env, PATH_MAX);
+        if (buf->lb_buf == NULL)
+                RETURN(-ENOMEM);
+
+        lname->ln_name = name = buf->lb_buf;
+        lname->ln_namelen = 0;
+        *f = mdd->mdd_root_fid;
+
+        while(1) {
+                while (*path == '/')
+                        path++;
+                if (*path == '\0')
+                        break;
+                while (*path != '/' && *path != '\0') {
+                        *name = *path;
+                        path++;
+                        name++;
+                        lname->ln_namelen++;
+                }
+
+                *name = '\0';
+                /* find obj corresponding to fid */
+                obj = mdd_object_find(env, mdd, f);
+                if (obj == NULL)
+                        GOTO(out, rc = -EREMOTE);
+                if (IS_ERR(obj))
+                        GOTO(out, rc = -PTR_ERR(obj));
+                /* get child fid from parent and name */
+                rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
+                mdd_object_put(env, obj);
+                if (rc)
+                        break;
+
+                name = buf->lb_buf;
+                lname->ln_namelen = 0;
+        }
+
+        if (!rc)
+                *fid = *f;
+out:
+        RETURN(rc);
+}
+
+/** The maximum depth that fid2path() will search.
+ * This is limited only because we want to store the fids for
+ * historical path lookup purposes.
+ */
+#define MAX_PATH_DEPTH 100
+
+/** mdd_path() lookup structure. */
+struct path_lookup_info {
+        __u64                pli_recno;        /**< history point */
+        struct lu_fid        pli_fid;
+        struct lu_fid        pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
+        struct mdd_object   *pli_mdd_obj;
+        char                *pli_path;         /**< full path */
+        int                  pli_pathlen;
+        int                  pli_linkno;       /**< which hardlink to follow */
+        int                  pli_fidcount;     /**< number of \a pli_fids */
+};
+
+static int mdd_path_current(const struct lu_env *env,
+                            struct path_lookup_info *pli)
+{
+        struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
+        struct mdd_object *mdd_obj;
+        struct lu_buf     *buf = NULL;
+        struct link_ea_header *leh;
+        struct link_ea_entry  *lee;
+        struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
+        struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
+        char *ptr;
+        int reclen;
+        int rc;
+        ENTRY;
+
+        ptr = pli->pli_path + pli->pli_pathlen - 1;
+        *ptr = 0;
+        --ptr;
+        pli->pli_fidcount = 0;
+        pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
+
+        while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
+                mdd_obj = mdd_object_find(env, mdd,
+                                          &pli->pli_fids[pli->pli_fidcount]);
+                if (mdd_obj == NULL)
+                        GOTO(out, rc = -EREMOTE);
+                if (IS_ERR(mdd_obj))
+                        GOTO(out, rc = -PTR_ERR(mdd_obj));
+                rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
+                if (rc <= 0) {
+                        mdd_object_put(env, mdd_obj);
+                        if (rc == -1)
+                                rc = -EREMOTE;
+                        else if (rc == 0)
+                                /* Do I need to error out here? */
+                                rc = -ENOENT;
+                        GOTO(out, rc);
+                }
+
+                /* Get parent fid and object name */
+                mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
+                buf = mdd_links_get(env, mdd_obj);
+                mdd_read_unlock(env, mdd_obj);
+                mdd_object_put(env, mdd_obj);
+                if (IS_ERR(buf))
+                        GOTO(out, rc = PTR_ERR(buf));
+
+                leh = buf->lb_buf;
+                lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
+                mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
+
+                /* If set, use link #linkno for path lookup, otherwise use
+                   link #0.  Only do this for the final path element. */
+                if ((pli->pli_fidcount == 0) &&
+                    (pli->pli_linkno < leh->leh_reccount)) {
+                        int count;
+                        for (count = 0; count < pli->pli_linkno; count++) {
+                                lee = (struct link_ea_entry *)
+                                     ((char *)lee + reclen);
+                                mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
+                        }
+                        if (pli->pli_linkno < leh->leh_reccount - 1)
+                                /* indicate to user there are more links */
+                                pli->pli_linkno++;
+                }
+
+                /* Pack the name in the end of the buffer */
+                ptr -= tmpname->ln_namelen;
+                if (ptr - 1 <= pli->pli_path)
+                        GOTO(out, rc = -EOVERFLOW);
+                strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
+                *(--ptr) = '/';
+
+                /* Store the parent fid for historic lookup */
+                if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
+                        GOTO(out, rc = -EOVERFLOW);
+                pli->pli_fids[pli->pli_fidcount] = *tmpfid;
+        }
+
+        /* Verify that our path hasn't changed since we started the lookup */
+        rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
+        if (rc) {
+                CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
+                GOTO (out, rc = -EAGAIN);
+        }
+        if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
+                CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
+                       " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
+                       PFID(&pli->pli_fid));
+                GOTO(out, rc = -EAGAIN);
+        }
+
+        memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
+
+        EXIT;
+out:
+        if (buf && !IS_ERR(buf) && buf->lb_vmalloc)
+                /* if we vmalloced a large buffer drop it */
+                mdd_buf_put(buf);
+
+        return rc;
+}
+
+/* Returns the full path to this fid, as of changelog record recno. */
+static int mdd_path(const struct lu_env *env, struct md_object *obj,
+                    char *path, int pathlen, __u64 recno, int *linkno)
+{
+        struct path_lookup_info *pli;
+        int tries = 3;
+        int rc = -EAGAIN;
+        ENTRY;
+
+        if (pathlen < 3)
+                RETURN(-EOVERFLOW);
+
+        if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
+                path[0] = '/';
+                path[1] = '\0';
+                RETURN(0);
+        }
+
+        OBD_ALLOC_PTR(pli);
+        if (pli == NULL)
+                RETURN(-ENOMEM);
+
+        pli->pli_mdd_obj = md2mdd_obj(obj);
+        pli->pli_recno = recno;
+        pli->pli_path = path;
+        pli->pli_pathlen = pathlen;
+        pli->pli_linkno = *linkno;
+
+        /* Retry multiple times in case file is being moved */
+        while (tries-- && rc == -EAGAIN)
+                rc = mdd_path_current(env, pli);
+
+#if 0   /* We need old path names only for replication */
+        /* For historical path lookup, the current links may not have existed
+         * at "recno" time.  We must switch over to earlier links/parents
+         * by using the changelog records.  If the earlier parent doesn't
+         * exist, we must search back through the changelog to reconstruct
+         * its parents, then check if it exists, etc.
+         * We may ignore this problem for the initial implementation and
+         * state that an "original" hardlink must still exist for us to find
+         * historic path name. */
+        if (pli->pli_recno != -1)
+                rc = mdd_path_historic(env, pli);
+#endif
+
+        /* return next link index to caller */
+        *linkno = pli->pli_linkno;
+
+        OBD_FREE_PTR(pli);
+
+        RETURN (rc);
+}
+
 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
 {
         struct lu_attr *la = &mdd_env_info(env)->mti_la;
@@ -316,7 +616,7 @@ static int __mdd_lmm_get(const struct lu_env *env,
                 RETURN(0);
 
         rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
-                        MDS_LOV_MD_NAME);
+                        XATTR_NAME_LOV);
 
         if (rc == 0 && (ma->ma_need & MA_LOV_DEF)) {
                 rc = mdd_get_default_md(mdd_obj, ma->ma_lmm,
@@ -353,7 +653,7 @@ static int __mdd_lmv_get(const struct lu_env *env,
                 RETURN(0);
 
         rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size,
-                        MDS_LMV_MD_NAME);
+                        XATTR_NAME_LMV);
         if (rc > 0) {
                 ma->ma_valid |= MA_LMV;
                 rc = 0;
@@ -484,10 +784,13 @@ static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj,
 
 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
                                struct mdd_object *c, struct md_attr *ma,
-                               struct thandle *handle)
+                               struct thandle *handle,
+                               const struct md_op_spec *spec)
 {
         struct lu_attr *attr = &ma->ma_attr;
         struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
+        struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
+        const struct dt_index_features *feat = spec->sp_feat;
         int rc;
         ENTRY;
 
@@ -495,11 +798,19 @@ int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
                 struct dt_object *next = mdd_object_child(c);
                 LASSERT(next);
 
+                if (feat != &dt_directory_features && feat != NULL)
+                        dof->dof_type = DFT_INDEX;
+                else
+                        dof->dof_type = dt_mode_to_dft(attr->la_mode);
+
+                dof->u.dof_idx.di_feat = feat;
+
                 /* @hint will be initialized by underlying device. */
                 next->do_ops->do_ah_init(env, hint,
                                          p ? mdd_object_child(p) : NULL,
                                          attr->la_mode & S_IFMT);
-                rc = mdo_create_obj(env, c, attr, hint, handle);
+
+                rc = mdo_create_obj(env, c, attr, hint, dof, handle);
                 LASSERT(ergo(rc == 0, mdd_object_exists(c)));
         } else
                 rc = -EEXIST;
@@ -603,9 +914,9 @@ int mdd_attr_check_set_internal_locked(const struct lu_env *env,
         RETURN(rc);
 }
 
-static int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
-                           const struct lu_buf *buf, const char *name,
-                           int fl, struct thandle *handle)
+int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
+                    const struct lu_buf *buf, const char *name,
+                    int fl, struct thandle *handle)
 {
         struct lustre_capa *capa = mdd_object_capa(env, obj);
         int rc = -EINVAL;
@@ -667,7 +978,7 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
                         la->la_valid &= ~LA_ATIME;
                 RETURN(0);
         }
+
         /* Check if flags change. */
         if (la->la_valid & LA_FLAGS) {
                 unsigned int oldflags = 0;
@@ -683,7 +994,7 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
                 if (mdd_is_immutable(obj))
                         oldflags |= LUSTRE_IMMUTABLE_FL;
                 if (mdd_is_append(obj))
-                        oldflags |= LUSTRE_APPEND_FL; 
+                        oldflags |= LUSTRE_APPEND_FL;
                 if ((oldflags ^ newflags) &&
                     !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
                         RETURN(-EPERM);
@@ -827,6 +1138,60 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
         RETURN(0);
 }
 
+/** Store a data change changelog record
+ * If this fails, we must fail the whole transaction; we don't
+ * want the change to commit without the log entry.
+ * \param mdd_obj - mdd_object of change
+ * \param handle - transacion handle
+ */
+static int mdd_changelog_data_store(const struct lu_env     *env,
+                                    struct mdd_device       *mdd,
+                                    enum changelog_rec_type type,
+                                    struct mdd_object       *mdd_obj,
+                                    struct thandle          *handle)
+{
+        const struct lu_fid *tfid = mdo2fid(mdd_obj);
+        struct llog_changelog_rec *rec;
+        struct lu_buf *buf;
+        int reclen;
+        int rc;
+
+        if (!(mdd->mdd_cl.mc_flags & CLM_ON))
+                RETURN(0);
+
+        LASSERT(handle != NULL);
+        LASSERT(mdd_obj != NULL);
+
+        if ((type == CL_SETATTR) &&
+            cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
+                /* Don't need multiple updates in this log */
+                /* Don't check under lock - no big deal if we get an extra
+                   entry */
+                RETURN(0);
+        }
+
+        reclen = llog_data_len(sizeof(*rec));
+        buf = mdd_buf_alloc(env, reclen);
+        if (buf->lb_buf == NULL)
+                RETURN(-ENOMEM);
+        rec = (struct llog_changelog_rec *)buf->lb_buf;
+
+        rec->cr_flags = CLF_VERSION;
+        rec->cr_type = (__u32)type;
+        rec->cr_tfid = *tfid;
+        rec->cr_namelen = 0;
+        mdd_obj->mod_cltime = cfs_time_current_64();
+
+        rc = mdd_changelog_llog_write(mdd, rec, handle);
+        if (rc < 0) {
+                CERROR("changelog failed: rc=%d op%d t"DFID"\n",
+                       rc, type, PFID(tfid));
+                return -EFAULT;
+        }
+
+        return 0;
+}
+
 /* set attr and LOV EA at once, return updated attr */
 static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
                         const struct md_attr *ma)
@@ -838,9 +1203,18 @@ static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
         struct llog_cookie *logcookies = NULL;
         int  rc, lmm_size = 0, cookie_size = 0;
         struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
+#ifdef HAVE_QUOTA_SUPPORT
+        struct obd_device *obd = mdd->mdd_obd_dev;
+        struct mds_obd *mds = &obd->u.mds;
+        unsigned int qnids[MAXQUOTAS] = { 0, 0 };
+        unsigned int qoids[MAXQUOTAS] = { 0, 0 };
+        int quota_opc = 0, block_count = 0;
+        int inode_pending = 0, block_pending = 0;
+#endif
         ENTRY;
 
-        mdd_txn_param_build(env, mdd, MDD_TXN_ATTR_SET_OP);
+        mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
+                                    MDD_TXN_ATTR_SET_OP);
         handle = mdd_trans_start(env, mdd);
         if (IS_ERR(handle))
                 RETURN(PTR_ERR(handle));
@@ -854,13 +1228,13 @@ static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
                         GOTO(cleanup, rc = -ENOMEM);
 
                 rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
-                                MDS_LOV_MD_NAME);
+                                XATTR_NAME_LOV);
 
                 if (rc < 0)
                         GOTO(cleanup, rc);
         }
 
-        if (ma->ma_attr.la_valid & (ATTR_MTIME | ATTR_CTIME))
+        if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
                 CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
                        ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
 
@@ -869,6 +1243,35 @@ static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
         if (rc)
                 GOTO(cleanup, rc);
 
+#ifdef HAVE_QUOTA_SUPPORT
+        if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
+                struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
+
+                rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
+                if (!rc) {
+                        quota_opc = FSFILT_OP_SETATTR;
+                        mdd_quota_wrapper(la_copy, qnids);
+                        mdd_quota_wrapper(la_tmp, qoids);
+                        /* get file quota for new owner */
+                        lquota_chkquota(mds_quota_interface_ref, obd,
+                                        qnids[USRQUOTA], qnids[GRPQUOTA], 1,
+                                        &inode_pending, NULL, 0, NULL, 0);
+                        block_count = (la_tmp->la_blocks + 7) >> 3;
+                        if (block_count) {
+                                void *data = NULL;
+                                mdd_data_get(env, mdd_obj, &data);
+                                /* get block quota for new owner */
+                                lquota_chkquota(mds_quota_interface_ref, obd,
+                                                qnids[USRQUOTA],
+                                                qnids[GRPQUOTA],
+                                                block_count, &block_pending,
+                                                NULL, LQUOTA_FLAGS_BLK,
+                                                data, 1);
+                        }
+                }
+        }
+#endif
+
         if (la_copy->la_valid & LA_FLAGS) {
                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la_copy,
                                                   handle, 1);
@@ -905,12 +1308,32 @@ static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
 
         }
 cleanup:
+        if ((rc == 0) && (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME)))
+                rc = mdd_changelog_data_store(env, mdd, CL_SETATTR, mdd_obj,
+                                              handle);
         mdd_trans_stop(env, mdd, rc, handle);
         if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
                 /*set obd attr, if needed*/
                 rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
                                            logcookies);
         }
+#ifdef HAVE_QUOTA_SUPPORT
+        if (quota_opc) {
+                if (inode_pending)
+                        lquota_pending_commit(mds_quota_interface_ref, obd,
+                                              qnids[USRQUOTA], qnids[GRPQUOTA],
+                                              inode_pending, 0);
+                if (block_pending)
+                        lquota_pending_commit(mds_quota_interface_ref, obd,
+                                              qnids[USRQUOTA], qnids[GRPQUOTA],
+                                              block_pending, 1);
+                /* Trigger dqrel/dqacq for original owner and new owner.
+                 * If failed, the next call for lquota_chkquota will
+                 * process it. */
+                lquota_adjust(mds_quota_interface_ref, obd, qnids, qoids, rc,
+                              quota_opc);
+        }
+#endif
         RETURN(rc);
 }
 
@@ -974,6 +1397,12 @@ static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
                 RETURN(PTR_ERR(handle));
 
         rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
+
+        /* Only record user xattr changes */
+        if ((rc == 0) && (mdd->mdd_cl.mc_flags & CLM_ON) &&
+            (strncmp("user.", name, 5) == 0))
+                rc = mdd_changelog_data_store(env, mdd, CL_XATTR, mdd_obj,
+                                              handle);
         mdd_trans_stop(env, mdd, rc, handle);
 
         RETURN(rc);
@@ -1005,6 +1434,13 @@ int mdd_xattr_del(const struct lu_env *env, struct md_object *obj,
         rc = mdo_xattr_del(env, mdd_obj, name, handle,
                            mdd_object_capa(env, mdd_obj));
         mdd_write_unlock(env, mdd_obj);
+
+        /* Only record user xattr changes */
+        if ((rc == 0) && (mdd->mdd_cl.mc_flags & CLM_ON) &&
+            (strncmp("user.", name, 5) != 0))
+                rc = mdd_changelog_data_store(env, mdd, CL_XATTR, mdd_obj,
+                                              handle);
+
         mdd_trans_stop(env, mdd, rc, handle);
 
         RETURN(rc);
@@ -1018,6 +1454,12 @@ static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
         struct mdd_object *mdd_obj = md2mdd_obj(obj);
         struct mdd_device *mdd = mdo2mdd(obj);
         struct thandle *handle;
+#ifdef HAVE_QUOTA_SUPPORT
+        struct obd_device *obd = mdd->mdd_obd_dev;
+        struct mds_obd *mds = &obd->u.mds;
+        unsigned int qids[MAXQUOTAS] = { 0, 0 };
+        int quota_opc = 0;
+#endif
         int rc;
         ENTRY;
 
@@ -1060,11 +1502,26 @@ static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
                 GOTO(cleanup, rc);
 
         rc = mdd_finish_unlink(env, mdd_obj, ma, handle);
+#ifdef HAVE_QUOTA_SUPPORT
+        if (mds->mds_quota && ma->ma_valid & MA_INODE &&
+            ma->ma_attr.la_nlink == 0 && mdd_obj->mod_count == 0) {
+                quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
+                mdd_quota_wrapper(&ma->ma_attr, qids);
+        }
+#endif
+
 
         EXIT;
 cleanup:
         mdd_write_unlock(env, mdd_obj);
         mdd_trans_stop(env, mdd, rc, handle);
+#ifdef HAVE_QUOTA_SUPPORT
+        if (quota_opc)
+                /* Trigger dqrel on the owner of child. If failed,
+                 * the next call for lquota_chkquota will process it */
+                lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
+                              quota_opc);
+#endif
         return rc;
 }
 
@@ -1103,20 +1560,53 @@ static int mdd_object_create(const struct lu_env *env,
         struct mdd_object *mdd_obj = md2mdd_obj(obj);
         const struct lu_fid *pfid = spec->u.sp_pfid;
         struct thandle *handle;
-        int rc;
+#ifdef HAVE_QUOTA_SUPPORT
+        struct obd_device *obd = mdd->mdd_obd_dev;
+        struct mds_obd *mds = &obd->u.mds;
+        unsigned int qids[MAXQUOTAS] = { 0, 0 };
+        int quota_opc = 0, block_count = 0;
+        int inode_pending = 0, block_pending = 0;
+#endif
+        int rc = 0;
         ENTRY;
 
+#ifdef HAVE_QUOTA_SUPPORT
+        if (mds->mds_quota) {
+                quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
+                mdd_quota_wrapper(&ma->ma_attr, qids);
+                /* get file quota for child */
+                lquota_chkquota(mds_quota_interface_ref, obd, qids[USRQUOTA],
+                                qids[GRPQUOTA], 1, &inode_pending, NULL, 0,
+                                NULL, 0);
+                switch (ma->ma_attr.la_mode & S_IFMT) {
+                case S_IFLNK:
+                case S_IFDIR:
+                        block_count = 2;
+                        break;
+                case S_IFREG:
+                        block_count = 1;
+                        break;
+                }
+                /* get block quota for child */
+                if (block_count)
+                        lquota_chkquota(mds_quota_interface_ref, obd,
+                                        qids[USRQUOTA], qids[GRPQUOTA],
+                                        block_count, &block_pending, NULL,
+                                        LQUOTA_FLAGS_BLK, NULL, 0);
+        }
+#endif
+
         mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP);
         handle = mdd_trans_start(env, mdd);
         if (IS_ERR(handle))
-                RETURN(PTR_ERR(handle));
+                GOTO(out_pending, rc = PTR_ERR(handle));
 
         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
         rc = mdd_oc_sanity_check(env, mdd_obj, ma);
         if (rc)
                 GOTO(unlock, rc);
 
-        rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle);
+        rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
         if (rc)
                 GOTO(unlock, rc);
 
@@ -1130,7 +1620,7 @@ static int mdd_object_create(const struct lu_env *env,
 
                 rc = __mdd_xattr_set(env, mdd_obj,
                                      mdd_buf_get_const(env, lmv, lmv_size),
-                                     MDS_LMV_MD_NAME, 0, handle);
+                                     XATTR_NAME_LMV, 0, handle);
                 if (rc)
                         GOTO(unlock, rc);
 
@@ -1156,7 +1646,8 @@ static int mdd_object_create(const struct lu_env *env,
                         pfid = spec->u.sp_ea.fid;
                 }
 #endif
-                rc = mdd_object_initialize(env, pfid, mdd_obj, ma, handle);
+                rc = mdd_object_initialize(env, pfid, NULL, mdd_obj, ma, handle,
+                                           spec);
         }
         EXIT;
 unlock:
@@ -1165,6 +1656,23 @@ unlock:
         mdd_write_unlock(env, mdd_obj);
 
         mdd_trans_stop(env, mdd, rc, handle);
+out_pending:
+#ifdef HAVE_QUOTA_SUPPORT
+        if (quota_opc) {
+                if (inode_pending)
+                        lquota_pending_commit(mds_quota_interface_ref, obd,
+                                              qids[USRQUOTA], qids[GRPQUOTA],
+                                              inode_pending, 0);
+                if (block_pending)
+                        lquota_pending_commit(mds_quota_interface_ref, obd,
+                                              qids[USRQUOTA], qids[GRPQUOTA],
+                                              block_pending, 1);
+                /* Trigger dqacq on the owner of child. If failed,
+                 * the next call for lquota_chkquota will process it. */
+                lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
+                              FSFILT_OP_CREATE_PARTIAL_CHILD);
+        }
+#endif
         return rc;
 }
 
@@ -1317,6 +1825,7 @@ int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
         if (S_ISREG(mdd_object_type(obj))) {
                 /* Return LOV & COOKIES unconditionally here. We clean evth up.
                  * Caller must be ready for that. */
+
                 rc = __mdd_lmm_get(env, obj, ma);
                 if ((ma->ma_valid & MA_LOV))
                         rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
@@ -1331,9 +1840,17 @@ int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
 static int mdd_close(const struct lu_env *env, struct md_object *obj,
                      struct md_attr *ma)
 {
-        int rc;
         struct mdd_object *mdd_obj = md2mdd_obj(obj);
         struct thandle    *handle;
+        int rc;
+        int reset = 1;
+
+#ifdef HAVE_QUOTA_SUPPORT
+        struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
+        struct mds_obd *mds = &obd->u.mds;
+        unsigned int qids[MAXQUOTAS] = { 0, 0 };
+        int quota_opc = 0;
+#endif
         ENTRY;
 
         rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
@@ -1347,14 +1864,39 @@ static int mdd_close(const struct lu_env *env, struct md_object *obj,
         /* release open count */
         mdd_obj->mod_count --;
 
+        if (mdd_obj->mod_count == 0) {
+                /* remove link to object from orphan index */
+                if (mdd_obj->mod_flags & ORPHAN_OBJ)
+                        __mdd_orphan_del(env, mdd_obj, handle);
+        }
+
         rc = mdd_iattr_get(env, mdd_obj, ma);
-        if (rc == 0 && mdd_obj->mod_count == 0 && ma->ma_attr.la_nlink == 0)
-                rc = mdd_object_kill(env, mdd_obj, ma);
-        else
+        if (rc == 0) {
+                if (mdd_obj->mod_count == 0 && ma->ma_attr.la_nlink == 0) {
+                        rc = mdd_object_kill(env, mdd_obj, ma);
+#ifdef HAVE_QUOTA_SUPPORT
+                        if (mds->mds_quota) {
+                                quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
+                                mdd_quota_wrapper(&ma->ma_attr, qids);
+                        }
+#endif
+                        if (rc == 0)
+                                reset = 0;
+                }
+        }
+
+        if (reset)
                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
-        
+
         mdd_write_unlock(env, mdd_obj);
         mdd_trans_stop(env, mdo2mdd(obj), rc, handle);
+#ifdef HAVE_QUOTA_SUPPORT
+        if (quota_opc)
+                /* Trigger dqrel on the owner of child. If failed,
+                 * the next call for lquota_chkquota will process it */
+                lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
+                              quota_opc);
+#endif
         RETURN(rc);
 }
 
@@ -1471,7 +2013,7 @@ static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj,
          * iterate through directory and fill pages from @rdpg
          */
         iops = &next->do_index_ops->dio_it;
-        it = iops->init(env, next, 0, mdd_object_capa(env, obj));
+        it = iops->init(env, next, mdd_object_capa(env, obj));
         if (IS_ERR(it))
                 return PTR_ERR(it);
 
@@ -1613,4 +2155,5 @@ const struct md_object_operations mdd_obj_ops = {
         .moo_readlink      = mdd_readlink,
         .moo_capa_get      = mdd_capa_get,
         .moo_object_sync   = mdd_object_sync,
+        .moo_path          = mdd_path,
 };