Whamcloud - gitweb
LU-1187 mdd: a few missing pieces in MD stack for DNE.
author wangdi <di.wang@whamcloud.com>
Tue, 1 Oct 2013 11:25:49 +0000 (04:25 -0700)
committer Oleg Drokin <oleg.drokin@intel.com>
Sun, 3 Feb 2013 21:20:53 +0000 (16:20 -0500)
1. Assign the index operation for the directory create
in declare phase, which is needed for creating the
object in OUT.

2. Declare dotdot insertion for remote directory creation,
so the insert update can be packed into the RPC and sent to
the remote MDT (out) to be executed.

3. Add a hint to the object declare phase, through which OSP can
get some info during update RPC packing, such as the parent fid.

4. Split lu_object_exists into lu_object_remote and
lu_object_exists, so that callers can also check whether a
remote object exists.

Signed-off-by: wang di <di.wang@intel.com>
Change-Id: Icb359443d9982ee8567af933c5def42dc51a3a7a
Reviewed-on: http://review.whamcloud.com/4930
Tested-by: Hudson
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Mike Pershin <mike.pershin@intel.com>
20 files changed:
lustre/include/dt_object.h
lustre/include/lu_object.h
lustre/include/lustre_log.h
lustre/lmv/lmv_obd.c
lustre/lod/lod_object.c
lustre/mdd/mdd_dir.c
lustre/mdd/mdd_internal.h
lustre/mdd/mdd_object.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_lvb.c
lustre/mdt/mdt_open.c
lustre/mdt/mdt_recovery.c
lustre/mdt/mdt_reint.c
lustre/obdclass/llog_osd.c
lustre/osd-ldiskfs/osd_compat.c
lustre/osd-ldiskfs/osd_handler.c
lustre/osd-ldiskfs/osd_internal.h
lustre/osp/osp_dev.c
lustre/osp/osp_internal.h

index 8de5cc5..f8c7477 100644 (file)
@@ -711,6 +711,11 @@ static inline int dt_object_exists(const struct dt_object *dt)
         return lu_object_exists(&dt->do_lu);
 }
 
+static inline int dt_object_remote(const struct dt_object *dt)
+{
+       return lu_object_remote(&dt->do_lu);
+}
+
 static inline struct dt_object *lu2dt_obj(struct lu_object *o)
 {
        LASSERT(ergo(o != NULL, lu_device_is_dt(o->lo_dev)));
index be66fda..b7604e3 100644 (file)
@@ -834,31 +834,25 @@ int lu_object_invariant(const struct lu_object *o);
 
 
 /**
- * \retval  1 iff object \a o exists on stable storage,
- * \retval  0 iff object \a o not exists on stable storage.
- * \retval -1 iff object \a o is on remote server.
+ * Check whether the object exists, whether on local or remote storage.
+ * Note: LOHA_EXISTS is set as soon as someone has created the object;
+ * the object does not need to be committed to storage.
  */
-static inline int lu_object_exists(const struct lu_object *o)
-{
-        __u32 attr;
-
-        attr = o->lo_header->loh_attr;
-        if (attr & LOHA_REMOTE)
-                return -1;
-        else if (attr & LOHA_EXISTS)
-                return +1;
-        else
-                return 0;
-}
+#define lu_object_exists(o) ((o)->lo_header->loh_attr & LOHA_EXISTS)
+
+/**
+ * Check whether the object is on remote storage.
+ */
+#define lu_object_remote(o) unlikely((o)->lo_header->loh_attr & LOHA_REMOTE)
 
 static inline int lu_object_assert_exists(const struct lu_object *o)
 {
-        return lu_object_exists(o) != 0;
+       return lu_object_exists(o);
 }
 
 static inline int lu_object_assert_not_exists(const struct lu_object *o)
 {
-        return lu_object_exists(o) <= 0;
+       return !lu_object_exists(o);
 }
 
 /**
index a0a11a8..17c8434 100644 (file)
@@ -367,9 +367,11 @@ int llog_put_cat_list(struct obd_device *disk_obd,
 /* llog_osd.c */
 extern struct llog_operations llog_osd_ops;
 int llog_osd_get_cat_list(const struct lu_env *env, struct dt_device *d,
-                         int idx, int count, struct llog_catid *idarray);
+                         int idx, int count,
+                         struct llog_catid *idarray);
 int llog_osd_put_cat_list(const struct lu_env *env, struct dt_device *d,
-                         int idx, int count, struct llog_catid *idarray);
+                         int idx, int count,
+                         struct llog_catid *idarray);
 
 #define LLOG_CTXT_FLAG_UNINITIALIZED     0x00000001
 #define LLOG_CTXT_FLAG_STOP             0x00000002
index fa3e183..52055c6 100644 (file)
@@ -1989,7 +1989,7 @@ static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data,
        struct lmv_obd          *lmv = &obd->u.lmv;
        struct lmv_tgt_desc     *tgt = NULL;
        struct mdt_body         *body;
-       int                      rc;
+       int                     rc;
        ENTRY;
 
        rc = lmv_check_connect(obd);
@@ -2036,14 +2036,37 @@ retry:
        body = req_capsule_server_get(&(*request)->rq_pill, &RMF_MDT_BODY);
        if (body == NULL)
                RETURN(-EPROTO);
-       /*
-        * Not cross-ref case, just get out of here.
-        */
+
+       /* Not cross-ref case, just get out of here. */
        if (likely(!(body->valid & OBD_MD_MDS)))
                RETURN(0);
 
-       /* Clearly this is a remote object, try remote MDT */
+       CDEBUG(D_INODE, "%s: try unlink to another MDT for "DFID"\n",
+              exp->exp_obd->obd_name, PFID(&body->fid1));
+
+       /* This is a remote object, try remote MDT, Note: it may
+        * try more than 1 time here, Considering following case
+        * /mnt/lustre is root on MDT0, remote1 is on MDT1
+        * 1. Initially A does not know where remote1 is, it send
+        *    unlink RPC to MDT0, MDT0 return -EREMOTE, it will
+        *    resend unlink RPC to MDT1 (retry 1st time).
+        *
+        * 2. During the unlink RPC in flight,
+        *    client B mv /mnt/lustre/remote1 /mnt/lustre/remote2
+        *    and create new remote1, but on MDT0
+        *
+        * 3. MDT1 get unlink RPC(from A), then do remote lock on
+        *    /mnt/lustre, then lookup get fid of remote1, and find
+        *    it is remote dir again, and reply -EREMOTE again.
+        *
+        * 4. Then A will resend unlink RPC to MDT0. (retry 2nd times).
+        *
+        * In theory, it might try unlimited time here, but it should
+        * be very rare case.  */
        op_data->op_fid2 = body->fid1;
+       ptlrpc_req_finished(*request);
+       *request = NULL;
+
        goto retry;
 }
 
index 2f288a4..7ec9a93 100644 (file)
@@ -689,7 +689,7 @@ static void lod_ah_init(const struct lu_env *env,
         * in case of late striping creation, ->ah_init()
         * can be called with local object existing
         */
-       if (!dt_object_exists(nextc))
+       if (!dt_object_exists(nextc) || dt_object_remote(nextc))
                nextc->do_ops->do_ah_init(env, ah, nextp, nextc, child_mode);
 
        if (S_ISDIR(child_mode)) {
@@ -876,7 +876,6 @@ static int lod_declare_object_create(const struct lu_env *env,
        LASSERT(dof);
        LASSERT(attr);
        LASSERT(th);
-       LASSERT(!dt_object_exists(next));
 
        /*
         * first of all, we declare creation of local object
index de42a5c..ab7f9be 100644 (file)
@@ -169,7 +169,7 @@ static int mdd_is_parent(const struct lu_env *env,
                parent = mdd_object_find(env, mdd, pfid);
                if (IS_ERR(parent)) {
                        GOTO(out, rc = PTR_ERR(parent));
-               } else if (mdd_object_exists(parent) < 0) {
+               } else if (mdd_object_remote(parent)) {
                        /*FIXME: Because of the restriction of rename in Phase I.
                         * If the parent is remote, we just assumed lf is not the
                         * parent of P1 for now */
@@ -297,7 +297,7 @@ int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj,
         int rc = 0;
         ENTRY;
 
-        if (cobj && mdd_object_exists(cobj))
+       if (cobj && mdd_object_exists(cobj))
                 RETURN(-EEXIST);
 
         if (mdd_is_dead_obj(pobj))
@@ -1416,14 +1416,12 @@ __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
         if (unlikely(mdd_is_dead_obj(mdd_obj)))
                 RETURN(-ESTALE);
 
-        rc = mdd_object_exists(mdd_obj);
-        if (unlikely(rc == 0))
-                RETURN(-ESTALE);
-        else if (unlikely(rc < 0)) {
-                CERROR("Object "DFID" locates on remote server\n",
-                        PFID(mdo2fid(mdd_obj)));
-                RETURN(-EINVAL);
-        }
+       if (mdd_object_remote(mdd_obj)) {
+               CDEBUG(D_INFO, "%s: Object "DFID" locates on remote server\n",
+                      mdd2obd_dev(m)->obd_name, PFID(mdo2fid(mdd_obj)));
+       } else if (!mdd_object_exists(mdd_obj)) {
+               RETURN(-ESTALE);
+       }
 
         /* The common filename length check. */
         if (unlikely(lname->ln_namelen > m->mdd_dt_conf.ddp_max_name_len))
@@ -1450,12 +1448,14 @@ __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
         RETURN(rc);
 }
 
-int mdd_declare_object_initialize(const struct lu_env *env,
-                                 struct mdd_object *child,
-                                 struct lu_attr *attr,
-                                 struct thandle *handle)
+static int mdd_declare_object_initialize(const struct lu_env *env,
+                                        struct mdd_object *parent,
+                                        struct mdd_object *child,
+                                        struct lu_attr *attr,
+                                        struct thandle *handle)
 {
         int rc;
+       ENTRY;
 
        /*
         * inode mode has been set in creation time, and it's based on umask,
@@ -1472,12 +1472,15 @@ int mdd_declare_object_initialize(const struct lu_env *env,
                                              dot, handle);
                 if (rc == 0)
                         rc = mdo_declare_ref_add(env, child, handle);
+
+               rc = mdo_declare_index_insert(env, child, mdo2fid(parent),
+                                             dotdot, handle);
         }
 
         if (rc == 0)
                 mdd_declare_links_add(env, child, handle);
 
-        return rc;
+       RETURN(rc);
 }
 
 int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
@@ -1652,7 +1655,7 @@ static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd,
                        GOTO(out, rc);
         }
 
-       rc = mdd_declare_object_initialize(env, c, attr, handle);
+       rc = mdd_declare_object_initialize(env, p, c, attr, handle);
        if (rc)
                GOTO(out, rc);
 
index d5e4286..cb512de 100644 (file)
@@ -543,6 +543,11 @@ static inline int mdd_object_exists(struct mdd_object *obj)
         return lu_object_exists(mdd2lu_obj(obj));
 }
 
+static inline int mdd_object_remote(struct mdd_object *obj)
+{
+       return lu_object_remote(mdd2lu_obj(obj));
+}
+
 static inline const struct lu_fid *mdd_object_fid(struct mdd_object *obj)
 {
         return lu_object_fid(mdd2lu_obj(obj));
@@ -734,7 +739,13 @@ int mdo_declare_index_insert(const struct lu_env *env, struct mdd_object *obj,
          * if the object doesn't exist yet, then it's supposed to be created
          * and declaration of the creation should be enough to insert ./..
          */
-        if (mdd_object_exists(obj)) {
+        /* FIXME: the MDD layer should not be aware of remote objects, but
+         * local creation does not declare insertion of ./.. (see the comment
+         * above), which is required by remote directory creation.
+         * This remote check should be removed when mdd_object_exists check is
+         * removed.
+         */
+        if (mdd_object_exists(obj) || mdd_object_remote(obj)) {
                 rc = -ENOTDIR;
                 if (dt_try_as_dir(env, next))
                         rc = dt_declare_insert(env, next,
index 64ea295..6248eec 100644 (file)
@@ -224,7 +224,7 @@ static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
 
 static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
 {
-        if (lu_object_exists(o))
+       if (lu_object_exists(o))
                 return mdd_get_flags(env, lu2mdd_obj(o));
         else
                 return 0;
@@ -608,6 +608,7 @@ int mdd_declare_object_create_internal(const struct lu_env *env,
                                       const struct md_op_spec *spec)
 {
         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
+       struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
         const struct dt_index_features *feat = spec->sp_feat;
         int rc;
         ENTRY;
@@ -629,7 +630,7 @@ int mdd_declare_object_create_internal(const struct lu_env *env,
                }
        }
 
-       rc = mdo_declare_create_obj(env, c, attr, NULL, dof, handle);
+       rc = mdo_declare_create_obj(env, c, attr, hint, dof, handle);
 
         RETURN(rc);
 }
index 15d653a..bef495f 100644 (file)
@@ -684,13 +684,12 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
 
         ma->ma_valid = 0;
 
-        rc = mdt_object_exists(o);
-        if (rc < 0) {
-                /* This object is located on remote node.*/
-                repbody->fid1 = *mdt_object_fid(o);
-                repbody->valid = OBD_MD_FLID | OBD_MD_MDS;
-                GOTO(out, rc = 0);
-        }
+       if (mdt_object_remote(o)) {
+               /* This object is located on remote node.*/
+               repbody->fid1 = *mdt_object_fid(o);
+               repbody->valid = OBD_MD_FLID | OBD_MD_MDS;
+               GOTO(out, rc = 0);
+       }
 
        buffer->lb_len = reqbody->eadatasize;
        if (buffer->lb_len > 0)
@@ -993,18 +992,18 @@ int mdt_is_subdir(struct mdt_thread_info *info)
 
         repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
 
-        /*
-         * We save last checked parent fid to @repbody->fid1 for remote
-         * directory case.
-         */
-        LASSERT(fid_is_sane(&body->fid2));
-        LASSERT(mdt_object_exists(o) > 0);
-        rc = mdo_is_subdir(info->mti_env, mdt_object_child(o),
-                           &body->fid2, &repbody->fid1);
-        if (rc == 0 || rc == -EREMOTE)
-                repbody->valid |= OBD_MD_FLID;
+       /*
+        * We save last checked parent fid to @repbody->fid1 for remote
+        * directory case.
+        */
+       LASSERT(fid_is_sane(&body->fid2));
+       LASSERT(mdt_object_exists(o) && !mdt_object_remote(o));
+       rc = mdo_is_subdir(info->mti_env, mdt_object_child(o),
+                          &body->fid2, &repbody->fid1);
+       if (rc == 0 || rc == -EREMOTE)
+               repbody->valid |= OBD_MD_FLID;
 
-        RETURN(rc);
+       RETURN(rc);
 }
 
 static int mdt_raw_lookup(struct mdt_thread_info *info,
@@ -1117,14 +1116,14 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
         }
         mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_EXECD);
 
-       rc = mdt_object_exists(parent);
-       if (unlikely(rc == 0)) {
+       if (unlikely(!mdt_object_exists(parent))) {
                LU_OBJECT_DEBUG(D_INODE, info->mti_env,
                                &parent->mot_obj.mo_lu,
                                "Parent doesn't exist!\n");
                RETURN(-ESTALE);
        } else if (!info->mti_cross_ref) {
-               LASSERTF(rc > 0, "Parent "DFID" is on remote server\n",
+               LASSERTF(!mdt_object_remote(parent),
+                        "Parent "DFID" is on remote server\n",
                         PFID(mdt_object_fid(parent)));
        }
         if (lname) {
@@ -1248,15 +1247,15 @@ relock:
                 mdt_lock_handle_init(lhc);
                mdt_lock_reg_init(lhc, LCK_PR);
 
-                if (mdt_object_exists(child) == 0) {
-                        LU_OBJECT_DEBUG(D_INODE, info->mti_env,
-                                        &child->mot_obj.mo_lu,
-                                        "Object doesn't exist!\n");
-                        GOTO(out_child, rc = -ENOENT);
-                }
+               if (!mdt_object_exists(child)) {
+                       LU_OBJECT_DEBUG(D_INODE, info->mti_env,
+                                       &child->mot_obj.mo_lu,
+                                       "Object doesn't exist!\n");
+                       GOTO(out_child, rc = -ENOENT);
+               }
 
                if (!(child_bits & MDS_INODELOCK_UPDATE) &&
-                     mdt_object_exists(child) > 0) {
+                     mdt_object_exists(child) && !mdt_object_remote(child)) {
                         struct md_attr *ma = &info->mti_attr;
 
                         ma->ma_valid = 0;
@@ -1333,7 +1332,7 @@ relock:
                          (unsigned long)res_id->name[1],
                          (unsigned long)res_id->name[2],
                          PFID(mdt_object_fid(child)));
-               if (mdt_object_exists(child) > 0)
+               if (mdt_object_exists(child) && !mdt_object_remote(child))
                        mdt_pack_size2body(info, child);
         }
         if (lock)
@@ -2415,7 +2414,7 @@ int mdt_remote_object_lock(struct mdt_thread_info *mti,
        int rc = 0;
        ENTRY;
 
-       LASSERT(mdt_object_exists(o) < 0);
+       LASSERT(mdt_object_remote(o));
 
        LASSERT((ibits & MDS_INODELOCK_UPDATE));
 
@@ -2449,7 +2448,7 @@ static int mdt_object_lock0(struct mdt_thread_info *info, struct mdt_object *o,
         LASSERT(lh->mlh_reg_mode != LCK_MINMODE);
         LASSERT(lh->mlh_type != MDT_NUL_LOCK);
 
-        if (mdt_object_exists(o) < 0) {
+       if (mdt_object_remote(o)) {
                 if (locality == MDT_CROSS_LOCK) {
                         ibits &= ~MDS_INODELOCK_UPDATE;
                         ibits |= MDS_INODELOCK_LOOKUP;
@@ -2461,7 +2460,7 @@ static int mdt_object_lock0(struct mdt_thread_info *info, struct mdt_object *o,
                 LASSERT(lh->mlh_type != MDT_PDO_LOCK);
         }
 
-        if (lh->mlh_type == MDT_PDO_LOCK) {
+       if (lh->mlh_type == MDT_PDO_LOCK) {
                 /* check for exists after object is locked */
                 if (mdt_object_exists(o) == 0) {
                         /* Non-existent object shouldn't have PDO lock */
@@ -5513,24 +5512,23 @@ static int mdt_ioc_version_get(struct mdt_thread_info *mti, void *karg)
         if (IS_ERR(obj))
                 RETURN(PTR_ERR(obj));
 
-        rc = mdt_object_exists(obj);
-        if (rc < 0) {
-                rc = -EREMOTE;
-                /**
-                 * before calling version get the correct MDS should be
-                 * fid, this is error to find remote object here
-                 */
-                CERROR("nonlocal object "DFID"\n", PFID(fid));
-        } else if (rc == 0) {
-                *(__u64 *)data->ioc_inlbuf2 = ENOENT_VERSION;
-                rc = -ENOENT;
-        } else {
-                version = dt_version_get(mti->mti_env, mdt_obj2dt(obj));
-               *(__u64 *)data->ioc_inlbuf2 = version;
-                rc = 0;
-        }
-        mdt_object_unlock_put(mti, obj, lh, 1);
-        RETURN(rc);
+       if (mdt_object_remote(obj)) {
+               rc = -EREMOTE;
+               /**
+                * before calling version get the correct MDS should be
+                * fid, this is error to find remote object here
+                */
+               CERROR("nonlocal object "DFID"\n", PFID(fid));
+       } else if (!mdt_object_exists(obj)) {
+               *(__u64 *)data->ioc_inlbuf2 = ENOENT_VERSION;
+               rc = -ENOENT;
+       } else {
+               version = dt_version_get(mti->mti_env, mdt_obj2dt(obj));
+              *(__u64 *)data->ioc_inlbuf2 = version;
+               rc = 0;
+       }
+       mdt_object_unlock_put(mti, obj, lh, 1);
+       RETURN(rc);
 }
 
 /* ioctls on obd dev */
index 339fc1a..57ada59 100644 (file)
@@ -589,6 +589,11 @@ static inline int mdt_object_exists(const struct mdt_object *o)
         return lu_object_exists(&o->mot_obj.mo_lu);
 }
 
+static inline int mdt_object_remote(const struct mdt_object *o)
+{
+       return lu_object_remote(&o->mot_obj.mo_lu);
+}
+
 static inline const struct lu_fid *mdt_object_fid(const struct mdt_object *o)
 {
         return lu_object_fid(&o->mot_obj.mo_lu);
index 40a525e..9a830d0 100644 (file)
@@ -140,7 +140,7 @@ static int mdt_lvbo_fill(struct ldlm_lock *lock, void *lvb, int lvblen)
        if (IS_ERR(obj))
                GOTO(out, rc = PTR_ERR(obj));
 
-       if (mdt_object_exists(obj) <= 0)
+       if (!mdt_object_exists(obj) || mdt_object_remote(obj))
                GOTO(out, rc = -ENOENT);
 
        child = mdt_object_child(obj);
index c9e5ecd..3e7dfd3 100644 (file)
@@ -1048,25 +1048,29 @@ void mdt_reconstruct_open(struct mdt_thread_info *info,
                         mdt_export_evict(exp);
                         RETURN_EXIT;
                 }
-                rc = mdt_object_exists(child);
-                if (rc > 0) {
-
-                        mdt_set_capainfo(info, 1, rr->rr_fid2, BYPASS_CAPA);
-                       rc = mdt_attr_get_complex(info, child, ma);
-                        if (rc == 0)
-                              rc = mdt_finish_open(info, parent, child,
-                                                   flags, 1, ldlm_rep);
-                } else if (rc < 0) {
-                        /* the child object was created on remote server */
-                        repbody->fid1 = *rr->rr_fid2;
-                        repbody->valid |= (OBD_MD_FLID | OBD_MD_MDS);
-                        rc = 0;
-                } else if (rc == 0) {
-                        /* the child does not exist, we should do regular open */
-                        mdt_object_put(env, parent);
-                        mdt_object_put(env, child);
-                        GOTO(regular_open, 0);
-                }
+
+               if (unlikely(mdt_object_remote(child))) {
+                       /* the child object was created on remote server */
+                       repbody->fid1 = *rr->rr_fid2;
+                       repbody->valid |= (OBD_MD_FLID | OBD_MD_MDS);
+                       rc = 0;
+               } else {
+                       if (mdt_object_exists(child)) {
+                               mdt_set_capainfo(info, 1, rr->rr_fid2,
+                                                BYPASS_CAPA);
+                               rc = mdt_attr_get_complex(info, child, ma);
+                               if (rc == 0)
+                                       rc = mdt_finish_open(info, parent,
+                                                            child, flags,
+                                                            1, ldlm_rep);
+                       } else {
+                               /* the child does not exist, we should do
+                                * regular open */
+                               mdt_object_put(env, parent);
+                               mdt_object_put(env, child);
+                               GOTO(regular_open, 0);
+                       }
+               }
                 mdt_object_put(env, parent);
                 mdt_object_put(env, child);
                 GOTO(out, rc);
@@ -1097,25 +1101,27 @@ int mdt_open_by_fid(struct mdt_thread_info* info,
         if (IS_ERR(o))
                 RETURN(rc = PTR_ERR(o));
 
-        rc = mdt_object_exists(o);
-        if (rc > 0) {
-                mdt_set_disposition(info, rep, (DISP_IT_EXECD |
-                                                DISP_LOOKUP_EXECD |
-                                                DISP_LOOKUP_POS));
-
-               rc = mdt_attr_get_complex(info, o, ma);
-                if (rc == 0)
-                        rc = mdt_finish_open(info, NULL, o, flags, 0, rep);
-        } else if (rc == 0) {
-                rc = -ENOENT;
-        } else  {
+       if (unlikely(mdt_object_remote(o))) {
                 /* the child object was created on remote server */
                 struct mdt_body *repbody;
                 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
                 repbody->fid1 = *rr->rr_fid2;
                 repbody->valid |= (OBD_MD_FLID | OBD_MD_MDS);
                 rc = 0;
-        }
+       } else {
+               if (mdt_object_exists(o)) {
+                       mdt_set_disposition(info, rep, (DISP_IT_EXECD |
+                                                       DISP_LOOKUP_EXECD |
+                                                       DISP_LOOKUP_POS));
+
+                       rc = mdt_attr_get_complex(info, o, ma);
+                       if (rc == 0)
+                               rc = mdt_finish_open(info, NULL, o, flags, 0,
+                                                    rep);
+               } else {
+                       rc = -ENOENT;
+               }
+       }
 
         mdt_object_put(info->mti_env, o);
         RETURN(rc);
@@ -1268,17 +1274,18 @@ int mdt_open_by_fid_lock(struct mdt_thread_info *info, struct ldlm_reply *rep,
         if (IS_ERR(o))
                 RETURN(rc = PTR_ERR(o));
 
-        rc = mdt_object_exists(o);
-        if (rc == 0) {
-                mdt_set_disposition(info, rep, (DISP_LOOKUP_EXECD |
-                                    DISP_LOOKUP_NEG));
-                GOTO(out, rc = -ENOENT);
-        } else if (rc < 0) {
-                CERROR("NFS remote open shouldn't happen.\n");
-                GOTO(out, rc);
-        }
-        mdt_set_disposition(info, rep, (DISP_IT_EXECD |
-                                       DISP_LOOKUP_EXECD));
+       if (mdt_object_remote(o)) {
+               CDEBUG(D_INFO, "%s: "DFID" is on remote MDT.\n",
+                      info->mti_mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name,
+                      PFID(rr->rr_fid2));
+               GOTO(out, rc = -EREMOTE);
+       } else if (!mdt_object_exists(o)) {
+               mdt_set_disposition(info, rep, (DISP_LOOKUP_EXECD |
+                                   DISP_LOOKUP_NEG));
+               GOTO(out, rc = -ENOENT);
+       }
+
+       mdt_set_disposition(info, rep, (DISP_IT_EXECD | DISP_LOOKUP_EXECD));
 
        rc = mdt_attr_get_complex(info, o, ma);
         if (rc)
@@ -1335,35 +1342,38 @@ int mdt_cross_open(struct mdt_thread_info* info,
         if (IS_ERR(o))
                 RETURN(rc = PTR_ERR(o));
 
-        rc = mdt_object_exists(o);
-        if (rc > 0) {
-                /* Do permission check for cross-open. */
-                rc = mo_permission(info->mti_env, NULL, mdt_object_child(o),
-                                   NULL, flags | MDS_OPEN_CROSS);
-                if (rc)
-                        goto out;
-
-                mdt_set_capainfo(info, 0, fid, BYPASS_CAPA);
-               rc = mdt_attr_get_complex(info, o, ma);
-                if (rc == 0)
-                        rc = mdt_finish_open(info, NULL, o, flags, 0, rep);
-        } else if (rc == 0) {
-                /*
-                 * Something is wrong here. lookup was positive but there is
-                 * no object!
-                 */
-                CERROR("Cross-ref object doesn't exist!\n");
-                rc = -EFAULT;
-        } else  {
-                /* Something is wrong here, the object is on another MDS! */
-                CERROR("The object isn't on this server! FLD error?\n");
-                LU_OBJECT_DEBUG(D_WARNING, info->mti_env,
-                                &o->mot_obj.mo_lu,
-                                "Object isn't on this server! FLD error?\n");
-
+       if (mdt_object_remote(o)) {
+               /* Something is wrong here, the object is on another MDS! */
+               CERROR("%s: "DFID" isn't on this server!: rc = %d\n",
+                      mdt_obd_name(info->mti_mdt), PFID(fid), -EFAULT);
+               LU_OBJECT_DEBUG(D_WARNING, info->mti_env,
+                               &o->mot_obj.mo_lu,
+                               "Object isn't on this server! FLD error?\n");
                 rc = -EFAULT;
+       } else {
+               if (mdt_object_exists(o)) {
+                       /* Do permission check for cross-open. */
+                       rc = mo_permission(info->mti_env, NULL,
+                                          mdt_object_child(o),
+                                          NULL, flags | MDS_OPEN_CROSS);
+                       if (rc)
+                               goto out;
+
+                       mdt_set_capainfo(info, 0, fid, BYPASS_CAPA);
+                       rc = mdt_attr_get_complex(info, o, ma);
+                       if (rc == 0)
+                               rc = mdt_finish_open(info, NULL, o, flags, 0,
+                                                    rep);
+               } else {
+                       /*
+                        * Something is wrong here. lookup was positive but
+                        * there is no object!
+                        */
+                       CERROR("%s: "DFID" doesn't exist!: rc = %d\n",
+                             mdt_obd_name(info->mti_mdt), PFID(fid), -EFAULT);
+                       rc = -EFAULT;
+               }
         }
-
 out:
         mdt_object_put(info->mti_env, o);
         RETURN(rc);
@@ -1574,13 +1584,10 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                 }
                 created = 1;
         } else {
-               /* We have to get attr & LOV EA, HSM bits for this object */
-               ma->ma_need |= MA_HSM;
-               result = mdt_attr_get_complex(info, child, ma);
                 /*
                  * The object is on remote node, return its FID for remote open.
                  */
-                if (result == -EREMOTE) {
+               if (mdt_object_remote(child)) {
                         /*
                          * Check if this lock already was sent to client and
                          * this is resent case. For resent case do not take lock
@@ -1618,7 +1625,17 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                        else
                                result = -EREMOTE;
                         GOTO(out_child, result);
-                }
+               } else {
+                       if (mdt_object_exists(child)) {
+                               /* We have to get attr & LOV EA & HSM for this
+                                * object */
+                               ma->ma_need |= MA_HSM;
+                               result = mdt_attr_get_complex(info, child, ma);
+                       } else {
+                               /*object non-exist!!!*/
+                               LBUG();
+                       }
+               }
         }
 
         LASSERT(!lustre_handle_is_used(&lhc->mlh_reg_lh));
index 9cd8549..a936f97 100644 (file)
@@ -497,7 +497,9 @@ static int mdt_txn_start_cb(const struct lu_env *env,
        if (rc)
                return rc;
 
-       if (mti->mti_mos != NULL)
+       /* we probably should not set local transno to the remote object
+        * on another storage, What about VBR on remote object? XXX */
+       if (mti->mti_mos != NULL && !mdt_object_remote(mti->mti_mos))
                rc = dt_declare_version_set(env, mdt_obj2dt(mti->mti_mos), th);
 
        return rc;
@@ -545,7 +547,11 @@ static int mdt_txn_stop_cb(const struct lu_env *env,
         LASSERT(req != NULL && req->rq_repmsg != NULL);
 
         /** VBR: set new versions */
-        if (txn->th_result == 0 && mti->mti_mos != NULL) {
+       /* we probably should not set local transno to the remote object
+        * on another storage, What about VBR on remote object? XXX */
+       if (txn->th_result == 0 && mti->mti_mos != NULL &&
+           !mdt_object_remote(mti->mti_mos)) {
+
                 dt_version_set(env, mdt_obj2dt(mti->mti_mos),
                                mti->mti_transno, txn);
                 mti->mti_mos = NULL;
index 8e362f7..5b458ce 100644 (file)
@@ -91,7 +91,8 @@ static void mdt_obj_version_get(struct mdt_thread_info *info,
                                 struct mdt_object *o, __u64 *version)
 {
         LASSERT(o);
-       if (mdt_object_exists(o) > 0 && !mdt_object_obf(o))
+       if (mdt_object_exists(o) && !mdt_object_remote(o) &&
+           !mdt_object_obf(o))
                 *version = dt_version_get(info->mti_env, mdt_obj2dt(o));
         else
                 *version = ENOENT_VERSION;
@@ -305,7 +306,7 @@ static int mdt_md_create(struct mdt_thread_info *info)
         if (likely(!IS_ERR(child))) {
                 struct md_object *next = mdt_object_child(parent);
 
-               if (mdt_object_exists(child) < 0) {
+               if (mdt_object_remote(child)) {
                        struct seq_server_site *ss;
                        struct lu_ucred *uc  = mdt_ucred(info);
 
@@ -385,8 +386,8 @@ int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
         int rc;
         ENTRY;
 
-        /* attr shouldn't be set on remote object */
-        LASSERT(mdt_object_exists(mo) >= 0);
+       /* attr shouldn't be set on remote object */
+       LASSERT(!mdt_object_remote(mo));
 
         lh = &info->mti_lh[MDT_LH_PARENT];
         mdt_lock_reg_init(lh, LCK_PW);
@@ -691,7 +692,7 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
 
        parent_lh = &info->mti_lh[MDT_LH_PARENT];
        lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
-       if (mdt_object_exists(mp) < 0) {
+       if (mdt_object_remote(mp)) {
                mdt_lock_reg_init(parent_lh, LCK_EX);
                rc = mdt_remote_object_lock(info, mp, &parent_lh->mlh_rreg_lh,
                                            parent_lh->mlh_rreg_mode,
@@ -728,7 +729,7 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
 
         child_lh = &info->mti_lh[MDT_LH_CHILD];
         mdt_lock_reg_init(child_lh, LCK_EX);
-       if (mdt_object_exists(mc) < 0) {
+       if (mdt_object_remote(mc)) {
                struct mdt_body  *repbody;
 
                if (!fid_is_zero(rr->rr_fid2)) {
@@ -892,7 +893,7 @@ static int mdt_reint_link(struct mdt_thread_info *info,
         if (IS_ERR(ms))
                 GOTO(out_unlock_parent, rc = PTR_ERR(ms));
 
-       if (mdt_object_exists(ms) < 0) {
+       if (mdt_object_remote(ms)) {
                mdt_object_put(info->mti_env, ms);
                CERROR("Target directory "DFID" is on another MDT\n",
                        PFID(rr->rr_fid1));
@@ -1136,25 +1137,26 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
                 if (rc)
                         GOTO(out_put_target, rc);
 
-                rc = mdt_object_exists(mtgtdir);
-                if (rc == 0) {
-                        GOTO(out_put_target, rc = -ESTALE);
-                } else if (rc > 0) {
-                        /* we lock the target dir if it is local */
-                        rc = mdt_object_lock(info, mtgtdir, lh_tgtdirp,
-                                             MDS_INODELOCK_UPDATE,
-                                             MDT_LOCAL_LOCK);
-                        if (rc != 0)
-                                GOTO(out_put_target, rc);
-                        /* get and save correct version after locking */
-                        mdt_version_get_save(info, mtgtdir, 1);
-               } else if (rc < 0) {
-                       CERROR("Source dir "DFID" target dir "DFID
+               if (unlikely(mdt_object_remote(mtgtdir))) {
+                       CDEBUG(D_INFO, "Source dir "DFID" target dir "DFID
                               "on different MDTs\n", PFID(rr->rr_fid1),
                               PFID(rr->rr_fid2));
                        GOTO(out_put_target, rc = -EXDEV);
+               } else {
+                       if (likely(mdt_object_exists(mtgtdir))) {
+                               /* we lock the target dir if it is local */
+                               rc = mdt_object_lock(info, mtgtdir, lh_tgtdirp,
+                                                    MDS_INODELOCK_UPDATE,
+                                                    MDT_LOCAL_LOCK);
+                               if (rc != 0)
+                                       GOTO(out_put_target, rc);
+                               /* get and save correct version after locking */
+                               mdt_version_get_save(info, mtgtdir, 1);
+                       } else {
+                               GOTO(out_put_target, rc = -ESTALE);
+                       }
                }
-        }
+       }
 
         /* step 3: find & lock the old object. */
         lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
@@ -1170,9 +1172,10 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
        mold = mdt_object_find(info->mti_env, info->mti_mdt, old_fid);
        if (IS_ERR(mold))
                GOTO(out_unlock_target, rc = PTR_ERR(mold));
-       if (mdt_object_exists(mold) < 0) {
+       if (mdt_object_remote(mold)) {
                mdt_object_put(info->mti_env, mold);
-               CERROR("Source child "DFID" is on another MDT\n", PFID(old_fid));
+               CDEBUG(D_INFO, "Source child "DFID" is on another MDT\n",
+                      PFID(old_fid));
                GOTO(out_unlock_target, rc = -EXDEV);
        }
 
@@ -1220,9 +1223,9 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
                        GOTO(out_unlock_old, rc = -EPERM);
                }
 
-               if (mdt_object_exists(mnew) < 0) {
+               if (mdt_object_remote(mnew)) {
                        mdt_object_put(info->mti_env, mnew);
-                       CERROR("Source child "DFID" is on another MDT\n",
+                       CDEBUG(D_INFO, "src child "DFID" is on another MDT\n",
                               PFID(new_fid));
                        GOTO(out_unlock_old, rc = -EXDEV);
                }
index f5b54d9..70aa4c4 100644 (file)
@@ -1152,6 +1152,7 @@ int llog_osd_get_cat_list(const struct lu_env *env, struct dt_device *d,
        lgi->lgi_off = idx *  sizeof(*idarray);
 
        lu_local_obj_fid(&lgi->lgi_fid, LLOG_CATALOGS_OID);
+
        o = dt_locate(env, d, &lgi->lgi_fid);
        if (IS_ERR(o))
                RETURN(PTR_ERR(o));
@@ -1246,6 +1247,7 @@ int llog_osd_put_cat_list(const struct lu_env *env, struct dt_device *d,
        lgi->lgi_off = idx * sizeof(*idarray);
 
        lu_local_obj_fid(&lgi->lgi_fid, LLOG_CATALOGS_OID);
+
        o = dt_locate(env, d, &lgi->lgi_fid);
        if (IS_ERR(o))
                RETURN(PTR_ERR(o));
index 09045b7..e2d6025 100644 (file)
@@ -176,33 +176,32 @@ static void osd_mdt_fini(struct osd_device *osd)
        osd->od_ost_map = NULL;
 }
 
-int osd_create_agent_inode(const struct lu_env *env, struct osd_device *osd,
-                          struct osd_object *obj, struct osd_thandle *oh)
+int osd_add_to_agent(const struct lu_env *env, struct osd_device *osd,
+                    struct osd_object *obj, struct osd_thandle *oh)
 {
        struct osd_mdobj_map    *omm = osd->od_mdt_map;
        struct osd_thread_info  *oti = osd_oti_get(env);
-       char                    *name_buf = oti->oti_name;
+       char                    *name = oti->oti_name;
        struct dentry           *agent;
        struct dentry           *parent;
        int                     rc;
 
        parent = omm->omm_agent_dentry;
-       sprintf(name_buf, DFID_NOBRACE, PFID(lu_object_fid(&obj->oo_dt.do_lu)));
+       sprintf(name, DFID_NOBRACE, PFID(lu_object_fid(&obj->oo_dt.do_lu)));
        agent = osd_child_dentry_by_inode(env, parent->d_inode,
-                                         name_buf, strlen(name_buf));
+                                         name, strlen(name));
        mutex_lock(&parent->d_inode->i_mutex);
        rc = osd_ldiskfs_add_entry(oh->ot_handle, agent, obj->oo_inode, NULL);
+       LASSERTF(parent->d_inode->i_nlink > 1, "%s: agent inode nlink %d",
+                osd_name(osd), parent->d_inode->i_nlink);
        parent->d_inode->i_nlink++;
        mark_inode_dirty(parent->d_inode);
        mutex_unlock(&parent->d_inode->i_mutex);
-       if (rc != 0)
-               CERROR("%s: "DFID" add agent error: rc = %d\n", osd_name(osd),
-                      PFID(lu_object_fid(&obj->oo_dt.do_lu)), rc);
        RETURN(rc);
 }
 
-int osd_delete_agent_inode(const struct lu_env *env, struct osd_device *osd,
-                          struct osd_object *obj, struct osd_thandle *oh)
+int osd_delete_from_agent(const struct lu_env *env, struct osd_device *osd,
+                         struct osd_object *obj, struct osd_thandle *oh)
 {
        struct osd_mdobj_map       *omm = osd->od_mdt_map;
        struct osd_thread_info     *oti = osd_oti_get(env);
@@ -214,7 +213,7 @@ int osd_delete_agent_inode(const struct lu_env *env, struct osd_device *osd,
        int                        rc;
 
        parent = omm->omm_agent_dentry;
-       sprintf(name, DFID, PFID(lu_object_fid(&obj->oo_dt.do_lu)));
+       sprintf(name, DFID_NOBRACE, PFID(lu_object_fid(&obj->oo_dt.do_lu)));
        agent = osd_child_dentry_by_inode(env, parent->d_inode,
                                          name, strlen(name));
        mutex_lock(&parent->d_inode->i_mutex);
@@ -224,6 +223,8 @@ int osd_delete_agent_inode(const struct lu_env *env, struct osd_device *osd,
                RETURN(-ENOENT);
        }
        rc = ldiskfs_delete_entry(oh->ot_handle, parent->d_inode, de, bh);
+       LASSERTF(parent->d_inode->i_nlink > 1, "%s: agent inode nlink %d",
+                osd_name(osd), parent->d_inode->i_nlink);
        parent->d_inode->i_nlink--;
        mark_inode_dirty(parent->d_inode);
        mutex_unlock(&parent->d_inode->i_mutex);
@@ -752,10 +753,10 @@ int osd_obj_spec_lookup(struct osd_thread_info *info, struct osd_device *osd,
                        const struct lu_fid *fid, struct osd_inode_id *id)
 {
        struct dentry   *root;
-       struct dentry *dentry;
-       struct inode  *inode;
-       char          *name;
-       int            rc = -ENOENT;
+       struct dentry   *dentry;
+       struct inode    *inode;
+       char            *name;
+       int             rc = -ENOENT;
        ENTRY;
 
        if (fid_is_last_id(fid)) {
index 7d4d486..a36f86a 100644 (file)
@@ -1387,7 +1387,7 @@ static int osd_attr_get(const struct lu_env *env,
 {
         struct osd_object *obj = osd_dt_obj(dt);
 
-        LASSERT(dt_object_exists(dt));
+       LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
         LINVRNT(osd_invariant(obj));
 
         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
@@ -1620,7 +1620,7 @@ static int osd_attr_set(const struct lu_env *env,
         int rc;
 
         LASSERT(handle != NULL);
-        LASSERT(dt_object_exists(dt));
+       LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
         LASSERT(osd_invariant(obj));
 
         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
@@ -2004,7 +2004,9 @@ int osd_fld_lookup(const struct lu_env *env, struct osd_device *osd,
        return rc;
 }
 
-
+/*
+ * Concurrency: no external locking is necessary.
+ */
 static int osd_declare_object_create(const struct lu_env *env,
                                     struct dt_object *dt,
                                     struct lu_attr *attr,
@@ -2077,7 +2079,7 @@ static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
         ENTRY;
 
         LINVRNT(osd_invariant(obj));
-        LASSERT(!dt_object_exists(dt));
+       LASSERT(!dt_object_exists(dt) && !dt_object_remote(dt));
         LASSERT(osd_write_locked(env, obj));
         LASSERT(th != NULL);
 
@@ -2093,7 +2095,9 @@ static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
         if (result == 0)
                 result = __osd_oi_insert(env, obj, fid, th);
 
-        LASSERT(ergo(result == 0, dt_object_exists(dt)));
+       LASSERT(ergo(result == 0,
+                    dt_object_exists(dt) && !dt_object_remote(dt)));
+
         LASSERT(osd_invariant(obj));
         RETURN(result);
 }
@@ -2163,7 +2167,7 @@ static int osd_object_destroy(const struct lu_env *env,
                /* it will check/delete the agent inode for every dir
                 * destory, how to optimize it? unlink performance
                 * impaction XXX */
-               result = osd_delete_agent_inode(env, osd, obj, oh);
+               result = osd_delete_from_agent(env, osd, obj, oh);
                if (result != 0 && result != -ENOENT) {
                        CERROR("%s: delete agent inode "DFID": rc = %d\n",
                               osd_name(osd), PFID(fid), result);
@@ -2382,7 +2386,7 @@ static int osd_object_ea_create(const struct lu_env *env, struct dt_object *dt,
         ENTRY;
 
         LASSERT(osd_invariant(obj));
-        LASSERT(!dt_object_exists(dt));
+       LASSERT(!dt_object_exists(dt) && !dt_object_remote(dt));
         LASSERT(osd_write_locked(env, obj));
         LASSERT(th != NULL);
 
@@ -2403,7 +2407,8 @@ static int osd_object_ea_create(const struct lu_env *env, struct dt_object *dt,
         if (result == 0)
                 result = __osd_oi_insert(env, obj, fid, th);
 
-        LASSERT(ergo(result == 0, dt_object_exists(dt)));
+       LASSERT(ergo(result == 0,
+                    dt_object_exists(dt) && !dt_object_remote(dt)));
         LINVRNT(osd_invariant(obj));
         RETURN(result);
 }
@@ -2436,7 +2441,7 @@ static int osd_object_ref_add(const struct lu_env *env,
         struct inode      *inode = obj->oo_inode;
 
         LINVRNT(osd_invariant(obj));
-        LASSERT(dt_object_exists(dt));
+       LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
         LASSERT(osd_write_locked(env, obj));
         LASSERT(th != NULL);
 
@@ -2477,7 +2482,7 @@ static int osd_declare_object_ref_del(const struct lu_env *env,
 {
         struct osd_thandle *oh;
 
-        LASSERT(dt_object_exists(dt));
+       LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
         LASSERT(handle != NULL);
 
         oh = container_of0(handle, struct osd_thandle, ot_super);
@@ -2499,7 +2504,7 @@ static int osd_object_ref_del(const struct lu_env *env, struct dt_object *dt,
         struct inode      *inode = obj->oo_inode;
 
         LINVRNT(osd_invariant(obj));
-        LASSERT(dt_object_exists(dt));
+       LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
         LASSERT(osd_write_locked(env, obj));
         LASSERT(th != NULL);
 
@@ -2555,7 +2560,7 @@ static int osd_xattr_get(const struct lu_env *env, struct dt_object *dt,
                 return sizeof(dt_obj_version_t);
         }
 
-        LASSERT(dt_object_exists(dt));
+       LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
         LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL);
 
         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
@@ -2651,7 +2656,7 @@ static int osd_xattr_list(const struct lu_env *env, struct dt_object *dt,
         struct osd_thread_info *info   = osd_oti_get(env);
         struct dentry          *dentry = &info->oti_obj_dentry;
 
-        LASSERT(dt_object_exists(dt));
+       LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
         LASSERT(inode->i_op != NULL && inode->i_op->listxattr != NULL);
         LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
 
@@ -2668,7 +2673,7 @@ static int osd_declare_xattr_del(const struct lu_env *env,
 {
         struct osd_thandle *oh;
 
-        LASSERT(dt_object_exists(dt));
+       LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
         LASSERT(handle != NULL);
 
         oh = container_of0(handle, struct osd_thandle, ot_super);
@@ -2693,7 +2698,7 @@ static int osd_xattr_del(const struct lu_env *env, struct dt_object *dt,
         struct dentry          *dentry = &info->oti_obj_dentry;
         int                     rc;
 
-        LASSERT(dt_object_exists(dt));
+       LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
         LASSERT(inode->i_op != NULL && inode->i_op->removexattr != NULL);
         LASSERT(handle != NULL);
 
@@ -2727,7 +2732,7 @@ static struct obd_capa *osd_capa_get(const struct lu_env *env,
         if (!dev->od_fl_capa)
                 RETURN(ERR_PTR(-ENOENT));
 
-        LASSERT(dt_object_exists(dt));
+       LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
         LINVRNT(osd_invariant(obj));
 
         /* renewal sanity check */
@@ -2892,14 +2897,13 @@ static int osd_index_try(const struct lu_env *env, struct dt_object *dt,
        struct osd_object       *obj = osd_dt_obj(dt);
 
         LINVRNT(osd_invariant(obj));
-        LASSERT(dt_object_exists(dt));
 
         if (osd_object_is_root(obj)) {
                 dt->do_index_ops = &osd_index_ea_ops;
                 result = 0;
        } else if (feat == &dt_directory_features) {
                 dt->do_index_ops = &osd_index_ea_ops;
-                if (S_ISDIR(obj->oo_inode->i_mode))
+               if (obj->oo_inode != NULL && S_ISDIR(obj->oo_inode->i_mode))
                         result = 0;
                 else
                         result = -ENOTDIR;
@@ -3079,7 +3083,7 @@ static int osd_index_iam_delete(const struct lu_env *env, struct dt_object *dt,
         ENTRY;
 
         LINVRNT(osd_invariant(obj));
-        LASSERT(dt_object_exists(dt));
+       LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
         LASSERT(bag->ic_object == obj->oo_inode);
         LASSERT(handle != NULL);
 
@@ -3118,7 +3122,7 @@ static int osd_index_declare_ea_delete(const struct lu_env *env,
        int                 rc;
        ENTRY;
 
-       LASSERT(dt_object_exists(dt));
+       LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
        LASSERT(handle != NULL);
 
        oh = container_of0(handle, struct osd_thandle, ot_super);
@@ -3197,7 +3201,7 @@ static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt,
         ENTRY;
 
         LINVRNT(osd_invariant(obj));
-        LASSERT(dt_object_exists(dt));
+       LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
         LASSERT(handle != NULL);
 
        osd_trans_exec_op(env, handle, OSD_OT_DELETE);
@@ -3237,6 +3241,12 @@ static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt,
        if (rc != 0)
                GOTO(out, rc);
 
+       /* For an inode on the remote MDT, ".." will point to the
+        * /Agent directory, so do not try to look up/delete the
+        * remote inode for "..". */
+       if (strcmp((char *)key, dotdot) == 0)
+               GOTO(out, rc = 0);
+
        LASSERT(de != NULL);
        rc = osd_get_fid_from_dentry(de, (struct dt_rec *)fid);
        if (rc == 0 && osd_remote_fid(env, osd, fid)) {
@@ -3282,7 +3292,7 @@ static int osd_index_iam_lookup(const struct lu_env *env, struct dt_object *dt,
         ENTRY;
 
         LASSERT(osd_invariant(obj));
-        LASSERT(dt_object_exists(dt));
+       LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
         LASSERT(bag->ic_object == obj->oo_inode);
 
         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
@@ -3334,7 +3344,6 @@ static int osd_index_declare_iam_insert(const struct lu_env *env,
 {
         struct osd_thandle *oh;
 
-        LASSERT(dt_object_exists(dt));
         LASSERT(handle != NULL);
 
         oh = container_of0(handle, struct osd_thandle, ot_super);
@@ -3373,7 +3382,7 @@ static int osd_index_iam_insert(const struct lu_env *env, struct dt_object *dt,
         ENTRY;
 
         LINVRNT(osd_invariant(obj));
-        LASSERT(dt_object_exists(dt));
+       LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
         LASSERT(bag->ic_object == obj->oo_inode);
         LASSERT(th != NULL);
 
@@ -3753,7 +3762,7 @@ static int osd_index_declare_ea_insert(const struct lu_env *env,
        int                     rc;
        ENTRY;
 
-       LASSERT(dt_object_exists(dt));
+       LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
        LASSERT(handle != NULL);
 
        oh = container_of0(handle, struct osd_thandle, ot_super);
@@ -3827,7 +3836,7 @@ static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt,
        ENTRY;
 
         LASSERT(osd_invariant(obj));
-        LASSERT(dt_object_exists(dt));
+       LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
         LASSERT(th != NULL);
 
        osd_trans_exec_op(env, th, OSD_OT_INSERT);
@@ -3853,7 +3862,7 @@ static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt,
                        /* If parent on remote MDT, we need put this object
                         * under AGENT */
                        oh = container_of(th, typeof(*oh), ot_super);
-                       rc = osd_create_agent_inode(env, osd, obj, oh);
+                       rc = osd_add_to_agent(env, osd, obj, oh);
                        if (rc != 0) {
                                CERROR("%s: add agent "DFID" error: rc = %d\n",
                                       osd_name(osd),
index 1dec8c2..d4fe344 100644 (file)
@@ -658,10 +658,10 @@ struct dentry *osd_agent_lookup(struct osd_mdobj_map *omm, int index);
 struct dentry *osd_agent_load(const struct osd_device *osd, int mdt_index,
                              int create);
 
-int osd_delete_agent_inode(const struct lu_env *env, struct osd_device *osd,
-                          struct osd_object *obj, struct osd_thandle *oh);
-int osd_create_agent_inode(const struct lu_env *env, struct osd_device *osd,
-                          struct osd_object *obj, struct osd_thandle *oh);
+int osd_delete_from_agent(const struct lu_env *env, struct osd_device *osd,
+                         struct osd_object *obj, struct osd_thandle *oh);
+int osd_add_to_agent(const struct lu_env *env, struct osd_device *osd,
+                    struct osd_object *obj, struct osd_thandle *oh);
 
 /* osd_quota_fmt.c */
 int walk_tree_dqentry(const struct lu_env *env, struct osd_object *obj,
index a35b392..fce9c94 100644 (file)
@@ -598,6 +598,7 @@ static int osp_init0(const struct lu_env *env, struct osp_device *m,
                        RETURN(-EINVAL);
                }
                m->opd_index = idx;
+               m->opd_group = 0;
                idx = tgt - src;
        } else {
                /* New OSC name fsname-OSTXXXX-osc-MDTXXXX */
@@ -608,8 +609,18 @@ static int osp_init0(const struct lu_env *env, struct osp_device *m,
                        RETURN(-EINVAL);
                }
 
+               idx = simple_strtol(tgt + 4, &mdt, 16);
+               if (*mdt != '\0' || idx > INT_MAX || idx < 0) {
+                       CERROR("%s: invalid OST index in '%s'\n",
+                              m->opd_obd->obd_name, src);
+                       RETURN(-EINVAL);
+               }
+
+               /* Get MDT index from the name and set it to opd_group,
+                * which will be used by OSP to connect with OST */
+               m->opd_group = idx;
                if (tgt - src <= 12) {
-                       CERROR("%s: invalid target name %s\n",
+                       CERROR("%s: invalid mdt index retrieve from %s\n",
                               m->opd_obd->obd_name, lustre_cfg_string(cfg, 0));
                        RETURN(-EINVAL);
                }
index 75821a5..ed3057f 100644 (file)
@@ -68,6 +68,10 @@ struct osp_device {
        struct dt_device                 opd_dt_dev;
        /* corresponded OST index */
        int                              opd_index;
+
+       /* corresponding MDT index, which will be used when connecting to OST
+        * for validating the connection (see ofd_parse_connect_data) */
+       int                              opd_group;
        /* device used to store persistent state (llogs, last ids) */
        struct obd_export               *opd_storage_exp;
        struct dt_device                *opd_storage;