Whamcloud - gitweb
LU-2240 fld: Move range lookup to fld.
author wangdi <di.wang@whamcloud.com>
Wed, 25 Sep 2013 23:57:01 +0000 (16:57 -0700)
committer Oleg Drokin <green@whamcloud.com>
Fri, 28 Dec 2012 06:49:44 +0000 (01:49 -0500)
Because ZFS does not support range lookups in a ZAP, move the range
lookup into FLD and do an in-memory lookup there.  The FLD entries
are still stored in an index on disk, but that index is only a
container and is no longer used for searching.

The FLD is expected to always fit in RAM.

Signed-off-by: Wang Di <di.wang@whamcloud.com>
Change-Id: If8cadcc30743e6d63ba779a430bc585af29d51fe
Reviewed-on: http://review.whamcloud.com/4396
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Reviewed-by: Alex Zhuravlev <bzzz@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/fid/fid_store.c
lustre/fld/fld_cache.c
lustre/fld/fld_handler.c
lustre/fld/fld_index.c
lustre/fld/fld_internal.h
lustre/fld/fld_request.c
lustre/include/lustre_fld.h
lustre/lmv/lmv_fld.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_recovery.c

index 77d4aed..13a0933 100644 (file)
@@ -132,47 +132,48 @@ int seq_store_update(const struct lu_env *env, struct lu_server_seq *seq,
 
        rc = dt_declare_record_write(env, seq->lss_obj,
                                     sizeof(struct lu_seq_range), 0, th);
-        if (rc)
-                GOTO(exit, rc);
-
-        if (out != NULL) {
-                rc = fld_declare_server_create(seq->lss_site->ms_server_fld,
-                                               env, th);
-                if (rc)
-                        GOTO(exit, rc);
-        }
+       if (rc)
+               GOTO(exit, rc);
+
+       if (out != NULL) {
+               rc = fld_declare_server_create(env,
+                                              seq->lss_site->ms_server_fld,
+                                              out, th);
+               if (rc)
+                       GOTO(exit, rc);
+       }
 
        rc = dt_trans_start_local(env, dt_dev, th);
-        if (rc)
-                GOTO(exit, rc);
+       if (rc)
+               GOTO(exit, rc);
 
        /* Store ranges in le format. */
        range_cpu_to_le(&info->sti_space, &seq->lss_space);
 
        rc = dt_record_write(env, seq->lss_obj, seq_store_buf(info), &pos, th);
-        if (rc) {
-                CERROR("%s: Can't write space data, rc %d\n",
-                       seq->lss_name, rc);
+       if (rc) {
+               CERROR("%s: Can't write space data, rc %d\n",
+                      seq->lss_name, rc);
                GOTO(exit, rc);
-        } else if (out != NULL) {
-                rc = fld_server_create(seq->lss_site->ms_server_fld,
-                                       env, out, th);
-                if (rc) {
-                        CERROR("%s: Can't Update fld database, rc %d\n",
-                               seq->lss_name, rc);
+       } else if (out != NULL) {
+               rc = fld_server_create(env, seq->lss_site->ms_server_fld, out,
+                                      th);
+               if (rc) {
+                       CERROR("%s: Can't Update fld database, rc %d\n",
+                               seq->lss_name, rc);
                        GOTO(exit, rc);
-                }
-        }
-        /* next sequence update will need sync until this update is committed
-         * in case of sync operation this is not needed obviously */
-        if (!sync)
-                /* if callback can't be added then sync always */
-                sync = !!seq_update_cb_add(th, seq);
-
-        th->th_sync |= sync;
+               }
+       }
+       /* next sequence update will need sync until this update is committed
+        * in case of sync operation this is not needed obviously */
+       if (!sync)
+               /* if callback can't be added then sync always */
+               sync = !!seq_update_cb_add(th, seq);
+
+       th->th_sync |= sync;
 exit:
-        dt_trans_stop(env, dt_dev, th);
-        return rc;
+       dt_trans_stop(env, dt_dev, th);
+       return rc;
 }
 
 /*
index ccfd381..b9af9a3 100644 (file)
@@ -131,13 +131,13 @@ void fld_cache_fini(struct fld_cache *cache)
 /**
  * delete given node from list.
  */
-static inline void fld_cache_entry_delete(struct fld_cache *cache,
-                                          struct fld_cache_entry *node)
+void fld_cache_entry_delete(struct fld_cache *cache,
+                           struct fld_cache_entry *node)
 {
-        cfs_list_del(&node->fce_list);
-        cfs_list_del(&node->fce_lru);
-        cache->fci_cache_count--;
-        OBD_FREE_PTR(node);
+       cfs_list_del(&node->fce_list);
+       cfs_list_del(&node->fce_lru);
+       cache->fci_cache_count--;
+       OBD_FREE_PTR(node);
 }
 
 /**
@@ -316,9 +316,9 @@ void fld_cache_punch_hole(struct fld_cache *cache,
 /**
  * handle range overlap in fld cache.
  */
-void fld_cache_overlap_handle(struct fld_cache *cache,
-                              struct fld_cache_entry *f_curr,
-                              struct fld_cache_entry *f_new)
+static void fld_cache_overlap_handle(struct fld_cache *cache,
+                               struct fld_cache_entry *f_curr,
+                               struct fld_cache_entry *f_new)
 {
         const struct lu_seq_range *range = &f_new->fce_range;
         const seqno_t new_start  = range->lsr_start;
@@ -377,71 +377,164 @@ void fld_cache_overlap_handle(struct fld_cache *cache,
                        PRANGE(range),PRANGE(&f_curr->fce_range));
 }
 
+struct fld_cache_entry
+*fld_cache_entry_create(const struct lu_seq_range *range)
+{
+       struct fld_cache_entry *f_new;
+
+       LASSERT(range_is_sane(range));
+
+       OBD_ALLOC_PTR(f_new);
+       if (!f_new)
+               RETURN(ERR_PTR(-ENOMEM));
+
+       f_new->fce_range = *range;
+       RETURN(f_new);
+}
+
 /**
  * Insert FLD entry in FLD cache.
  *
  * This function handles all cases of merging and breaking up of
  * ranges.
  */
-void fld_cache_insert(struct fld_cache *cache,
-                      const struct lu_seq_range *range)
+int fld_cache_insert_nolock(struct fld_cache *cache,
+                           struct fld_cache_entry *f_new)
 {
-        struct fld_cache_entry *f_new;
-        struct fld_cache_entry *f_curr;
-        struct fld_cache_entry *n;
-        cfs_list_t *head;
-        cfs_list_t *prev = NULL;
-        const seqno_t new_start  = range->lsr_start;
-        const seqno_t new_end  = range->lsr_end;
-        __u32 new_flags  = range->lsr_flags;
-        ENTRY;
+       struct fld_cache_entry *f_curr;
+       struct fld_cache_entry *n;
+       cfs_list_t *head;
+       cfs_list_t *prev = NULL;
+       const seqno_t new_start  = f_new->fce_range.lsr_start;
+       const seqno_t new_end  = f_new->fce_range.lsr_end;
+       __u32 new_flags  = f_new->fce_range.lsr_flags;
+       ENTRY;
 
-        LASSERT(range_is_sane(range));
+       LASSERT_SPIN_LOCKED(&cache->fci_lock);
 
-        /* Allocate new entry. */
-        OBD_ALLOC_PTR(f_new);
-        if (!f_new) {
-                EXIT;
-                return;
-        }
+       /*
+        * Duplicate entries are eliminated in insert op.
+        * So we don't need to search new entry before starting
+        * insertion loop.
+        */
 
-        f_new->fce_range = *range;
+       if (!cache->fci_no_shrink)
+               fld_cache_shrink(cache);
 
-        /*
-         * Duplicate entries are eliminated in inset op.
-         * So we don't need to search new entry before starting insertion loop.
-         */
+       head = &cache->fci_entries_head;
+
+       cfs_list_for_each_entry_safe(f_curr, n, head, fce_list) {
+               /* add list if next is end of list */
+               if (new_end < f_curr->fce_range.lsr_start ||
+                  (new_end == f_curr->fce_range.lsr_start &&
+                   new_flags != f_curr->fce_range.lsr_flags))
+                       break;
+
+               prev = &f_curr->fce_list;
+               /* check if this range is to left of new range. */
+               if (new_start < f_curr->fce_range.lsr_end &&
+                   new_flags == f_curr->fce_range.lsr_flags) {
+                       fld_cache_overlap_handle(cache, f_curr, f_new);
+                       goto out;
+               }
+       }
+
+       if (prev == NULL)
+               prev = head;
+
+       CDEBUG(D_INFO, "insert range "DRANGE"\n", PRANGE(&f_new->fce_range));
+       /* Add new entry to cache and lru list. */
+       fld_cache_entry_add(cache, f_new, prev);
+out:
+       RETURN(0);
+}
+
+int fld_cache_insert(struct fld_cache *cache,
+                    const struct lu_seq_range *range)
+{
+       struct fld_cache_entry  *flde;
+       int rc;
+
+       flde = fld_cache_entry_create(range);
+       if (IS_ERR(flde))
+               RETURN(PTR_ERR(flde));
 
        spin_lock(&cache->fci_lock);
-        fld_cache_shrink(cache);
+       rc = fld_cache_insert_nolock(cache, flde);
+       spin_unlock(&cache->fci_lock);
+       if (rc)
+               OBD_FREE_PTR(flde);
 
-        head = &cache->fci_entries_head;
+       RETURN(rc);
+}
 
-        cfs_list_for_each_entry_safe(f_curr, n, head, fce_list) {
-                /* add list if next is end of list */
-                if (new_end < f_curr->fce_range.lsr_start ||
-                   (new_end == f_curr->fce_range.lsr_start &&
-                    new_flags != f_curr->fce_range.lsr_flags))
-                        break;
+void fld_cache_delete_nolock(struct fld_cache *cache,
+                     const struct lu_seq_range *range)
+{
+       struct fld_cache_entry *flde;
+       struct fld_cache_entry *tmp;
+       cfs_list_t *head;
 
-                prev = &f_curr->fce_list;
-                /* check if this range is to left of new range. */
-                if (new_start < f_curr->fce_range.lsr_end &&
-                    new_flags == f_curr->fce_range.lsr_flags) {
-                        fld_cache_overlap_handle(cache, f_curr, f_new);
-                        goto out;
-                }
-        }
+       LASSERT_SPIN_LOCKED(&cache->fci_lock);
+       head = &cache->fci_entries_head;
+       cfs_list_for_each_entry_safe(flde, tmp, head, fce_list) {
+               /* delete the first entry that matches the given range */
+               if (range->lsr_start == flde->fce_range.lsr_start ||
+                  (range->lsr_end == flde->fce_range.lsr_end &&
+                   range->lsr_flags == flde->fce_range.lsr_flags)) {
+                       fld_cache_entry_delete(cache, flde);
+                       break;
+               }
+       }
+}
 
-        if (prev == NULL)
-                prev = head;
+/**
+ * Delete FLD entry in FLD cache.
+ *
+ */
+void fld_cache_delete(struct fld_cache *cache,
+                     const struct lu_seq_range *range)
+{
+       spin_lock(&cache->fci_lock);
+       fld_cache_delete_nolock(cache, range);
+       spin_unlock(&cache->fci_lock);
+}
 
-        CDEBUG(D_INFO, "insert range "DRANGE"\n", PRANGE(&f_new->fce_range));
-        /* Add new entry to cache and lru list. */
-        fld_cache_entry_add(cache, f_new, prev);
-out:
+struct fld_cache_entry
+*fld_cache_entry_lookup_nolock(struct fld_cache *cache,
+                             struct lu_seq_range *range)
+{
+       struct fld_cache_entry *flde;
+       struct fld_cache_entry *got = NULL;
+       cfs_list_t *head;
+
+       LASSERT_SPIN_LOCKED(&cache->fci_lock);
+       head = &cache->fci_entries_head;
+       cfs_list_for_each_entry(flde, head, fce_list) {
+               if (range->lsr_start == flde->fce_range.lsr_start ||
+                  (range->lsr_end == flde->fce_range.lsr_end &&
+                   range->lsr_flags == flde->fce_range.lsr_flags)) {
+                       got = flde;
+                       break;
+               }
+       }
+
+       RETURN(got);
+}
+
+/**
+ * Look up the entry matching \a range in the fld cache.
+ */
+struct fld_cache_entry
+*fld_cache_entry_lookup(struct fld_cache *cache, struct lu_seq_range *range)
+{
+       struct fld_cache_entry *got = NULL;
+       ENTRY;
+
+       spin_lock(&cache->fci_lock);
+       got = fld_cache_entry_lookup_nolock(cache, range);
        spin_unlock(&cache->fci_lock);
-       EXIT;
+       RETURN(got);
 }
 
 /**
@@ -451,17 +544,22 @@ int fld_cache_lookup(struct fld_cache *cache,
                     const seqno_t seq, struct lu_seq_range *range)
 {
        struct fld_cache_entry *flde;
+       struct fld_cache_entry *prev = NULL;
        cfs_list_t *head;
        ENTRY;
 
        spin_lock(&cache->fci_lock);
-        head = &cache->fci_entries_head;
-
-        cache->fci_stat.fst_count++;
-        cfs_list_for_each_entry(flde, head, fce_list) {
-                if (flde->fce_range.lsr_start > seq)
-                        break;
+       head = &cache->fci_entries_head;
+
+       cache->fci_stat.fst_count++;
+       cfs_list_for_each_entry(flde, head, fce_list) {
+               if (flde->fce_range.lsr_start > seq) {
+                       if (prev != NULL)
+                               memcpy(range, prev, sizeof(*range));
+                       break;
+               }
 
+               prev = flde;
                 if (range_within(&flde->fce_range, seq)) {
                         *range = flde->fce_range;
 
@@ -475,3 +573,4 @@ int fld_cache_lookup(struct fld_cache *cache,
        spin_unlock(&cache->fci_lock);
        RETURN(-ENOENT);
 }
+
index 01bc8d8..4e4404f 100644 (file)
@@ -98,29 +98,14 @@ static void __exit fld_mod_exit(void)
         }
 }
 
-int fld_declare_server_create(struct lu_server_fld *fld,
-                              const struct lu_env *env,
-                              struct thandle *th)
+int fld_declare_server_create(const struct lu_env *env,
+                             struct lu_server_fld *fld,
+                             struct lu_seq_range *range,
+                             struct thandle *th)
 {
-        int rc;
-
-        ENTRY;
-
-       if (fld->lsf_no_range_lookup) {
-               /* Stub for underlying FS which can't lookup ranges */
-               return 0;
-       }
+       int rc;
 
-        /* for ldiskfs OSD it's enough to declare operation with any ops
-         * with DMU we'll probably need to specify exact key/value */
-       rc = dt_declare_delete(env, fld->lsf_obj, NULL, th);
-       if (rc)
-               GOTO(out, rc);
-       rc = dt_declare_delete(env, fld->lsf_obj, NULL, th);
-       if (rc)
-               GOTO(out, rc);
-       rc = dt_declare_insert(env, fld->lsf_obj, NULL, NULL, th);
-out:
+       rc = fld_declare_index_create(env, fld, range, th);
        RETURN(rc);
 }
 EXPORT_SYMBOL(fld_declare_server_create);
@@ -128,136 +113,19 @@ EXPORT_SYMBOL(fld_declare_server_create);
 /**
  * Insert FLD index entry and update FLD cache.
  *
- * First it try to merge given range with existing range then update
- * FLD index and FLD cache accordingly. FLD index consistency is maintained
- * by this function.
  * This function is called from the sequence allocator when a super-sequence
  * is granted to a server.
  */
-
-int fld_server_create(struct lu_server_fld *fld,
-                      const struct lu_env *env,
-                      struct lu_seq_range *add_range,
-                      struct thandle *th)
+int fld_server_create(const struct lu_env *env, struct lu_server_fld *fld,
+                     struct lu_seq_range *range, struct thandle *th)
 {
-        struct lu_seq_range *erange;
-        struct lu_seq_range *new;
-        struct fld_thread_info *info;
-        int rc = 0;
-        int do_merge=0;
+       int rc;
 
-        ENTRY;
-
-        info = lu_context_key_get(&env->le_ctx, &fld_thread_key);
        mutex_lock(&fld->lsf_lock);
-
-        erange = &info->fti_lrange;
-        new = &info->fti_irange;
-        *new = *add_range;
-
-        /* STEP 1: try to merge with previous range */
-        rc = fld_index_lookup(fld, env, new->lsr_start, erange);
-        if (rc == 0) {
-                /* in case of range overlap, the location must be same */
-                if (range_compare_loc(new, erange) != 0) {
-                        CERROR("the start of given range "DRANGE" conflict to"
-                               "an existing range "DRANGE"\n",
-                               PRANGE(new), PRANGE(erange));
-                        GOTO(out, rc = -EIO);
-                }
-
-                if (new->lsr_end < erange->lsr_end)
-                        GOTO(out, rc);
-                do_merge = 1;
-        } else if (rc == -ENOENT) {
-                /* check for merge case: optimizes for single mds lustre.
-                 * As entry does not exist, returned entry must be left side
-                 * entry compared to start of new range (ref dio_lookup()).
-                 * So try to merge from left.
-                 */
-                if (new->lsr_start == erange->lsr_end &&
-                    range_compare_loc(new, erange) == 0)
-                        do_merge = 1;
-        } else {
-                /* no overlap allowed in fld, so failure in lookup is error */
-                GOTO(out, rc);
-        }
-
-        if (do_merge) {
-                /* new range will be merged with the existing one.
-                 * delete this range at first. */
-                rc = fld_index_delete(fld, env, erange, th);
-                if (rc != 0)
-                        GOTO(out, rc);
-
-                new->lsr_start = min(erange->lsr_start, new->lsr_start);
-                new->lsr_end = max(erange->lsr_end, new->lsr_end);
-                do_merge = 0;
-        }
-
-        /* STEP 2: try to merge with next range */
-        rc = fld_index_lookup(fld, env, new->lsr_end, erange);
-        if (rc == 0) {
-                /* found a matched range, meaning we're either
-                 * overlapping or ajacent, must merge with it. */
-                do_merge = 1;
-        } else if (rc == -ENOENT) {
-                /* this range is left of new range end point */
-                LASSERT(erange->lsr_end <= new->lsr_end);
-                /*
-                 * the found left range must be either:
-                 *  1. withing new range.
-                 *  2. left of new range (no overlapping).
-                 * because if they're partly overlapping, the STEP 1 must have
-                 * been removed this range.
-                 */
-                LASSERTF(erange->lsr_start > new->lsr_start ||
-                         erange->lsr_end < new->lsr_start ||
-                         (erange->lsr_end == new->lsr_start &&
-                          range_compare_loc(new, erange) != 0),
-                         "left "DRANGE", new "DRANGE"\n",
-                         PRANGE(erange), PRANGE(new));
-
-                /* if it's within the new range, merge it */
-                if (erange->lsr_start > new->lsr_start)
-                        do_merge = 1;
-        } else {
-               GOTO(out, rc);
-        }
-
-        if (do_merge) {
-                if (range_compare_loc(new, erange) != 0) {
-                        CERROR("the end of given range "DRANGE" overlaps "
-                               "with an existing range "DRANGE"\n",
-                               PRANGE(new), PRANGE(erange));
-                        GOTO(out, rc = -EIO);
-                }
-
-                /* merge with next range */
-                rc = fld_index_delete(fld, env, erange, th);
-                if (rc != 0)
-                        GOTO(out, rc);
-
-                new->lsr_start = min(erange->lsr_start, new->lsr_start);
-                new->lsr_end = max(erange->lsr_end, new->lsr_end);
-        }
-
-        /* now update fld entry. */
-        rc = fld_index_create(fld, env, new, th);
-
-        LASSERT(rc != -EEXIST);
-out:
-        if (rc == 0)
-                fld_cache_insert(fld->lsf_cache, new);
-
+       rc = fld_index_create(env, fld, range, th);
        mutex_unlock(&fld->lsf_lock);
 
-        CDEBUG((rc != 0 ? D_ERROR : D_INFO),
-               "%s: FLD create: given range : "DRANGE
-               "after merge "DRANGE" rc = %d \n", fld->lsf_name,
-                PRANGE(add_range), PRANGE(new), rc);
-
-        RETURN(rc);
+       RETURN(rc);
 }
 EXPORT_SYMBOL(fld_server_create);
 
@@ -269,9 +137,8 @@ EXPORT_SYMBOL(fld_server_create);
  *  cache fld entries, but this cache is not persistent.
  */
 
-int fld_server_lookup(struct lu_server_fld *fld,
-                      const struct lu_env *env,
-                      seqno_t seq, struct lu_seq_range *range)
+int fld_server_lookup(const struct lu_env *env, struct lu_server_fld *fld,
+                     seqno_t seq, struct lu_seq_range *range)
 {
         struct lu_seq_range *erange;
         struct fld_thread_info *info;
@@ -294,31 +161,24 @@ int fld_server_lookup(struct lu_server_fld *fld,
                 RETURN(0);
         }
 
-        if (fld->lsf_obj) {
-                rc = fld_index_lookup(fld, env, seq, erange);
-                if (rc == 0) {
-                        if (unlikely(erange->lsr_flags != range->lsr_flags)) {
-                                CERROR("FLD found a range "DRANGE" doesn't "
-                                       "match the requested flag %x\n",
-                                       PRANGE(erange), range->lsr_flags);
-                                RETURN(-EIO);
-                        }
-                        *range = *erange;
-                }
-        } else {
-                LASSERT(fld->lsf_control_exp);
-                /* send request to mdt0 i.e. super seq. controller.
-                 * This is temporary solution, long term solution is fld
-                 * replication on all mdt servers.
-                 */
-                rc = fld_client_rpc(fld->lsf_control_exp,
-                                    range, FLD_LOOKUP);
-        }
-
-        if (rc == 0)
-                fld_cache_insert(fld->lsf_cache, range);
-
-        RETURN(rc);
+       if (fld->lsf_obj) {
+               /* On server side, all entries should be in cache.
+                * If we can not find it in cache, just return error */
+               CERROR("%s: Can not found the seq "LPX64"\n",
+                       fld->lsf_name, seq);
+               RETURN(-EIO);
+       } else {
+               LASSERT(fld->lsf_control_exp);
+               /* send request to mdt0 i.e. super seq. controller.
+                * This is temporary solution, long term solution is fld
+                * replication on all mdt servers.
+                */
+               rc = fld_client_rpc(fld->lsf_control_exp,
+                                   range, FLD_LOOKUP);
+               if (rc == 0)
+                       fld_cache_insert(fld->lsf_cache, range);
+       }
+       RETURN(rc);
 }
 EXPORT_SYMBOL(fld_server_lookup);
 
@@ -337,8 +197,7 @@ static int fld_server_handle(struct lu_server_fld *fld,
 
         switch (opc) {
         case FLD_LOOKUP:
-                rc = fld_server_lookup(fld, env,
-                                       range->lsr_start, range);
+               rc = fld_server_lookup(env, fld, range->lsr_start, range);
                 break;
         default:
                 rc = -EINVAL;
@@ -517,9 +376,8 @@ static void fld_server_proc_fini(struct lu_server_fld *fld)
 }
 #endif
 
-int fld_server_init(struct lu_server_fld *fld, struct dt_device *dt,
-                    const char *prefix, const struct lu_env *env,
-                    int mds_node_id)
+int fld_server_init(const struct lu_env *env, struct lu_server_fld *fld,
+                   struct dt_device *dt, const char *prefix, int mds_node_id)
 {
         int cache_size, cache_threshold;
         struct lu_seq_range range;
@@ -545,7 +403,7 @@ int fld_server_init(struct lu_server_fld *fld, struct dt_device *dt,
         }
 
         if (!mds_node_id) {
-                rc = fld_index_init(fld, env, dt);
+               rc = fld_index_init(env, fld, dt);
                 if (rc)
                         GOTO(out, rc);
         } else
@@ -562,31 +420,30 @@ int fld_server_init(struct lu_server_fld *fld, struct dt_device *dt,
         range.lsr_end = FID_SEQ_DOT_LUSTRE + 1;
         range.lsr_index = 0;
         range.lsr_flags = LU_SEQ_RANGE_MDT;
-        fld_cache_insert(fld->lsf_cache, &range);
+       fld_cache_insert(fld->lsf_cache, &range);
 
         EXIT;
 out:
-        if (rc)
-                fld_server_fini(fld, env);
-        return rc;
+       if (rc)
+               fld_server_fini(env, fld);
+       return rc;
 }
 EXPORT_SYMBOL(fld_server_init);
 
-void fld_server_fini(struct lu_server_fld *fld,
-                     const struct lu_env *env)
+void fld_server_fini(const struct lu_env *env, struct lu_server_fld *fld)
 {
-        ENTRY;
+       ENTRY;
 
-        fld_server_proc_fini(fld);
-        fld_index_fini(fld, env);
+       fld_server_proc_fini(fld);
+       fld_index_fini(env, fld);
 
-        if (fld->lsf_cache != NULL) {
-                if (!IS_ERR(fld->lsf_cache))
-                        fld_cache_fini(fld->lsf_cache);
-                fld->lsf_cache = NULL;
-        }
+       if (fld->lsf_cache != NULL) {
+               if (!IS_ERR(fld->lsf_cache))
+                       fld_cache_fini(fld->lsf_cache);
+               fld->lsf_cache = NULL;
+       }
 
-        EXIT;
+       EXIT;
 }
 EXPORT_SYMBOL(fld_server_fini);
 
index da83bef..f8e3646 100644 (file)
@@ -72,62 +72,69 @@ static const struct lu_seq_range IGIF_FLD_RANGE = {
 };
 
 const struct dt_index_features fld_index_features = {
-       .dif_flags       = DT_IND_UPDATE | DT_IND_RANGE,
-        .dif_keysize_min = sizeof(seqno_t),
-        .dif_keysize_max = sizeof(seqno_t),
-        .dif_recsize_min = sizeof(struct lu_seq_range),
-        .dif_recsize_max = sizeof(struct lu_seq_range),
-        .dif_ptrsize     = 4
+       .dif_flags       = DT_IND_UPDATE,
+       .dif_keysize_min = sizeof(seqno_t),
+       .dif_keysize_max = sizeof(seqno_t),
+       .dif_recsize_min = sizeof(struct lu_seq_range),
+       .dif_recsize_max = sizeof(struct lu_seq_range),
+       .dif_ptrsize     = 4
 };
 
 extern struct lu_context_key fld_thread_key;
 
-static struct dt_key *fld_key(const struct lu_env *env, const seqno_t seq)
+int fld_declare_index_create(const struct lu_env *env,
+                            struct lu_server_fld *fld,
+                            const struct lu_seq_range *new_range,
+                            struct thandle *th)
 {
-        struct fld_thread_info *info;
-        ENTRY;
-
-        info = lu_context_key_get(&env->le_ctx, &fld_thread_key);
-        LASSERT(info != NULL);
-
-        info->fti_key = cpu_to_be64(seq);
-        RETURN((void *)&info->fti_key);
-}
-
-static struct dt_rec *fld_rec(const struct lu_env *env,
-                              const struct lu_seq_range *range)
-{
-        struct fld_thread_info *info;
-        struct lu_seq_range *rec;
-        ENTRY;
-
-        info = lu_context_key_get(&env->le_ctx, &fld_thread_key);
-        LASSERT(info != NULL);
-        rec = &info->fti_rec;
-
-        range_cpu_to_be(rec, range);
-        RETURN((void *)rec);
-}
-
-int fld_declare_index_create(struct lu_server_fld *fld,
-                             const struct lu_env *env,
-                             const struct lu_seq_range *range,
-                             struct thandle *th)
-{
-        int rc;
-
-        ENTRY;
+       struct lu_seq_range     *tmp;
+       struct lu_seq_range     *range;
+       struct fld_thread_info  *info;
+       int                     rc = 0;
+
+       ENTRY;
+
+       info = lu_context_key_get(&env->le_ctx, &fld_thread_key);
+       range = &info->fti_lrange;
+       tmp = &info->fti_irange;
+       memset(range, 0, sizeof(*range));
+
+       rc = fld_index_lookup(env, fld, new_range->lsr_start, range);
+       if (rc == 0) {
+               /* In case of duplicate entry, the location must be same */
+               LASSERT((range_compare_loc(new_range, range) == 0));
+               GOTO(out, rc = -EEXIST);
+       }
 
-       if (fld->lsf_no_range_lookup) {
-               /* Stub for underlying FS which can't lookup ranges */
-               return 0;
+       if (rc != -ENOENT) {
+               CERROR("%s: lookup range "DRANGE" error: rc = %d\n",
+                       fld->lsf_name, PRANGE(range), rc);
+               GOTO(out, rc);
        }
 
-        LASSERT(range_is_sane(range));
+       /* Merge case: since fld entries can only be incremental, we only
+        * check whether the new range can be merged from the left. */
+       if (new_range->lsr_start == range->lsr_end && range->lsr_end != 0 &&
+           range_compare_loc(new_range, range) == 0) {
+               range_cpu_to_be(tmp, range);
+               rc = dt_declare_delete(env, fld->lsf_obj,
+                                      (struct dt_key *)&tmp->lsr_start, th);
+               if (rc) {
+                       CERROR("%s: declare record "DRANGE" failed: rc = %d\n",
+                              fld->lsf_name, PRANGE(range), rc);
+                       GOTO(out, rc);
+               }
+               memcpy(tmp, new_range, sizeof(*new_range));
+               tmp->lsr_start = range->lsr_start;
+       } else {
+               memcpy(tmp, new_range, sizeof(*new_range));
+       }
 
-       rc = dt_declare_insert(env, fld->lsf_obj, fld_rec(env, range),
-                             fld_key(env, range->lsr_start), th);
-        RETURN(rc);
+       range_cpu_to_be(tmp, tmp);
+       rc = dt_declare_insert(env, fld->lsf_obj, (struct dt_rec *)tmp,
+                              (struct dt_key *)&tmp->lsr_start, th);
+out:
+       RETURN(rc);
 }
 
 /**
@@ -139,59 +146,75 @@ int fld_declare_index_create(struct lu_server_fld *fld,
  *
  *      \retval  0  success
  *      \retval  -ve error
- */
-int fld_index_create(struct lu_server_fld *fld,
-                     const struct lu_env *env,
-                     const struct lu_seq_range *range,
-                     struct thandle *th)
+ *
+ * The whole fld index insertion is protected by seq->lss_mutex (see
+ * seq_server_alloc_super), i.e. only one thread will access fldb at a
+ * time, so we do not need to worry about the fld file and cache being
+ * changed between declare and create.
+ * Because fld entries can only be incremental, we only check whether
+ * the new range can be merged from the left.
+ **/
+int fld_index_create(const struct lu_env *env, struct lu_server_fld *fld,
+                    const struct lu_seq_range *new_range, struct thandle *th)
 {
-        int rc;
-
-        ENTRY;
-
-       if (fld->lsf_no_range_lookup) {
-               /* Stub for underlying FS which can't lookup ranges */
-               if (range->lsr_index != 0) {
-                       CERROR("%s: FLD backend does not support range"
-                              "lookups, so DNE and FIDs-on-OST are not"
-                              "supported in this configuration\n",
-                              fld->lsf_name);
-                       return -EINVAL;
-               }
+       struct lu_seq_range     *range;
+       struct lu_seq_range     *tmp;
+       struct fld_thread_info  *info;
+       int                     rc = 0;
+       int                     deleted = 0;
+       struct fld_cache_entry  *flde;
+       ENTRY;
+
+       info = lu_context_key_get(&env->le_ctx, &fld_thread_key);
+
+       LASSERT_MUTEX_LOCKED(&fld->lsf_lock);
+
+       range = &info->fti_lrange;
+       memset(range, 0, sizeof(*range));
+       tmp = &info->fti_irange;
+       rc = fld_index_lookup(env, fld, new_range->lsr_start, range);
+       if (rc != -ENOENT) {
+               rc = rc == 0 ? -EEXIST : rc;
+               GOTO(out, rc);
        }
 
-        LASSERT(range_is_sane(range));
-
-       rc = dt_insert(env, fld->lsf_obj, fld_rec(env, range),
-                      fld_key(env, range->lsr_start), th, BYPASS_CAPA, 1);
-        CDEBUG(D_INFO, "%s: insert given range : "DRANGE" rc = %d\n",
-               fld->lsf_name, PRANGE(range), rc);
-        RETURN(rc);
-}
+       if (new_range->lsr_start == range->lsr_end && range->lsr_end != 0 &&
+           range_compare_loc(new_range, range) == 0) {
+               range_cpu_to_be(tmp, range);
+               rc = dt_delete(env, fld->lsf_obj,
+                              (struct dt_key *)&tmp->lsr_start, th,
+                               BYPASS_CAPA);
+               if (rc != 0)
+                       GOTO(out, rc);
+               memcpy(tmp, new_range, sizeof(*new_range));
+               tmp->lsr_start = range->lsr_start;
+               deleted = 1;
+       } else {
+               memcpy(tmp, new_range, sizeof(*new_range));
+       }
 
-/**
- * delete range in fld store.
- *
- *      \param  range range to be deleted
- *      \param  th     transaction
- *
- *      \retval  0  success
- *      \retval  -ve error
- */
-int fld_index_delete(struct lu_server_fld *fld,
-                     const struct lu_env *env,
-                     struct lu_seq_range *range,
-                     struct thandle   *th)
-{
-        int rc;
+       range_cpu_to_be(tmp, tmp);
+       rc = dt_insert(env, fld->lsf_obj, (struct dt_rec *)tmp,
+                      (struct dt_key *)&tmp->lsr_start, th, BYPASS_CAPA, 1);
+       if (rc != 0) {
+               CERROR("%s: insert range "DRANGE" failed: rc = %d\n",
+                      fld->lsf_name, PRANGE(new_range), rc);
+               GOTO(out, rc);
+       }
 
-        ENTRY;
+       flde = fld_cache_entry_create(new_range);
+       if (IS_ERR(flde))
+               GOTO(out, rc = PTR_ERR(flde));
 
-       rc = dt_delete(env, fld->lsf_obj, fld_key(env, range->lsr_start), th,
-                      BYPASS_CAPA);
-        CDEBUG(D_INFO, "%s: delete given range : "DRANGE" rc = %d\n",
-               fld->lsf_name, PRANGE(range), rc);
-        RETURN(rc);
+       spin_lock(&fld->lsf_cache->fci_lock);
+       if (deleted)
+               fld_cache_delete_nolock(fld->lsf_cache, new_range);
+       rc = fld_cache_insert_nolock(fld->lsf_cache, flde);
+       spin_unlock(&fld->lsf_cache->fci_lock);
+       if (rc)
+               OBD_FREE_PTR(flde);
+out:
+       RETURN(rc);
 }
 
 /**
@@ -205,40 +228,20 @@ int fld_index_delete(struct lu_server_fld *fld,
  * \retval -ENOENT      not found, \a range is the left-side range;
  * \retval  -ve         other error;
  */
-
-int fld_index_lookup(struct lu_server_fld *fld,
-                     const struct lu_env *env,
-                     seqno_t seq,
-                     struct lu_seq_range *range)
+int fld_index_lookup(const struct lu_env *env, struct lu_server_fld *fld,
+                    seqno_t seq, struct lu_seq_range *range)
 {
-        struct dt_object        *dt_obj = fld->lsf_obj;
         struct lu_seq_range     *fld_rec;
-        struct dt_key           *key = fld_key(env, seq);
         struct fld_thread_info  *info;
         int rc;
 
         ENTRY;
 
-       if (fld->lsf_no_range_lookup) {
-               /* Stub for underlying FS which can't lookup ranges */
-               range->lsr_start = 0;
-               range->lsr_end = ~0;
-               range->lsr_index = 0;
-               range->lsr_flags = LU_SEQ_RANGE_MDT;
+       info = lu_context_key_get(&env->le_ctx, &fld_thread_key);
+       fld_rec = &info->fti_rec;
 
-               range_cpu_to_be(range, range);
-               return 0;
-       }
-
-        info = lu_context_key_get(&env->le_ctx, &fld_thread_key);
-        fld_rec = &info->fti_rec;
-
-        rc = dt_obj->do_index_ops->dio_lookup(env, dt_obj,
-                                              (struct dt_rec*) fld_rec,
-                                              key, BYPASS_CAPA);
-
-        if (rc >= 0) {
-                range_be_to_cpu(fld_rec, fld_rec);
+       rc = fld_cache_lookup(fld->lsf_cache, seq, fld_rec);
+       if (rc == 0) {
                 *range = *fld_rec;
                 if (range_within(range, seq))
                         rc = 0;
@@ -253,25 +256,29 @@ int fld_index_lookup(struct lu_server_fld *fld,
 }
 
 static int fld_insert_igif_fld(struct lu_server_fld *fld,
-                               const struct lu_env *env)
+                              const struct lu_env *env)
 {
-        struct thandle *th;
-        int rc;
-        ENTRY;
+       struct thandle *th;
+       int rc;
+       ENTRY;
 
        th = dt_trans_create(env, lu2dt_dev(fld->lsf_obj->do_lu.lo_dev));
        if (IS_ERR(th))
                RETURN(PTR_ERR(th));
-       rc = fld_declare_index_create(fld, env, &IGIF_FLD_RANGE, th);
-       if (rc)
+
+       rc = fld_declare_index_create(env, fld, &IGIF_FLD_RANGE, th);
+       if (rc != 0) {
+               if (rc == -EEXIST)
+                       rc = 0;
                GOTO(out, rc);
+       }
 
        rc = dt_trans_start_local(env, lu2dt_dev(fld->lsf_obj->do_lu.lo_dev),
                                  th);
        if (rc)
                GOTO(out, rc);
 
-       rc = fld_index_create(fld, env, &IGIF_FLD_RANGE, th);
+       rc = fld_index_create(env, fld, &IGIF_FLD_RANGE, th);
        if (rc == -EEXIST)
                rc = 0;
 out:
@@ -279,73 +286,107 @@ out:
        RETURN(rc);
 }
 
-int fld_index_init(struct lu_server_fld *fld,
-                   const struct lu_env *env,
-                   struct dt_device *dt)
+int fld_index_init(const struct lu_env *env, struct lu_server_fld *fld,
+                  struct dt_device *dt)
 {
-        struct dt_object *dt_obj;
-        struct lu_fid fid;
-       struct lu_attr attr;
-       struct dt_object_format dof;
-        int rc;
-        ENTRY;
+       struct dt_object        *dt_obj = NULL;
+       struct lu_fid           fid;
+       struct lu_attr          *attr = NULL;
+       struct lu_seq_range     *range = NULL;
+       struct fld_thread_info  *info;
+       struct dt_object_format dof;
+       struct dt_it            *it;
+       const struct dt_it_ops  *iops;
+       int                     rc;
+       ENTRY;
+
+       info = lu_context_key_get(&env->le_ctx, &fld_thread_key);
+       LASSERT(info != NULL);
 
        lu_local_obj_fid(&fid, FLD_INDEX_OID);
+       OBD_ALLOC_PTR(attr);
+       if (attr == NULL)
+               RETURN(-ENOMEM);
 
-       memset(&attr, 0, sizeof(attr));
-       attr.la_valid = LA_MODE;
-       attr.la_mode = S_IFREG | 0666;
+       memset(attr, 0, sizeof(attr));
+       attr->la_valid = LA_MODE;
+       attr->la_mode = S_IFREG | 0666;
        dof.dof_type = DFT_INDEX;
        dof.u.dof_idx.di_feat = &fld_index_features;
 
-       dt_obj = dt_find_or_create(env, dt, &fid, &dof, &attr);
-        if (!IS_ERR(dt_obj)) {
-                fld->lsf_obj = dt_obj;
-                rc = dt_obj->do_ops->do_index_try(env, dt_obj,
-                                                  &fld_index_features);
-                if (rc == 0) {
-                        LASSERT(dt_obj->do_index_ops != NULL);
-                        rc = fld_insert_igif_fld(fld, env);
-
-                        if (rc != 0) {
-                                CERROR("insert igif in fld! = %d\n", rc);
-                                lu_object_put(env, &dt_obj->do_lu);
-                                fld->lsf_obj = NULL;
-                        }
-               } else if (rc == -ERANGE) {
-                       CWARN("%s: File \"%s\" doesn't support range lookup, "
-                             "using stub. DNE and FIDs on OST will not work "
-                             "with this backend\n",
-                             fld->lsf_name, fld_index_name);
-
-                       LASSERT(dt_obj->do_index_ops == NULL);
-                       fld->lsf_no_range_lookup = 1;
-                       rc = 0;
-               } else {
-                       CERROR("%s: File \"%s\" is not index, rc %d!\n",
-                              fld->lsf_name, fld_index_name, rc);
-                       lu_object_put(env, &fld->lsf_obj->do_lu);
-                       fld->lsf_obj = NULL;
-               }
+       dt_obj = dt_find_or_create(env, dt, &fid, &dof, attr);
+       if (IS_ERR(dt_obj)) {
+               rc = PTR_ERR(dt_obj);
+               CERROR("%s: Can't find \"%s\" obj %d\n", fld->lsf_name,
+                       fld_index_name, rc);
+               dt_obj = NULL;
+               GOTO(out, rc);
+       }
 
+       fld->lsf_obj = dt_obj;
+       rc = dt_obj->do_ops->do_index_try(env, dt_obj, &fld_index_features);
+       if (rc == 0) {
+               LASSERT(dt_obj->do_index_ops != NULL);
+               mutex_lock(&fld->lsf_lock);
+               rc = fld_insert_igif_fld(fld, env);
+               mutex_unlock(&fld->lsf_lock);
+               if (rc != 0) {
+                       CERROR("insert igif in fld! = %d\n", rc);
+                       GOTO(out, rc);
+               }
+       } else {
+               CERROR("%s: File \"%s\" is not an index: rc = %d!\n",
+                      fld->lsf_name, fld_index_name, rc);
+               GOTO(out, rc);
+       }
 
-        } else {
-                CERROR("%s: Can't find \"%s\" obj %d\n",
-                       fld->lsf_name, fld_index_name, (int)PTR_ERR(dt_obj));
-                rc = PTR_ERR(dt_obj);
-        }
+       range = &info->fti_rec;
+       /* Load fld entry to cache */
+       iops = &dt_obj->do_index_ops->dio_it;
+       it = iops->init(env, dt_obj, 0, NULL);
+       if (IS_ERR(it))
+               GOTO(out, rc = PTR_ERR(it));
+
+       rc = iops->load(env, it, 0);
+       if (rc < 0)
+               GOTO(out_it_fini, rc);
+
+       do {
+               rc = iops->rec(env, it, (struct dt_rec *)range, 0);
+               if (rc != 0)
+                       GOTO(out_it_fini, rc);
+
+               LASSERT(range != NULL);
+               range_be_to_cpu(range, range);
+               rc = fld_cache_insert(fld->lsf_cache, range);
+               if (rc != 0)
+                       GOTO(out_it_fini, rc);
+               rc = iops->next(env, it);
+
+       } while (rc == 0);
+       rc = 0;
+
+out_it_fini:
+       iops->fini(env, it);
+out:
+       if (attr != NULL)
+               OBD_FREE_PTR(attr);
 
-        RETURN(rc);
+       if (rc != 0) {
+               if (dt_obj != NULL)
+                       lu_object_put(env, &dt_obj->do_lu);
+               fld->lsf_obj = NULL;
+       }
+       RETURN(rc);
 }
 
-void fld_index_fini(struct lu_server_fld *fld,
-                    const struct lu_env *env)
+void fld_index_fini(const struct lu_env *env, struct lu_server_fld *fld)
 {
-        ENTRY;
-        if (fld->lsf_obj != NULL) {
-                if (!IS_ERR(fld->lsf_obj))
-                        lu_object_put(env, &fld->lsf_obj->do_lu);
-                fld->lsf_obj = NULL;
-        }
-        EXIT;
+       ENTRY;
+       if (fld->lsf_obj != NULL) {
+               if (!IS_ERR(fld->lsf_obj))
+                       lu_object_put(env, &fld->lsf_obj->do_lu);
+               fld->lsf_obj = NULL;
+       }
+       EXIT;
 }
index 61955be..eb951cc 100644 (file)
@@ -112,6 +112,7 @@ struct fld_cache {
         /**
          * Cache name used for debug and messages. */
         char                     fci_name[80];
+       int                     fci_no_shrink:1;
 };
 
 enum fld_op {
@@ -150,26 +151,21 @@ struct fld_thread_info {
 
 extern struct lu_context_key fld_thread_key;
 
-int fld_index_init(struct lu_server_fld *fld,
-                   const struct lu_env *env,
-                   struct dt_device *dt);
+int fld_index_init(const struct lu_env *env, struct lu_server_fld *fld,
+                  struct dt_device *dt);
 
-void fld_index_fini(struct lu_server_fld *fld,
-                    const struct lu_env *env);
+void fld_index_fini(const struct lu_env *env, struct lu_server_fld *fld);
 
-int fld_index_create(struct lu_server_fld *fld,
-                     const struct lu_env *env,
-                     const struct lu_seq_range *range,
-                     struct thandle *th);
+int fld_declare_index_create(const struct lu_env *env,
+                            struct lu_server_fld *fld,
+                            const struct lu_seq_range *new,
+                            struct thandle *th);
 
-int fld_index_delete(struct lu_server_fld *fld,
-                     const struct lu_env *env,
-                     struct lu_seq_range *range,
-                     struct thandle *th);
+int fld_index_create(const struct lu_env *env, struct lu_server_fld *fld,
+                    const struct lu_seq_range *new, struct thandle *th);
 
-int fld_index_lookup(struct lu_server_fld *fld,
-                     const struct lu_env *env,
-                     seqno_t seq, struct lu_seq_range *range);
+int fld_index_lookup(const struct lu_env *env, struct lu_server_fld *fld,
+                    seqno_t seq, struct lu_seq_range *range);
 
 int fld_client_rpc(struct obd_export *exp,
                    struct lu_seq_range *range, __u32 fld_op);
@@ -188,15 +184,33 @@ void fld_cache_fini(struct fld_cache *cache);
 
 void fld_cache_flush(struct fld_cache *cache);
 
-void fld_cache_insert(struct fld_cache *cache,
-                      const struct lu_seq_range *range);
+int fld_cache_insert(struct fld_cache *cache,
+                    const struct lu_seq_range *range);
+
+struct fld_cache_entry
+*fld_cache_entry_create(const struct lu_seq_range *range);
 
+int fld_cache_insert_nolock(struct fld_cache *cache,
+                           struct fld_cache_entry *f_new);
 void fld_cache_delete(struct fld_cache *cache,
                       const struct lu_seq_range *range);
-
+void fld_cache_delete_nolock(struct fld_cache *cache,
+                            const struct lu_seq_range *range);
 int fld_cache_lookup(struct fld_cache *cache,
                      const seqno_t seq, struct lu_seq_range *range);
 
+struct fld_cache_entry*
+fld_cache_entry_lookup(struct fld_cache *cache, struct lu_seq_range *range);
+void fld_cache_entry_delete(struct fld_cache *cache,
+                           struct fld_cache_entry *node);
+void fld_dump_cache_entries(struct fld_cache *cache);
+
+struct fld_cache_entry
+*fld_cache_entry_lookup_nolock(struct fld_cache *cache,
+                             struct lu_seq_range *range);
+int fld_write_range(const struct lu_env *env, struct dt_object *dt,
+                   const struct lu_seq_range *range, struct thandle *th);
+
 static inline const char *
 fld_target_name(struct lu_fld_target *tar)
 {
index cde98e2..b3438c6 100644 (file)
@@ -488,9 +488,8 @@ int fld_client_lookup(struct lu_client_fld *fld, seqno_t seq, mdsno_t *mds,
         res.lsr_flags = flags;
 #ifdef __KERNEL__
         if (target->ft_srv != NULL) {
-                LASSERT(env != NULL);
-                rc = fld_server_lookup(target->ft_srv,
-                                       env, seq, &res);
+               LASSERT(env != NULL);
+               rc = fld_server_lookup(env, target->ft_srv, seq, &res);
         } else {
 #endif
                 rc = fld_client_rpc(target->ft_exp,
@@ -502,7 +501,7 @@ int fld_client_lookup(struct lu_client_fld *fld, seqno_t seq, mdsno_t *mds,
         if (rc == 0) {
                 *mds = res.lsr_index;
 
-                fld_cache_insert(fld->lcf_cache, &res);
+               fld_cache_insert(fld->lcf_cache, &res);
         }
         RETURN(rc);
 }
index 7a26a9e..c5497ee 100644 (file)
@@ -98,10 +98,6 @@ struct lu_server_fld {
          * Fld service name in form "fld-srv-lustre-MDTXXX" */
         char                     lsf_name[80];
 
-       /**
-        * Backend does not support range lookups,
-        * indexes other that 0 will be prohibited */
-       int                      lsf_no_range_lookup;
 };
 
 struct lu_client_fld {
@@ -151,31 +147,23 @@ enum {
 int fld_query(struct com_thread_info *info);
 
 /* Server methods */
-int fld_server_init(struct lu_server_fld *fld,
-                    struct dt_device *dt,
-                    const char *prefix,
-                    const struct lu_env *env,
-                    int mds_node_id);
-
-void fld_server_fini(struct lu_server_fld *fld,
-                     const struct lu_env *env);
-
-int fld_declare_server_create(struct lu_server_fld *fld,
-                              const struct lu_env *env,
-                              struct thandle *th);
-
-int fld_server_create(struct lu_server_fld *fld,
-                      const struct lu_env *env,
-                      struct lu_seq_range *add_range,
-                      struct thandle *th);
-
-int fld_server_delete(struct lu_server_fld *fld,
-                      const struct lu_env *env,
-                      struct lu_seq_range *range);
-
-int fld_server_lookup(struct lu_server_fld *fld,
-                      const struct lu_env *env,
-                      seqno_t seq, struct lu_seq_range *range);
+int fld_server_init(const struct lu_env *env, struct lu_server_fld *fld,
+                   struct dt_device *dt, const char *prefix, int mds_node_id);
+
+void fld_server_fini(const struct lu_env *env, struct lu_server_fld *fld);
+
+int fld_declare_server_create(const struct lu_env *env,
+                             struct lu_server_fld *fld,
+                             struct lu_seq_range *new,
+                             struct thandle *th);
+
+int fld_server_create(const struct lu_env *env,
+                     struct lu_server_fld *fld,
+                     struct lu_seq_range *add_range,
+                     struct thandle *th);
+
+int fld_server_lookup(const struct lu_env *env, struct lu_server_fld *fld,
+                     seqno_t seq, struct lu_seq_range *range);
 
 /* Client methods */
 int fld_client_init(struct lu_client_fld *fld,
index 1adcb54..4dfce3a 100644 (file)
@@ -63,7 +63,17 @@ int lmv_fld_lookup(struct lmv_obd *lmv,
         ENTRY;
 
         LASSERT(fid_is_sane(fid));
-        rc = fld_client_lookup(&lmv->lmv_fld, fid_seq(fid), mds,
+
+       /* FIXME: Because ZFS still uses the LOCAL fid sequence for root,
+        * and root will always be on MDT0, return 0 directly for a
+        * local fid. This should be removed once the root FID has been
+        * assigned a special sequence. */
+       if (fid_seq(fid) == FID_SEQ_LOCAL_FILE) {
+               *mds = 0;
+               RETURN(0);
+       }
+
+       rc = fld_client_lookup(&lmv->lmv_fld, fid_seq(fid), mds,
                                LU_SEQ_RANGE_MDT, NULL);
         if (rc) {
                 CERROR("Error while looking for mds number. Seq "LPX64
index da410c2..1afa76d 100644 (file)
@@ -4123,7 +4123,7 @@ static int mdt_fld_fini(const struct lu_env *env,
         ENTRY;
 
         if (ms && ms->ms_server_fld) {
-                fld_server_fini(ms->ms_server_fld, env);
+               fld_server_fini(env, ms->ms_server_fld);
                 OBD_FREE_PTR(ms->ms_server_fld);
                 ms->ms_server_fld = NULL;
         }
@@ -4145,9 +4145,8 @@ static int mdt_fld_init(const struct lu_env *env,
         if (ms->ms_server_fld == NULL)
                 RETURN(rc = -ENOMEM);
 
-        rc = fld_server_init(ms->ms_server_fld,
-                             m->mdt_bottom, uuid,
-                             env, ms->ms_node_id);
+       rc = fld_server_init(env, ms->ms_server_fld, m->mdt_bottom, uuid,
+                            ms->ms_node_id);
         if (rc) {
                 OBD_FREE_PTR(ms->ms_server_fld);
                 ms->ms_server_fld = NULL;
index b990ba9..2d68060 100644 (file)
@@ -187,11 +187,11 @@ struct mdt_device {
        struct lu_device          *mdt_qmt_dev;
 };
 
-#define MDT_SERVICE_WATCHDOG_FACTOR     (2)
-#define MDT_ROCOMPAT_SUPP       (OBD_ROCOMPAT_LOVOBJID)
-#define MDT_INCOMPAT_SUPP       (OBD_INCOMPAT_MDT | OBD_INCOMPAT_COMMON_LR | \
-                                 OBD_INCOMPAT_FID | OBD_INCOMPAT_IAM_DIR | \
-                                 OBD_INCOMPAT_LMM_VER | OBD_INCOMPAT_MULTI_OI)
+#define MDT_SERVICE_WATCHDOG_FACTOR    (2)
+#define MDT_ROCOMPAT_SUPP      (OBD_ROCOMPAT_LOVOBJID)
+#define MDT_INCOMPAT_SUPP      (OBD_INCOMPAT_MDT | OBD_INCOMPAT_COMMON_LR | \
+                               OBD_INCOMPAT_FID | OBD_INCOMPAT_IAM_DIR | \
+                               OBD_INCOMPAT_LMM_VER | OBD_INCOMPAT_MULTI_OI)
 #define MDT_COS_DEFAULT         (0)
 
 struct mdt_object {
index 1720cd4..0877a53 100644 (file)
@@ -216,9 +216,9 @@ static int mdt_server_data_init(const struct lu_env *env,
                 lsd->lsd_client_size = LR_CLIENT_SIZE;
                 lsd->lsd_feature_compat = OBD_COMPAT_MDT;
                 lsd->lsd_feature_rocompat = OBD_ROCOMPAT_LOVOBJID;
-                lsd->lsd_feature_incompat = OBD_INCOMPAT_MDT |
-                                            OBD_INCOMPAT_COMMON_LR |
-                                            OBD_INCOMPAT_MULTI_OI;
+               lsd->lsd_feature_incompat = OBD_INCOMPAT_MDT |
+                                           OBD_INCOMPAT_COMMON_LR |
+                                           OBD_INCOMPAT_MULTI_OI;
         } else {
                 LCONSOLE_WARN("%s: used disk, loading\n", obd->obd_name);
                rc = tgt_server_data_read(env, &mdt->mdt_lut);