Whamcloud - gitweb
LU-1445 osd: add fld look up in osd.
authorwangdi <di.wang@whamcloud.com>
Thu, 26 Sep 2013 11:48:51 +0000 (04:48 -0700)
committerOleg Drokin <green@whamcloud.com>
Mon, 28 Jan 2013 06:09:20 +0000 (01:09 -0500)
Lookup FLDB during oi_insert, so we know it is MDT or
OST objects. And it also needs to init FLD service
on OFD as well, so object on OST can lookup FLDB
as well.

Change fci_lock to read/write lock, since fld_cache_lookup
will become the hot path after FLD lookup is added in OSD
layer.

Change-Id: I79ef723de603214b7136a51c36f0cb63513b5640
Signed-off-by: Wang Di <di.wang@intel.com>
Reviewed-on: http://review.whamcloud.com/4330
Reviewed-by: Mike Pershin <mike.pershin@intel.com>
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
lustre/fld/fld_cache.c
lustre/fld/fld_handler.c
lustre/fld/fld_index.c
lustre/fld/fld_internal.h
lustre/include/lustre_fld.h
lustre/mdt/mdt_handler.c
lustre/ofd/ofd_fs.c
lustre/osd-ldiskfs/osd_handler.c
lustre/osd-ldiskfs/osd_internal.h
lustre/osd-ldiskfs/osd_oi.c
lustre/osd-ldiskfs/osd_oi.h

index 4d8b3a4..e6ff0a7 100644 (file)
@@ -85,7 +85,7 @@ struct fld_cache *fld_cache_init(const char *name,
         CFS_INIT_LIST_HEAD(&cache->fci_lru);
 
         cache->fci_cache_count = 0;
-       spin_lock_init(&cache->fci_lock);
+       rwlock_init(&cache->fci_lock);
 
         strncpy(cache->fci_name, name,
                 sizeof(cache->fci_name));
@@ -263,10 +263,10 @@ void fld_cache_flush(struct fld_cache *cache)
 {
        ENTRY;
 
-       spin_lock(&cache->fci_lock);
+       write_lock(&cache->fci_lock);
        cache->fci_cache_size = 0;
        fld_cache_shrink(cache);
-       spin_unlock(&cache->fci_lock);
+       write_unlock(&cache->fci_lock);
 
        EXIT;
 }
@@ -412,8 +412,6 @@ int fld_cache_insert_nolock(struct fld_cache *cache,
        __u32 new_flags  = f_new->fce_range.lsr_flags;
        ENTRY;
 
-       LASSERT_SPIN_LOCKED(&cache->fci_lock);
-
        /*
         * Duplicate entries are eliminated in insert op.
         * So we don't need to search new entry before starting
@@ -461,9 +459,9 @@ int fld_cache_insert(struct fld_cache *cache,
        if (IS_ERR(flde))
                RETURN(PTR_ERR(flde));
 
-       spin_lock(&cache->fci_lock);
+       write_lock(&cache->fci_lock);
        rc = fld_cache_insert_nolock(cache, flde);
-       spin_unlock(&cache->fci_lock);
+       write_unlock(&cache->fci_lock);
        if (rc)
                OBD_FREE_PTR(flde);
 
@@ -477,7 +475,6 @@ void fld_cache_delete_nolock(struct fld_cache *cache,
        struct fld_cache_entry *tmp;
        cfs_list_t *head;
 
-       LASSERT_SPIN_LOCKED(&cache->fci_lock);
        head = &cache->fci_entries_head;
        cfs_list_for_each_entry_safe(flde, tmp, head, fce_list) {
                /* add list if next is end of list */
@@ -497,9 +494,9 @@ void fld_cache_delete_nolock(struct fld_cache *cache,
 void fld_cache_delete(struct fld_cache *cache,
                      const struct lu_seq_range *range)
 {
-       spin_lock(&cache->fci_lock);
+       write_lock(&cache->fci_lock);
        fld_cache_delete_nolock(cache, range);
-       spin_unlock(&cache->fci_lock);
+       write_unlock(&cache->fci_lock);
 }
 
 struct fld_cache_entry
@@ -510,7 +507,6 @@ struct fld_cache_entry
        struct fld_cache_entry *got = NULL;
        cfs_list_t *head;
 
-       LASSERT_SPIN_LOCKED(&cache->fci_lock);
        head = &cache->fci_entries_head;
        cfs_list_for_each_entry(flde, head, fce_list) {
                if (range->lsr_start == flde->fce_range.lsr_start ||
@@ -533,9 +529,9 @@ struct fld_cache_entry
        struct fld_cache_entry *got = NULL;
        ENTRY;
 
-       spin_lock(&cache->fci_lock);
+       read_lock(&cache->fci_lock);
        got = fld_cache_entry_lookup_nolock(cache, range);
-       spin_unlock(&cache->fci_lock);
+       read_unlock(&cache->fci_lock);
        RETURN(got);
 }
 
@@ -550,7 +546,7 @@ int fld_cache_lookup(struct fld_cache *cache,
        cfs_list_t *head;
        ENTRY;
 
-       spin_lock(&cache->fci_lock);
+       read_lock(&cache->fci_lock);
        head = &cache->fci_entries_head;
 
        cache->fci_stat.fst_count++;
@@ -565,14 +561,12 @@ int fld_cache_lookup(struct fld_cache *cache,
                 if (range_within(&flde->fce_range, seq)) {
                         *range = flde->fce_range;
 
-                        /* update position of this entry in lru list. */
-                        cfs_list_move(&flde->fce_lru, &cache->fci_lru);
                         cache->fci_stat.fst_cache++;
-                       spin_unlock(&cache->fci_lock);
+                       read_unlock(&cache->fci_lock);
                        RETURN(0);
                }
        }
-       spin_unlock(&cache->fci_lock);
+       read_unlock(&cache->fci_lock);
        RETURN(-ENOENT);
 }
 
index 5e3d068..0ee6852 100644 (file)
@@ -151,7 +151,8 @@ int fld_server_lookup(const struct lu_env *env, struct lu_server_fld *fld,
         /* Lookup it in the cache. */
         rc = fld_cache_lookup(fld->lsf_cache, seq, erange);
         if (rc == 0) {
-                if (unlikely(erange->lsr_flags != range->lsr_flags)) {
+               if (unlikely(erange->lsr_flags != range->lsr_flags) &&
+                   range->lsr_flags != -1) {
                         CERROR("FLD cache found a range "DRANGE" doesn't "
                                "match the requested flag %x\n",
                                PRANGE(erange), range->lsr_flags);
@@ -173,6 +174,7 @@ int fld_server_lookup(const struct lu_env *env, struct lu_server_fld *fld,
                 * This is temporary solution, long term solution is fld
                 * replication on all mdt servers.
                 */
+               range->lsr_start = seq;
                rc = fld_client_rpc(fld->lsf_control_exp,
                                    range, FLD_LOOKUP);
                if (rc == 0)
@@ -236,12 +238,15 @@ static int fld_req_handle(struct ptlrpc_request *req,
                         RETURN(err_serious(-EPROTO));
                 *out = *in;
 
-                /* For old 2.0 client, the 'lsr_flags' is uninitialized.
-                 * Set it as 'LU_SEQ_RANGE_MDT' by default.
-                 * Old 2.0 liblustre client cannot talk with new 2.1 server. */
-                if (!(exp->exp_connect_flags & OBD_CONNECT_64BITHASH) &&
-                    !exp->exp_libclient)
-                        out->lsr_flags = LU_SEQ_RANGE_MDT;
+               /* For old 2.0 client, the 'lsr_flags' is uninitialized.
+                * Set it as 'LU_SEQ_RANGE_MDT' by default.
+                * Old 2.0 liblustre client cannot talk with new 2.1 server. */
+               if (!(exp->exp_connect_flags & OBD_CONNECT_64BITHASH) &&
+                   !((exp->exp_connect_flags & OBD_CONNECT_MDS) &&
+                     (exp->exp_connect_flags & OBD_CONNECT_FID)) &&
+                   !(exp->exp_connect_flags & OBD_CONNECT_LIGHTWEIGHT) &&
+                   !exp->exp_libclient)
+                       out->lsr_flags = LU_SEQ_RANGE_MDT;
 
                rc = fld_server_handle(lu_site2seq(site)->ss_server_fld,
                                       req->rq_svc_thread->t_env,
@@ -377,7 +382,8 @@ static void fld_server_proc_fini(struct lu_server_fld *fld)
 #endif
 
 int fld_server_init(const struct lu_env *env, struct lu_server_fld *fld,
-                   struct dt_device *dt, const char *prefix, int mds_node_id)
+                   struct dt_device *dt, const char *prefix, int mds_node_id,
+                   __u32 lsr_flags)
 {
         int cache_size, cache_threshold;
         struct lu_seq_range range;
@@ -402,7 +408,7 @@ int fld_server_init(const struct lu_env *env, struct lu_server_fld *fld,
                 GOTO(out, rc);
         }
 
-        if (!mds_node_id) {
+       if (!mds_node_id && lsr_flags == LU_SEQ_RANGE_MDT) {
                rc = fld_index_init(env, fld, dt);
                 if (rc)
                         GOTO(out, rc);
@@ -416,12 +422,13 @@ int fld_server_init(const struct lu_env *env, struct lu_server_fld *fld,
         fld->lsf_control_exp = NULL;
 
         /* Insert reserved sequence number of ".lustre" into fld cache. */
-        range.lsr_start = FID_SEQ_DOT_LUSTRE;
-        range.lsr_end = FID_SEQ_DOT_LUSTRE + 1;
-        range.lsr_index = 0;
-        range.lsr_flags = LU_SEQ_RANGE_MDT;
-       fld_cache_insert(fld->lsf_cache, &range);
-
+       if (lsr_flags == LU_SEQ_RANGE_MDT) {
+               range.lsr_start = FID_SEQ_DOT_LUSTRE;
+               range.lsr_end = FID_SEQ_DOT_LUSTRE + 1;
+               range.lsr_index = 0;
+               range.lsr_flags = lsr_flags;
+               fld_cache_insert(fld->lsf_cache, &range);
+       }
         EXIT;
 out:
        if (rc)
index 0dd72cf..c0295ab 100644 (file)
@@ -206,11 +206,11 @@ int fld_index_create(const struct lu_env *env, struct lu_server_fld *fld,
        if (IS_ERR(flde))
                GOTO(out, rc = PTR_ERR(flde));
 
-       spin_lock(&fld->lsf_cache->fci_lock);
+       write_lock(&fld->lsf_cache->fci_lock);
        if (deleted)
                fld_cache_delete_nolock(fld->lsf_cache, new_range);
        rc = fld_cache_insert_nolock(fld->lsf_cache, flde);
-       spin_unlock(&fld->lsf_cache->fci_lock);
+       write_unlock(&fld->lsf_cache->fci_lock);
        if (rc)
                OBD_FREE_PTR(flde);
 out:
index 1545a60..db6dad6 100644 (file)
@@ -83,7 +83,7 @@ struct fld_cache {
         * Cache guard, protects fci_hash mostly because others immutable after
         * init is finished.
         */
-       spinlock_t               fci_lock;
+       rwlock_t                 fci_lock;
 
         /**
          * Cache shrink threshold */
index 09bcc8a..853793d 100644 (file)
@@ -148,7 +148,8 @@ int fld_query(struct com_thread_info *info);
 
 /* Server methods */
 int fld_server_init(const struct lu_env *env, struct lu_server_fld *fld,
-                   struct dt_device *dt, const char *prefix, int mds_node_id);
+                   struct dt_device *dt, const char *prefix, int mds_node_id,
+                   __u32 lsr_flags);
 
 void fld_server_fini(const struct lu_env *env, struct lu_server_fld *fld);
 
index 097cbb0..2e6ff25 100644 (file)
@@ -4036,7 +4036,7 @@ static int mdt_fld_init(const struct lu_env *env,
                RETURN(rc = -ENOMEM);
 
        rc = fld_server_init(env, ss->ss_server_fld, m->mdt_bottom, uuid,
-                            ss->ss_node_id);
+                            ss->ss_node_id, LU_SEQ_RANGE_MDT);
        if (rc) {
                OBD_FREE_PTR(ss->ss_server_fld);
                ss->ss_server_fld = NULL;
@@ -5059,7 +5059,9 @@ static int mdt_connect_internal(struct obd_export *exp,
        }
 
        if (mdt->mdt_som_conf &&
-           !(data->ocd_connect_flags & (OBD_CONNECT_MDS_MDS|OBD_CONNECT_SOM))){
+           !(data->ocd_connect_flags & (OBD_CONNECT_LIGHTWEIGHT |
+                                        OBD_CONNECT_MDS_MDS |
+                                        OBD_CONNECT_SOM))) {
                CWARN("%s: MDS has SOM enabled, but client does not support "
                      "it\n", mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name);
                return -EBADE;
index d7fd08d..7690ebe 100644 (file)
@@ -197,6 +197,26 @@ static void ofd_deregister_seq_exp(struct ofd_device *ofd)
                lustre_deregister_osp_item(&ss->ss_client_seq->lcs_exp);
                ss->ss_client_seq->lcs_exp = NULL;
        }
+
+       if (ss->ss_server_fld != NULL) {
+               lustre_deregister_osp_item(&ss->ss_server_fld->lsf_control_exp);
+               ss->ss_server_fld->lsf_control_exp = NULL;
+       }
+}
+
+static int ofd_fld_fini(const struct lu_env *env,
+                       struct ofd_device *ofd)
+{
+       struct seq_server_site *ss = &ofd->ofd_seq_site;
+       ENTRY;
+
+       if (ss && ss->ss_server_fld) {
+               fld_server_fini(env, ss->ss_server_fld);
+               OBD_FREE_PTR(ss->ss_server_fld);
+               ss->ss_server_fld = NULL;
+       }
+
+       RETURN(0);
 }
 
 void ofd_seqs_fini(const struct lu_env *env, struct ofd_device *ofd)
@@ -212,6 +232,10 @@ void ofd_seqs_fini(const struct lu_env *env, struct ofd_device *ofd)
        if (rc != 0)
                CERROR("%s: fid fini error: rc = %d\n", ofd_name(ofd), rc);
 
+       rc = ofd_fld_fini(env, ofd);
+       if (rc != 0)
+               CERROR("%s: fld fini error: rc = %d\n", ofd_name(ofd), rc);
+
        CFS_INIT_LIST_HEAD(&dispose);
        write_lock(&ofd->ofd_seq_list_lock);
        cfs_list_for_each_entry_safe(oseq, tmp, &ofd->ofd_seq_list, os_list) {
@@ -305,6 +329,27 @@ cleanup:
        return ERR_PTR(rc);
 }
 
+static int ofd_fld_init(const struct lu_env *env, const char *uuid,
+                       struct ofd_device *ofd)
+{
+       struct seq_server_site *ss = &ofd->ofd_seq_site;
+       int rc;
+       ENTRY;
+
+       OBD_ALLOC_PTR(ss->ss_server_fld);
+       if (ss->ss_server_fld == NULL)
+               RETURN(rc = -ENOMEM);
+
+       rc = fld_server_init(env, ss->ss_server_fld, ofd->ofd_osd, uuid,
+                            ss->ss_node_id, LU_SEQ_RANGE_OST);
+       if (rc) {
+               OBD_FREE_PTR(ss->ss_server_fld);
+               ss->ss_server_fld = NULL;
+               RETURN(rc);
+       }
+       RETURN(0);
+}
+
 static int ofd_register_seq_exp(struct ofd_device *ofd)
 {
        struct seq_server_site  *ss = &ofd->ofd_seq_site;
@@ -321,6 +366,17 @@ static int ofd_register_seq_exp(struct ofd_device *ofd)
 
        rc = lustre_register_osp_item(osp_name, &ss->ss_client_seq->lcs_exp,
                                      NULL, NULL);
+       if (rc != 0)
+               GOTO(out_free, rc);
+
+       rc = lustre_register_osp_item(osp_name,
+                                     &ss->ss_server_fld->lsf_control_exp,
+                                     NULL, NULL);
+       if (rc != 0) {
+               lustre_deregister_osp_item(&ss->ss_client_seq->lcs_exp);
+               ss->ss_client_seq->lcs_exp = NULL;
+               GOTO(out_free, rc);
+       }
 out_free:
        if (osp_name != NULL)
                OBD_FREE(osp_name, MAX_OBD_NAME);
@@ -339,6 +395,12 @@ int ofd_seqs_init(const struct lu_env *env, struct ofd_device *ofd)
                return rc;
        }
 
+       rc = ofd_fld_init(env, ofd_name(ofd), ofd);
+       if (rc) {
+               CERROR("%s: Can't init fld, rc %d\n", ofd_name(ofd), rc);
+               return rc;
+       }
+
        rc = ofd_register_seq_exp(ofd);
        if (rc) {
                CERROR("%s: Can't init seq exp, rc %d\n", ofd_name(ofd), rc);
index 2622a35..0dbfaaa 100644 (file)
@@ -1914,6 +1914,44 @@ static int __osd_oi_insert(const struct lu_env *env, struct osd_object *obj,
        return osd_oi_insert(info, osd, fid, id, th);
 }
 
+int osd_fld_lookup(const struct lu_env *env, struct osd_device *osd,
+                  const struct lu_fid *fid, struct lu_seq_range *range)
+{
+       struct seq_server_site  *ss = osd_seq_site(osd);
+       int                     rc;
+
+       if (fid_is_igif(fid)) {
+               range->lsr_flags = LU_SEQ_RANGE_MDT;
+               range->lsr_index = 0;
+               return 0;
+       }
+
+       if (fid_is_idif(fid)) {
+               range->lsr_flags = LU_SEQ_RANGE_OST;
+               range->lsr_index = fid_idif_ost_idx(fid);
+               return 0;
+       }
+
+       if (!fid_is_norm(fid)) {
+               range->lsr_flags = LU_SEQ_RANGE_MDT;
+               if (ss != NULL)
+                       /* FIXME: If ss is NULL, it suppose not get lsr_index
+                        * at all */
+                       range->lsr_index = ss->ss_node_id;
+               return 0;
+       }
+
+       LASSERT(ss != NULL);
+       range->lsr_flags = -1;
+       rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), range);
+       if (rc != 0) {
+               CERROR("%s can not find "DFID": rc = %d\n",
+                      osd2lu_dev(osd)->ld_obd->obd_name, PFID(fid), rc);
+       }
+       return rc;
+}
+
+
 static int osd_declare_object_create(const struct lu_env *env,
                                     struct dt_object *dt,
                                     struct lu_attr *attr,
@@ -1921,6 +1959,7 @@ static int osd_declare_object_create(const struct lu_env *env,
                                     struct dt_object_format *dof,
                                     struct thandle *handle)
 {
+       struct lu_seq_range     *range = &osd_oti_get(env)->oti_seq_range;
        struct osd_thandle      *oh;
        int                      rc;
        ENTRY;
@@ -1933,7 +1972,9 @@ static int osd_declare_object_create(const struct lu_env *env,
        OSD_DECLARE_OP(oh, create, osd_dto_credits_noquota[DTO_OBJECT_CREATE]);
        /* XXX: So far, only normal fid needs be inserted into the oi,
         *      things could be changed later. Revise following code then. */
-       if (fid_is_norm(lu_object_fid(&dt->do_lu))) {
+       if (fid_is_norm(lu_object_fid(&dt->do_lu)) &&
+           !fid_is_on_ost(osd_oti_get(env), osd_dt_dev(handle->th_dev),
+                          lu_object_fid(&dt->do_lu))) {
                /* Reuse idle OI block may cause additional one OI block
                 * to be changed. */
                OSD_DECLARE_OP(oh, insert,
@@ -1955,6 +1996,19 @@ static int osd_declare_object_create(const struct lu_env *env,
 
        rc = osd_declare_inode_qid(env, attr->la_uid, attr->la_gid, 1, oh,
                                   false, false, NULL, false);
+       if (rc != 0)
+               RETURN(rc);
+
+       /* It does fld look up inside declare, and the result will be
+        * added to fld cache, so the following fld lookup inside insert
+        * does not need send RPC anymore, so avoid send rpc with holding
+        * transaction */
+       if (fid_is_norm(lu_object_fid(&dt->do_lu)) &&
+               !fid_is_last_id(lu_object_fid(&dt->do_lu)))
+               osd_fld_lookup(env, osd_dt_dev(handle->th_dev),
+                              lu_object_fid(&dt->do_lu), range);
+
+
        RETURN(rc);
 }
 
@@ -2209,7 +2263,9 @@ static int osd_object_ea_create(const struct lu_env *env, struct dt_object *dt,
 
         result = __osd_object_create(info, obj, attr, hint, dof, th);
         /* objects under osd root shld have igif fid, so dont add fid EA */
-        if (result == 0 && fid_seq(fid) >= FID_SEQ_NORMAL)
+       /* For ost object, the fid will be stored during first write */
+       if (result == 0 && fid_seq(fid) >= FID_SEQ_NORMAL &&
+           !fid_is_on_ost(info, osd_dt_dev(th->th_dev), fid))
                 result = osd_ea_fid_set(env, dt, fid);
 
         if (result == 0)
@@ -2224,7 +2280,7 @@ static int osd_declare_object_ref_add(const struct lu_env *env,
                                       struct dt_object *dt,
                                       struct thandle *handle)
 {
-        struct osd_thandle *oh;
+       struct osd_thandle       *oh;
 
         /* it's possible that object doesn't exist yet */
         LASSERT(handle != NULL);
@@ -3516,9 +3572,10 @@ static int osd_index_declare_ea_insert(const struct lu_env *env,
                                       const struct dt_key *key,
                                       struct thandle *handle)
 {
-       struct osd_thandle *oh;
-       struct inode       *inode;
-       int                 rc;
+       struct osd_thandle      *oh;
+       struct inode            *inode;
+       struct lu_fid           *fid = (struct lu_fid *)rec;
+       int                     rc;
        ENTRY;
 
        LASSERT(dt_object_exists(dt));
@@ -3537,6 +3594,17 @@ static int osd_index_declare_ea_insert(const struct lu_env *env,
         * insert */
        rc = osd_declare_inode_qid(env, inode->i_uid, inode->i_gid, 0, oh,
                                   true, true, NULL, false);
+       if (fid == NULL)
+               RETURN(0);
+
+       /* It does fld look up inside declare, and the result will be
+       * added to fld cache, so the following fld lookup inside insert
+       * does not need send RPC anymore, so avoid send rpc with holding
+       * transaction */
+       LASSERTF(fid_is_sane(fid), "fid is insane"DFID"\n", PFID(fid));
+       osd_fld_lookup(env, osd_dt_dev(handle->th_dev), fid,
+                       &osd_oti_get(env)->oti_seq_range);
+
        RETURN(rc);
 }
 
index 48f1bc5..6722a2f 100644 (file)
@@ -597,6 +597,7 @@ struct osd_thread_info {
        struct lquota_trans     oti_quota_trans;
        union lquota_rec        oti_quota_rec;
        __u64                   oti_quota_id;
+       struct lu_seq_range     oti_seq_range;
 };
 
 extern int ldiskfs_pdo;
@@ -644,6 +645,8 @@ int osd_oii_lookup(struct osd_device *dev, const struct lu_fid *fid,
                   struct osd_inode_id *id);
 int osd_scrub_dump(struct osd_device *dev, char *buf, int len);
 
+int osd_fld_lookup(const struct lu_env *env, struct osd_device *osd,
+                  const struct lu_fid *fid, struct lu_seq_range *range);
 /* osd_quota_fmt.c */
 int walk_tree_dqentry(const struct lu_env *env, struct osd_object *obj,
                       int type, uint blk, int depth, uint index,
index ef67a79..b625d82 100644 (file)
@@ -479,6 +479,32 @@ static int osd_oi_iam_lookup(struct osd_thread_info *oti,
         RETURN(rc);
 }
 
+int fid_is_on_ost(struct osd_thread_info *info, struct osd_device *osd,
+                 const struct lu_fid *fid)
+{
+       struct lu_seq_range *range = &info->oti_seq_range;
+       int rc;
+       ENTRY;
+
+       if (fid_is_idif(fid) || fid_is_last_id(fid))
+               RETURN(1);
+
+       rc = osd_fld_lookup(info->oti_env, osd, fid, range);
+       if (rc != 0) {
+               CERROR("%s: Can not lookup fld for "DFID"\n",
+                      osd2lu_dev(osd)->ld_obd->obd_name, PFID(fid));
+               RETURN(rc);
+       }
+
+       CDEBUG(D_INFO, "fid "DFID" range "DRANGE"\n", PFID(fid),
+              PRANGE(range));
+
+       if (range->lsr_flags == LU_SEQ_RANGE_OST)
+               RETURN(1);
+
+       RETURN(0);
+}
+
 int __osd_oi_lookup(struct osd_thread_info *info, struct osd_device *osd,
                    const struct lu_fid *fid, struct osd_inode_id *id)
 {
@@ -502,7 +528,7 @@ int osd_oi_lookup(struct osd_thread_info *info, struct osd_device *osd,
 {
        int                  rc = 0;
 
-       if ((fid_is_idif(fid) && !fid_is_last_id(fid)) ||
+       if ((!fid_is_last_id(fid) && fid_is_on_ost(info, osd, fid)) ||
             fid_is_llog(fid)) {
                /* old OSD obj id */
                /* FIXME: actually for all of the OST object */
@@ -569,7 +595,7 @@ int osd_oi_insert(struct osd_thread_info *info, struct osd_device *osd,
        if (fid_is_igif(fid) || unlikely(fid_seq(fid) == FID_SEQ_DOT_LUSTRE))
                return 0;
 
-       if ((fid_is_idif(fid) && !fid_is_last_id(fid)) ||
+       if ((fid_is_on_ost(info, osd, fid) && !fid_is_last_id(fid)) ||
             fid_is_llog(fid))
                return osd_obj_map_insert(info, osd, fid, id, th);
 
@@ -624,7 +650,7 @@ int osd_oi_delete(struct osd_thread_info *info,
 
        LASSERT(fid_seq(fid) != FID_SEQ_LOCAL_FILE);
 
-       if (fid_is_idif(fid) || fid_is_llog(fid))
+       if (fid_is_on_ost(info, osd, fid) || fid_is_llog(fid))
                return osd_obj_map_delete(info, osd, fid, th);
 
        fid_cpu_to_be(oi_fid, fid);
index 0cb61c8..bb97b49 100644 (file)
@@ -139,5 +139,7 @@ int  osd_oi_delete(struct osd_thread_info *info,
                   struct osd_device *osd, const struct lu_fid *fid,
                   struct thandle *th);
 
+int fid_is_on_ost(struct osd_thread_info *info, struct osd_device *osd,
+                 const struct lu_fid *fid);
 #endif /* __KERNEL__ */
 #endif /* _OSD_OI_H */