Whamcloud - gitweb
b=19486 add server identifier into lu_seq_range.
[fs/lustre-release.git] / lustre / cmm / cmm_object.c
index e6212dc..86741be 100644 (file)
@@ -1,29 +1,43 @@
 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
- *  lustre/cmm/cmm_object.c
- *  Lustre Cluster Metadata Manager (cmm)
+ * GPL HEADER START
  *
- *  Copyright (c) 2006 Cluster File Systems, Inc.
- *   Author: Mike Pershin <tappro@clusterfs.com>
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
- *   This file is part of the Lustre file system, http://www.lustre.org
- *   Lustre is a trademark of Cluster File Systems, Inc.
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
  *
- *   You may have signed or agreed to another license before downloading
- *   this software.  If so, you are bound by the terms and conditions
- *   of that agreement, and the following does not apply to you.  See the
- *   LICENSE file included with this distribution for more information.
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
  *
- *   If you did not agree to a different license, then this copy of Lustre
- *   is open source software; you can redistribute it and/or modify it
- *   under the terms of version 2 of the GNU General Public License as
- *   published by the Free Software Foundation.
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
  *
- *   In either case, Lustre is distributed in the hope that it will be
- *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
- *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- *   license text for more details.
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
+ * Use is subject to license terms.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
+ *
+ * lustre/cmm/cmm_object.c
+ *
+ * Lustre Cluster Metadata Manager (cmm)
+ *
+ * Author: Mike Pershin <tappro@clusterfs.com>
  */
 
 #ifndef EXPORT_SYMTAB
 #include <lustre_fid.h>
 #include "cmm_internal.h"
 #include "mdc_internal.h"
-
+/**
+ * \ingroup cmm
+ * Lookup MDS number \a mds by FID \a fid.
+ *
+ * \param fid FID of object to find MDS
+ * \param mds mds number to return.
+ */
 int cmm_fld_lookup(struct cmm_device *cm, const struct lu_fid *fid,
                    mdsno_t *mds, const struct lu_env *env)
 {
@@ -44,7 +64,8 @@ int cmm_fld_lookup(struct cmm_device *cm, const struct lu_fid *fid,
 
         LASSERT(fid_is_sane(fid));
 
-        rc = fld_client_lookup(cm->cmm_fld, fid_seq(fid), mds, env);
+        rc = fld_client_lookup(cm->cmm_fld, fid_seq(fid), mds,
+                               LU_SEQ_RANGE_MDT, env);
         if (rc) {
                 CERROR("Can't find mds by seq "LPX64", rc %d\n",
                        fid_seq(fid), rc);
@@ -52,25 +73,33 @@ int cmm_fld_lookup(struct cmm_device *cm, const struct lu_fid *fid,
         }
 
         if (*mds > cm->cmm_tgt_count) {
-                CERROR("Got invalid mdsno: "LPU64" (max: %u)\n",
+                CERROR("Got invalid mdsno: %x (max: %x)\n",
                        *mds, cm->cmm_tgt_count);
                 rc = -EINVAL;
         } else {
-                CDEBUG(D_INFO, "CMM: got MDS "LPU64" for sequence: "
-                       LPU64"\n", *mds, fid_seq(fid));
+                CDEBUG(D_INFO, "CMM: got MDS %x for sequence: "
+                       LPX64"\n", *mds, fid_seq(fid));
         }
 
         RETURN (rc);
 }
 
-static struct md_object_operations cml_mo_ops;
-static struct md_dir_operations    cml_dir_ops;
-static struct lu_object_operations cml_obj_ops;
+/**
+ * \addtogroup cml
+ * @{
+ */
+static const struct md_object_operations cml_mo_ops;
+static const struct md_dir_operations    cml_dir_ops;
+static const struct lu_object_operations cml_obj_ops;
 
-static struct md_object_operations cmr_mo_ops;
-static struct md_dir_operations    cmr_dir_ops;
-static struct lu_object_operations cmr_obj_ops;
+static const struct md_object_operations cmr_mo_ops;
+static const struct md_dir_operations    cmr_dir_ops;
+static const struct lu_object_operations cmr_obj_ops;
 
+/**
+ * \ingroup cmm
+ * Allocate CMM object.
+ */
 struct lu_object *cmm_object_alloc(const struct lu_env *env,
                                    const struct lu_object_header *loh,
                                    struct lu_device *ld)
@@ -102,8 +131,8 @@ struct lu_object *cmm_object_alloc(const struct lu_env *env,
                 struct cml_object *clo;
 
                 OBD_ALLOC_PTR(clo);
-               if (clo != NULL) {
-                       lo = &clo->cmm_obj.cmo_obj.mo_lu;
+                if (clo != NULL) {
+                        lo = &clo->cmm_obj.cmo_obj.mo_lu;
                         lu_object_init(lo, NULL, ld);
                         clo->cmm_obj.cmo_obj.mo_ops = &cml_mo_ops;
                         clo->cmm_obj.cmo_obj.mo_dir_ops = &cml_dir_ops;
@@ -113,8 +142,8 @@ struct lu_object *cmm_object_alloc(const struct lu_env *env,
                 struct cmr_object *cro;
 
                 OBD_ALLOC_PTR(cro);
-               if (cro != NULL) {
-                       lo = &cro->cmm_obj.cmo_obj.mo_lu;
+                if (cro != NULL) {
+                        lo = &cro->cmm_obj.cmo_obj.mo_lu;
                         lu_object_init(lo, NULL, ld);
                         cro->cmm_obj.cmo_obj.mo_ops = &cmr_mo_ops;
                         cro->cmm_obj.cmo_obj.mo_dir_ops = &cmr_dir_ops;
@@ -125,18 +154,17 @@ struct lu_object *cmm_object_alloc(const struct lu_env *env,
         RETURN(lo);
 }
 
-/*
- * CMM has two types of objects - local and remote. They have different set
- * of operations so we are avoiding multiple checks in code.
+/**
+ * Get local child device.
  */
-
-/* get local child device */
 static struct lu_device *cml_child_dev(struct cmm_device *d)
 {
         return &d->cmm_child->md_lu_dev;
 }
 
-/* lu_object operations */
+/**
+ * Free cml_object.
+ */
 static void cml_object_free(const struct lu_env *env,
                             struct lu_object *lo)
 {
@@ -145,7 +173,11 @@ static void cml_object_free(const struct lu_env *env,
         OBD_FREE_PTR(clo);
 }
 
-static int cml_object_init(const struct lu_env *env, struct lu_object *lo)
+/**
+ * Initialize cml_object.
+ */
+static int cml_object_init(const struct lu_env *env, struct lu_object *lo,
+                           const struct lu_object_conf *unused)
 {
         struct cmm_device *cd = lu2cmm_dev(lo->lo_dev);
         struct lu_device  *c_dev;
@@ -180,16 +212,20 @@ static int cml_object_init(const struct lu_env *env, struct lu_object *lo)
 static int cml_object_print(const struct lu_env *env, void *cookie,
                             lu_printer_t p, const struct lu_object *lo)
 {
-       return (*p)(env, cookie, LUSTRE_CMM_NAME"-local@%p", lo);
+        return (*p)(env, cookie, "[local]");
 }
 
-static struct lu_object_operations cml_obj_ops = {
-       .loo_object_init    = cml_object_init,
-       .loo_object_free    = cml_object_free,
-       .loo_object_print   = cml_object_print
+static const struct lu_object_operations cml_obj_ops = {
+        .loo_object_init    = cml_object_init,
+        .loo_object_free    = cml_object_free,
+        .loo_object_print   = cml_object_print
 };
 
-/* CMM local md_object operations */
+/**
+ * \name CMM local md_object operations.
+ * All of them call just corresponding operations on next layer.
+ * @{
+ */
 static int cml_object_create(const struct lu_env *env,
                              struct md_object *mo,
                              const struct md_op_spec *spec,
@@ -258,8 +294,8 @@ static int cml_xattr_list(const struct lu_env *env, struct md_object *mo,
 }
 
 static int cml_xattr_set(const struct lu_env *env, struct md_object *mo,
-                         const struct lu_buf *buf,
-                         const char *name, int fl)
+                         const struct lu_buf *buf, const char *name,
+                         int fl)
 {
         int rc;
         ENTRY;
@@ -330,7 +366,36 @@ static int cml_capa_get(const struct lu_env *env, struct md_object *mo,
         RETURN(rc);
 }
 
-static struct md_object_operations cml_mo_ops = {
+static int cml_path(const struct lu_env *env, struct md_object *mo,
+                    char *path, int pathlen, __u64 *recno, int *linkno)
+{
+        int rc;
+        ENTRY;
+        rc = mo_path(env, md_object_next(mo), path, pathlen, recno, linkno);
+        RETURN(rc);
+}
+
+static int cml_object_sync(const struct lu_env *env, struct md_object *mo)
+{
+        int rc;
+        ENTRY;
+        rc = mo_object_sync(env, md_object_next(mo));
+        RETURN(rc);
+}
+
+static dt_obj_version_t cml_version_get(const struct lu_env *env,
+                                        struct md_object *mo)
+{
+        return mo_version_get(env, md_object_next(mo));
+}
+
+static void cml_version_set(const struct lu_env *env, struct md_object *mo,
+                            dt_obj_version_t version)
+{
+        return mo_version_set(env, md_object_next(mo), version);
+}
+
+static const struct md_object_operations cml_mo_ops = {
         .moo_permission    = cml_permission,
         .moo_attr_get      = cml_attr_get,
         .moo_attr_set      = cml_attr_set,
@@ -345,10 +410,22 @@ static struct md_object_operations cml_mo_ops = {
         .moo_close         = cml_close,
         .moo_readpage      = cml_readpage,
         .moo_readlink      = cml_readlink,
-        .moo_capa_get      = cml_capa_get
+        .moo_capa_get      = cml_capa_get,
+        .moo_object_sync   = cml_object_sync,
+        .moo_version_get   = cml_version_get,
+        .moo_version_set   = cml_version_set,
+        .moo_path          = cml_path,
 };
+/** @} */
 
-/* md_dir operations */
+/**
+ * \name CMM local md_dir_operations.
+ * @{
+ */
+/**
+ * cml lookup object fid by name.
+ * This returns only FID by name.
+ */
 static int cml_lookup(const struct lu_env *env, struct md_object *mo_p,
                       const struct lu_name *lname, struct lu_fid *lf,
                       struct md_op_spec *spec)
@@ -368,6 +445,9 @@ static int cml_lookup(const struct lu_env *env, struct md_object *mo_p,
 
 }
 
+/**
+ * Helper to return lock mode. Used in split cases only.
+ */
 static mdl_mode_t cml_lock_mode(const struct lu_env *env,
                                 struct md_object *mo, mdl_mode_t lm)
 {
@@ -381,6 +461,17 @@ static mdl_mode_t cml_lock_mode(const struct lu_env *env,
         RETURN(rc);
 }
 
+/**
+ * Create operation for cml.
+ * Objects are local, but split can happen.
+ * If split is not needed this will call next layer mdo_create().
+ *
+ * \param mo_p Parent directory. Local object.
+ * \param lname name of file to create.
+ * \param mo_c Child object. It has no real inode yet.
+ * \param spec creation specification.
+ * \param ma child object attributes.
+ */
 static int cml_create(const struct lu_env *env, struct md_object *mo_p,
                       const struct lu_name *lname, struct md_object *mo_c,
                       struct md_op_spec *spec, struct md_attr *ma)
@@ -409,11 +500,13 @@ static int cml_create(const struct lu_env *env, struct md_object *mo_p,
          * lock.
          */
         if (spec->sp_cr_mode == MDL_EX) {
-                /*
-                 * Try to split @mo_p. If split is ok, -ERESTART is returned and
-                 * current thread will not peoceed with create. Instead it sends
-                 * -ERESTART to client to let it know that correct MDT should be
-                 * choosen.
+                /**
+                 * Split cases:
+                 * - Try to split \a mo_p upon each create operation.
+                 *   If split is ok, -ERESTART is returned and current thread
+                 *   will not peoceed with create. Instead it sends -ERESTART
+                 *   to client to let it know that correct MDT must be chosen.
+                 * \see cmm_split_dir()
                  */
                 rc = cmm_split_dir(env, mo_p);
                 if (rc)
@@ -425,10 +518,11 @@ static int cml_create(const struct lu_env *env, struct md_object *mo_p,
         }
 
         if (spec != NULL && spec->sp_ck_split) {
-                /*
-                 * Check for possible split directory and let caller know that
+                /**
+                 * - Directory is split already. Let the caller know that
                  * it should tell client that directory is split and operation
                  * should repeat to correct MDT.
+                 * \see cmm_split_check()
                  */
                 rc = cmm_split_check(env, mo_p, lname->ln_name);
                 if (rc)
@@ -446,6 +540,7 @@ out:
         return rc;
 }
 
+/** Call mdo_create_data() on next layer. All objects are local. */
 static int cml_create_data(const struct lu_env *env, struct md_object *p,
                            struct md_object *o,
                            const struct md_op_spec *spec,
@@ -458,6 +553,7 @@ static int cml_create_data(const struct lu_env *env, struct md_object *p,
         RETURN(rc);
 }
 
+/** Call mdo_link() on next layer. All objects are local. */
 static int cml_link(const struct lu_env *env, struct md_object *mo_p,
                     struct md_object *mo_s, const struct lu_name *lname,
                     struct md_attr *ma)
@@ -469,6 +565,7 @@ static int cml_link(const struct lu_env *env, struct md_object *mo_p,
         RETURN(rc);
 }
 
+/** Call mdo_unlink() on next layer. All objects are local. */
 static int cml_unlink(const struct lu_env *env, struct md_object *mo_p,
                       struct md_object *mo_c, const struct lu_name *lname,
                       struct md_attr *ma)
@@ -480,30 +577,16 @@ static int cml_unlink(const struct lu_env *env, struct md_object *mo_p,
         RETURN(rc);
 }
 
-/* rename is split to local/remote by location of new parent dir */
-struct md_object *md_object_find(const struct lu_env *env,
-                                 struct md_device *md,
-                                 const struct lu_fid *f)
-{
-        struct lu_object *o;
-        struct md_object *m;
-        ENTRY;
-
-        o = lu_object_find(env, md2lu_dev(md)->ld_site, f);
-        if (IS_ERR(o))
-                m = (struct md_object *)o;
-        else {
-                o = lu_object_locate(o->lo_header, md2lu_dev(md)->ld_type);
-                m = o ? lu2md(o) : NULL;
-        }
-        RETURN(m);
-}
-
+/**
+ * \ingroup cmm
+ * Get mode of object.
+ * Used in both cml and cmr hence can produce RPC to another server.
+ */
 static int cmm_mode_get(const struct lu_env *env, struct md_device *md,
                         const struct lu_fid *lf, struct md_attr *ma,
                         int *remote)
 {
-        struct md_object *mo_s = md_object_find(env, md, lf);
+        struct md_object *mo_s = md_object_find_slice(env, md, lf);
         struct cmm_thread_info *cmi;
         struct md_attr *tmp_ma;
         int rc;
@@ -532,10 +615,15 @@ static int cmm_mode_get(const struct lu_env *env, struct md_device *md,
         RETURN(rc);
 }
 
+/**
+ * \ingroup cmm
+ * Set ctime for object.
+ * Used in both cml and cmr hence can produce RPC to another server.
+ */
 static int cmm_rename_ctime(const struct lu_env *env, struct md_device *md,
                             const struct lu_fid *lf, struct md_attr *ma)
 {
-        struct md_object *mo_s = md_object_find(env, md, lf);
+        struct md_object *mo_s = md_object_find_slice(env, md, lf);
         int rc;
         ENTRY;
 
@@ -549,6 +637,7 @@ static int cmm_rename_ctime(const struct lu_env *env, struct md_device *md,
         RETURN(rc);
 }
 
+/** Helper to output debug information about rename operation. */
 static inline void cml_rename_warn(const char *fname,
                                   struct md_object *mo_po,
                                   struct md_object *mo_pn,
@@ -577,6 +666,20 @@ static inline void cml_rename_warn(const char *fname,
                       t_name, err);
 }
 
+/**
+ * Rename operation for cml.
+ *
+ * This is the most complex cross-reference operation. It may consist of up to 4
+ * MDS server and require several RPCs to be sent.
+ *
+ * \param mo_po Old parent object.
+ * \param mo_pn New parent object.
+ * \param lf FID of object to rename.
+ * \param ls_name Source file name.
+ * \param mo_t target object. Should be NULL here.
+ * \param lt_name Name of target file.
+ * \param ma object attributes.
+ */
 static int cml_rename(const struct lu_env *env, struct md_object *mo_po,
                       struct md_object *mo_pn, const struct lu_fid *lf,
                       const struct lu_name *ls_name, struct md_object *mo_t,
@@ -593,10 +696,12 @@ static int cml_rename(const struct lu_env *env, struct md_object *mo_po,
                 RETURN(rc);
 
         if (mo_t && lu_object_exists(&mo_t->mo_lu) < 0) {
-                /* XXX: mo_t is remote object and there is RPC to unlink it.
-                 * before that, do local sanity check for rename first. */
+                /**
+                 * \note \a mo_t is remote object and there is RPC to unlink it.
+                 * Before that, do local sanity check for rename first.
+                 */
                 if (!remote) {
-                        struct md_object *mo_s = md_object_find(env,
+                        struct md_object *mo_s = md_object_find_slice(env,
                                                         md_obj2dev(mo_po), lf);
                         if (IS_ERR(mo_s))
                                 RETURN(PTR_ERR(mo_s));
@@ -621,8 +726,8 @@ static int cml_rename(const struct lu_env *env, struct md_object *mo_po,
                         RETURN(rc);
 
                 /*
-                 * XXX: @ma will be changed after mo_ref_del, but we will use
-                 * it for mdo_rename later, so save it before mo_ref_del.
+                 * /note \a ma will be changed after mo_ref_del(), but we will use
+                 * it for mdo_rename() later, so save it before mo_ref_del().
                  */
                 cmi = cmm_env_info(env);
                 tmp_ma = &cmi->cmi_ma;
@@ -635,8 +740,10 @@ static int cml_rename(const struct lu_env *env, struct md_object *mo_po,
                 mo_t = NULL;
         }
 
-        /* XXX: for src on remote MDS case, change its ctime before local
-         * rename. Firstly, do local sanity check for rename if necessary. */
+        /**
+         * \note for src on remote MDS case, change its ctime before local
+         * rename. Firstly, do local sanity check for rename if necessary.
+         */
         if (remote) {
                 if (!tmp_ma) {
                         rc = mo_permission(env, NULL, md_object_next(mo_po),
@@ -694,6 +801,10 @@ static int cml_rename(const struct lu_env *env, struct md_object *mo_po,
         RETURN(rc);
 }
 
+/**
+ * Rename target partial operation.
+ * Used for cross-ref rename.
+ */
 static int cml_rename_tgt(const struct lu_env *env, struct md_object *mo_p,
                           struct md_object *mo_t, const struct lu_fid *lf,
                           const struct lu_name *lname, struct md_attr *ma)
@@ -705,7 +816,11 @@ static int cml_rename_tgt(const struct lu_env *env, struct md_object *mo_p,
                             md_object_next(mo_t), lf, lname, ma);
         RETURN(rc);
 }
-/* used only in case of rename_tgt() when target is not exist */
+
+/**
+ * Name insert only operation.
+ * used only in case of rename_tgt() when target doesn't exist.
+ */
 static int cml_name_insert(const struct lu_env *env, struct md_object *p,
                            const struct lu_name *lname, const struct lu_fid *lf,
                            const struct md_attr *ma)
@@ -718,6 +833,10 @@ static int cml_name_insert(const struct lu_env *env, struct md_object *p,
         RETURN(rc);
 }
 
+/**
+ * \ingroup cmm
+ * Check two fids are not subdirectories.
+ */
 static int cmm_is_subdir(const struct lu_env *env, struct md_object *mo,
                          const struct lu_fid *fid, struct lu_fid *sfid)
 {
@@ -737,7 +856,7 @@ static int cmm_is_subdir(const struct lu_env *env, struct md_object *mo,
         RETURN(rc);
 }
 
-static struct md_dir_operations cml_dir_ops = {
+static const struct md_dir_operations cml_dir_ops = {
         .mdo_is_subdir   = cmm_is_subdir,
         .mdo_lookup      = cml_lookup,
         .mdo_lock_mode   = cml_lock_mode,
@@ -749,41 +868,56 @@ static struct md_dir_operations cml_dir_ops = {
         .mdo_rename_tgt  = cml_rename_tgt,
         .mdo_create_data = cml_create_data
 };
+/** @} */
+/** @} */
 
-/* -------------------------------------------------------------------
- * remote CMM object operations. cmr_...
+/**
+ * \addtogroup cmr
+ * @{
+ */
+/**
+ * \name cmr helpers
+ * @{
  */
+/** Get cmr_object from lu_object. */
 static inline struct cmr_object *lu2cmr_obj(struct lu_object *o)
 {
         return container_of0(o, struct cmr_object, cmm_obj.cmo_obj.mo_lu);
 }
+/** Get cmr_object from md_object. */
 static inline struct cmr_object *md2cmr_obj(struct md_object *mo)
 {
         return container_of0(mo, struct cmr_object, cmm_obj.cmo_obj);
 }
+/** Get cmr_object from cmm_object. */
 static inline struct cmr_object *cmm2cmr_obj(struct cmm_object *co)
 {
         return container_of0(co, struct cmr_object, cmm_obj);
 }
+/** @} */
 
-/* get proper child device from MDCs */
+/**
+ * Get proper child device from MDCs.
+ */
 static struct lu_device *cmr_child_dev(struct cmm_device *d, __u32 num)
 {
         struct lu_device *next = NULL;
         struct mdc_device *mdc;
 
-        spin_lock(&d->cmm_tgt_guard);
-        list_for_each_entry(mdc, &d->cmm_targets, mc_linkage) {
+        cfs_spin_lock(&d->cmm_tgt_guard);
+        cfs_list_for_each_entry(mdc, &d->cmm_targets, mc_linkage) {
                 if (mdc->mc_num == num) {
                         next = mdc2lu_dev(mdc);
                         break;
                 }
         }
-        spin_unlock(&d->cmm_tgt_guard);
+        cfs_spin_unlock(&d->cmm_tgt_guard);
         return next;
 }
 
-/* lu_object operations */
+/**
+ * Free cmr_object.
+ */
 static void cmr_object_free(const struct lu_env *env,
                             struct lu_object *lo)
 {
@@ -792,7 +926,11 @@ static void cmr_object_free(const struct lu_env *env,
         OBD_FREE_PTR(cro);
 }
 
-static int cmr_object_init(const struct lu_env *env, struct lu_object *lo)
+/**
+ * Initialize cmr object.
+ */
+static int cmr_object_init(const struct lu_env *env, struct lu_object *lo,
+                           const struct lu_object_conf *unused)
 {
         struct cmm_device *cd = lu2cmm_dev(lo->lo_dev);
         struct lu_device  *c_dev;
@@ -818,19 +956,35 @@ static int cmr_object_init(const struct lu_env *env, struct lu_object *lo)
         RETURN(rc);
 }
 
+/**
+ * Output lu_object data.
+ */
 static int cmr_object_print(const struct lu_env *env, void *cookie,
                             lu_printer_t p, const struct lu_object *lo)
 {
-       return (*p)(env, cookie, LUSTRE_CMM_NAME"-remote@%p", lo);
+        const struct cmr_object *cro = lu2cmr_obj((struct lu_object *)lo);
+        return (*p)(env, cookie, "[remote](mds_num=%d)", cro->cmo_num);
 }
 
-static struct lu_object_operations cmr_obj_ops = {
-       .loo_object_init    = cmr_object_init,
-       .loo_object_free    = cmr_object_free,
-       .loo_object_print   = cmr_object_print
+/**
+ * Cmr instance of lu_object_operations.
+ */
+static const struct lu_object_operations cmr_obj_ops = {
+        .loo_object_init    = cmr_object_init,
+        .loo_object_free    = cmr_object_free,
+        .loo_object_print   = cmr_object_print
 };
 
-/* CMM remote md_object operations. All are invalid */
+/**
+ * \name cmr remote md_object operations.
+ * All operations here are invalid and return errors. There is no local object
+ * so these operations return two kinds of error:
+ * -# -EFAULT if operation is prohibited.
+ * -# -EREMOTE if operation can be done just to notify upper level about remote
+ *  object.
+ *
+ * @{
+ */
 static int cmr_object_create(const struct lu_env *env,
                              struct md_object *mo,
                              const struct md_op_spec *spec,
@@ -877,7 +1031,8 @@ static int cmr_xattr_list(const struct lu_env *env, struct md_object *mo,
 }
 
 static int cmr_xattr_set(const struct lu_env *env, struct md_object *mo,
-                         const struct lu_buf *buf, const char *name, int fl)
+                         const struct lu_buf *buf, const char *name,
+                         int fl)
 {
         return -EFAULT;
 }
@@ -924,7 +1079,41 @@ static int cmr_capa_get(const struct lu_env *env, struct md_object *mo,
         return -EFAULT;
 }
 
-static struct md_object_operations cmr_mo_ops = {
+static int cmr_path(const struct lu_env *env, struct md_object *obj,
+                    char *path, int pathlen, __u64 *recno, int *linkno)
+{
+        return -EREMOTE;
+}
+
+static int cmr_object_sync(const struct lu_env *env, struct md_object *mo)
+{
+        return -EFAULT;
+}
+
+/**
+ * cmr moo_version_get().
+ */
+static dt_obj_version_t cmr_version_get(const struct lu_env *env,
+                                        struct md_object *mo)
+{
+        /** Don't check remote object version */
+        return 0;
+}
+
+
+/**
+ * cmr moo_version_set().
+ * No need to update remote object version here, it is done as a part
+ * of reintegration of partial operation on the remote server.
+ */
+static void cmr_version_set(const struct lu_env *env, struct md_object *mo,
+                            dt_obj_version_t version)
+{
+        return;
+}
+
+/** Set of md_object_operations for cmr. */
+static const struct md_object_operations cmr_mo_ops = {
         .moo_permission    = cmr_permission,
         .moo_attr_get      = cmr_attr_get,
         .moo_attr_set      = cmr_attr_set,
@@ -939,10 +1128,26 @@ static struct md_object_operations cmr_mo_ops = {
         .moo_close         = cmr_close,
         .moo_readpage      = cmr_readpage,
         .moo_readlink      = cmr_readlink,
-        .moo_capa_get      = cmr_capa_get
+        .moo_capa_get      = cmr_capa_get,
+        .moo_object_sync   = cmr_object_sync,
+        .moo_version_get   = cmr_version_get,
+        .moo_version_set   = cmr_version_set,
+        .moo_path          = cmr_path,
 };
+/** @} */
 
-/* remote part of md_dir operations */
+/**
+ * \name cmr md_dir operations.
+ *
+ * All methods below are cross-ref by nature. They consist of remote call and
+ * local operation. Due to future rollback functionality there are several
+ * limitations for such methods:
+ * -# remote call should be done at first to do epoch negotiation between all
+ * MDS involved and to avoid the RPC inside transaction.
+ * -# only one RPC can be sent - also due to epoch negotiation.
+ * For more details see rollback HLD/DLD.
+ * @{
+ */
 static int cmr_lookup(const struct lu_env *env, struct md_object *mo_p,
                       const struct lu_name *lname, struct lu_fid *lf,
                       struct md_op_spec *spec)
@@ -955,20 +1160,22 @@ static int cmr_lookup(const struct lu_env *env, struct md_object *mo_p,
         return -EREMOTE;
 }
 
+/** Return lock mode. */
 static mdl_mode_t cmr_lock_mode(const struct lu_env *env,
                                 struct md_object *mo, mdl_mode_t lm)
 {
         return MDL_MINMODE;
 }
 
-/*
- * All methods below are cross-ref by nature. They consist of remote call and
- * local operation. Due to future rollback functionality there are several
- * limitations for such methods:
- * 1) remote call should be done at first to do epoch negotiation between all
- * MDS involved and to avoid the RPC inside transaction.
- * 2) only one RPC can be sent - also due to epoch negotiation.
- * For more details see rollback HLD/DLD.
+/**
+ * Create operation for cmr.
+ * Remote object creation and local name insert.
+ *
+ * \param mo_p Parent directory. Local object.
+ * \param lchild_name name of file to create.
+ * \param mo_c Child object. It has no real inode yet.
+ * \param spec creation specification.
+ * \param ma child object attributes.
  */
 static int cmr_create(const struct lu_env *env, struct md_object *mo_p,
                       const struct lu_name *lchild_name, struct md_object *mo_c,
@@ -1030,10 +1237,9 @@ static int cmr_create(const struct lu_env *env, struct md_object *mo_p,
         if (rc)
                 RETURN(rc);
 
-        /* Remote object creation and local name insert. */
-        /*
-         * XXX: @ma will be changed after mo_object_create, but we will use
-         * it for mdo_name_insert later, so save it before mo_object_create.
+        /**
+         * \note \a ma will be changed after mo_object_create(), but we will use
+         * it for mdo_name_insert() later, so save it before mo_object_create().
          */
         *tmp_ma = *ma;
         rc = mo_object_create(env, md_object_next(mo_c), spec, ma);
@@ -1054,6 +1260,18 @@ static int cmr_create(const struct lu_env *env, struct md_object *mo_p,
         RETURN(rc);
 }
 
+/**
+ * Link operations for cmr.
+ *
+ * The link RPC is always issued to the server where source parent is living.
+ * The first operation to do is object nlink increment on remote server.
+ * Second one is local mdo_name_insert().
+ *
+ * \param mo_p parent directory. It is local.
+ * \param mo_s source object to link. It is remote.
+ * \param lname Name of link file.
+ * \param ma object attributes.
+ */
 static int cmr_link(const struct lu_env *env, struct md_object *mo_p,
                     struct md_object *mo_s, const struct lu_name *lname,
                     struct md_attr *ma)
@@ -1092,6 +1310,18 @@ static int cmr_link(const struct lu_env *env, struct md_object *mo_p,
         RETURN(rc);
 }
 
+/**
+ * Unlink operations for cmr.
+ *
+ * The unlink RPC is always issued to the server where parent is living. Hence
+ * the first operation to do is object unlink on remote server. Second one is
+ * local mdo_name_remove().
+ *
+ * \param mo_p parent md_object. It is local.
+ * \param mo_c child object to be unlinked. It is remote.
+ * \param lname Name of file to unlink.
+ * \param ma object attributes.
+ */
 static int cmr_unlink(const struct lu_env *env, struct md_object *mo_p,
                       struct md_object *mo_c, const struct lu_name *lname,
                       struct md_attr *ma)
@@ -1108,8 +1338,8 @@ static int cmr_unlink(const struct lu_env *env, struct md_object *mo_p,
                 RETURN(rc);
 
         /*
-         * XXX: @ma will be changed after mo_ref_del, but we will use
-         * it for mdo_name_remove later, so save it before mo_ref_del.
+         * \note \a ma will be changed after mo_ref_del, but we will use
+         * it for mdo_name_remove() later, so save it before mo_ref_del().
          */
         cmi = cmm_env_info(env);
         tmp_ma = &cmi->cmi_ma;
@@ -1131,6 +1361,7 @@ static int cmr_unlink(const struct lu_env *env, struct md_object *mo_p,
         RETURN(rc);
 }
 
+/** Helper which outputs error message during cmr_rename() */
 static inline void cmr_rename_warn(const char *fname,
                                   struct md_object *mo_po,
                                   struct md_object *mo_pn,
@@ -1147,6 +1378,20 @@ static inline void cmr_rename_warn(const char *fname,
               PFID(lf), s_name, t_name, err);
 }
 
+/**
+ * Rename operation for cmr.
+ *
+ * This is the most complex cross-reference operation. It may consist of up to 4
+ * MDS server and require several RPCs to be sent.
+ *
+ * \param mo_po Old parent object.
+ * \param mo_pn New parent object.
+ * \param lf FID of object to rename.
+ * \param ls_name Source file name.
+ * \param mo_t target object. Should be NULL here.
+ * \param lt_name Name of target file.
+ * \param ma object attributes.
+ */
 static int cmr_rename(const struct lu_env *env,
                       struct md_object *mo_po, struct md_object *mo_pn,
                       const struct lu_fid *lf, const struct lu_name *ls_name,
@@ -1171,16 +1416,18 @@ static int cmr_rename(const struct lu_env *env,
         if (rc)
                 RETURN(rc);
 
-        /*
-         * XXX: @ma maybe changed after mdo_rename_tgt, but we will use it
-         * for mdo_name_remove later, so save it before mdo_rename_tgt.
+        /**
+         * \todo \a ma maybe changed after mdo_rename_tgt(), but we will use it
+         * for mdo_name_remove() later, so save it before mdo_rename_tgt.
          */
         cmi = cmm_env_info(env);
         tmp_ma = &cmi->cmi_ma;
         *tmp_ma = *ma;
-        /* the mo_pn is remote directory, so we cannot even know if there is
-         * mo_t or not. Therefore mo_t is NULL here but remote server should do
-         * lookup and process this further */
+        /**
+         * \note The \a mo_pn is remote directory, so we cannot even know if there is
+         * \a mo_t or not. Therefore \a mo_t is NULL here but remote server should do
+         * lookup and process this further.
+         */
         rc = mdo_rename_tgt(env, md_object_next(mo_pn),
                             NULL/* mo_t */, lf, lt_name, ma);
         if (rc)
@@ -1207,8 +1454,10 @@ static int cmr_rename(const struct lu_env *env,
         RETURN(rc);
 }
 
-/* part of cross-ref rename(). Used to insert new name in new parent
- * and unlink target */
+/**
+ * Part of cross-ref rename().
+ * Used to insert new name in new parent and unlink target.
+ */
 static int cmr_rename_tgt(const struct lu_env *env,
                           struct md_object *mo_p, struct md_object *mo_t,
                           const struct lu_fid *lf, const struct lu_name *lname,
@@ -1252,8 +1501,11 @@ static int cmr_rename_tgt(const struct lu_env *env,
         }
         RETURN(rc);
 }
-
-static struct md_dir_operations cmr_dir_ops = {
+/** @} */
+/**
+ * The md_dir_operations for cmr.
+ */
+static const struct md_dir_operations cmr_dir_ops = {
         .mdo_is_subdir   = cmm_is_subdir,
         .mdo_lookup      = cmr_lookup,
         .mdo_lock_mode   = cmr_lock_mode,
@@ -1261,5 +1513,6 @@ static struct md_dir_operations cmr_dir_ops = {
         .mdo_link        = cmr_link,
         .mdo_unlink      = cmr_unlink,
         .mdo_rename      = cmr_rename,
-        .mdo_rename_tgt  = cmr_rename_tgt,
+        .mdo_rename_tgt  = cmr_rename_tgt
 };
+/** @} */