Whamcloud - gitweb
LU-13437 mdt: don't fetch LOOKUP lock for remote object
[fs/lustre-release.git] / lustre / mdt / mdt_lib.c
index ff79c25..96d4078 100644 (file)
@@ -1832,3 +1832,134 @@ int mdt_pack_secctx_in_reply(struct mdt_thread_info *info,
        }
        return rc;
 }
+
+/* check whether two FIDs belong to different MDT. */
+static int mdt_fids_different_target(struct mdt_thread_info *info,
+                                    const struct lu_fid *fid1,
+                                    const struct lu_fid *fid2)
+{
+       const struct lu_env *env = info->mti_env;
+       struct mdt_device *mdt = info->mti_mdt;
+       struct lu_seq_range *range = &info->mti_range;
+       struct seq_server_site *ss;
+       __u32 index1, index2;
+       int rc;
+
+       if (fid_seq(fid1) == fid_seq(fid2))
+               return 0;
+
+       ss = mdt->mdt_lu_dev.ld_site->ld_seq_site;
+
+       range->lsr_flags = LU_SEQ_RANGE_MDT;
+       rc = fld_server_lookup(env, ss->ss_server_fld, fid1->f_seq, range);
+       if (rc)
+               return rc;
+
+       index1 = range->lsr_index;
+
+       rc = fld_server_lookup(env, ss->ss_server_fld, fid2->f_seq, range);
+       if (rc)
+               return rc;
+
+       index2 = range->lsr_index;
+
+       return index1 != index2;
+}
+
+static bool mdt_object_is_shard(struct mdt_thread_info *info,
+                               struct mdt_object *obj)
+{
+       struct lmv_mds_md_v1 *lmv = (struct lmv_mds_md_v1 *)info->mti_xattr_buf;
+       struct lu_buf buf;
+       int rc;
+
+       if (!mdt_object_exists(obj))
+               return false;
+
+       if (!S_ISDIR(lu_object_attr(&obj->mot_obj)))
+               return false;
+
+       buf.lb_buf = lmv;
+       buf.lb_len = sizeof(*lmv);
+       rc = mo_xattr_get(info->mti_env, mdt_object_child(obj), &buf,
+                         XATTR_NAME_LMV);
+       if (rc < 0)
+               return false;
+
+       return lmv->lmv_magic == cpu_to_le32(LMV_MAGIC_STRIPE);
+}
+
+/**
+ * Check whether \a child is remote object on \a parent.
+ *
+ * \param[in]  info    thread environment
+ * \param[in]  parent  parent object, it's the same as child object in
+ *                     getattr_by_fid
+ * \param[in]  child   child object
+ *
+ * \retval 1   is remote object.
+ * \retval 0   isn't remote object.
+ * \retval < 1  error code
+ */
+int mdt_is_remote_object(struct mdt_thread_info *info,
+                        struct mdt_object *parent,
+                        struct mdt_object *child)
+{
+       struct lu_buf *buf = &info->mti_big_buf;
+       struct linkea_data ldata = { NULL };
+       struct link_ea_header *leh;
+       struct link_ea_entry *lee;
+       struct lu_name name;
+       struct lu_fid pfid;
+       int reclen;
+       int i;
+       int rc;
+
+       ENTRY;
+
+       if (fid_is_root(mdt_object_fid(child)))
+               RETURN(0);
+
+       if (likely(parent != child)) {
+               if (mdt_object_remote(parent) ^ mdt_object_remote(child)) {
+                       /* don't treat shard as remote object, otherwise client
+                        * need to revalidate shards all the time.
+                        */
+                       if (mdt_object_is_shard(info, child))
+                               RETURN(0);
+                       RETURN(1);
+               }
+
+               if (!mdt_object_remote(parent) && !mdt_object_remote(child))
+                       RETURN(0);
+
+               rc = mdt_fids_different_target(info, mdt_object_fid(parent),
+                                              mdt_object_fid(child));
+               RETURN(rc);
+       }
+
+       /* client < 2.13.52 getattr_by_fid parent and child are the same */
+       buf = lu_buf_check_and_alloc(buf, PATH_MAX);
+       if (!buf->lb_buf)
+               RETURN(-ENOMEM);
+
+       ldata.ld_buf = buf;
+       rc = mdt_links_read(info, child, &ldata);
+       /* can't read linkea, just assume it's remote object */
+       if (rc == -ENOENT || rc == -ENODATA)
+               RETURN(1);
+       if (rc)
+               RETURN(rc);
+
+       leh = buf->lb_buf;
+       lee = (struct link_ea_entry *)(leh + 1);
+       for (i = 0; i < leh->leh_reccount; i++) {
+               linkea_entry_unpack(lee, &reclen, &name, &pfid);
+               lee = (struct link_ea_entry *) ((char *)lee + reclen);
+               if (mdt_fids_different_target(info, &pfid,
+                                             mdt_object_fid(child)))
+                       RETURN(1);
+       }
+
+       RETURN(0);
+}