LU-6158 mdt: always shrink_capsule in getxattr_all
diff --git a/lustre/fld/fld_cache.c b/lustre/fld/fld_cache.c
index 192d81d..24b0cd1 100644
--- a/lustre/fld/fld_cache.c
+++ b/lustre/fld/fld_cache.c
-/* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
  *
- *  lustre/fld/fld_cache.c
- *  FLD (Fids Location Database)
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
  *
- *  Copyright (C) 2006 Cluster File Systems, Inc.
- *   Author: Yury Umanets <umka@clusterfs.com>
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
  *
- *   This file is part of the Lustre file system, http://www.lustre.org
- *   Lustre is a trademark of Cluster File Systems, Inc.
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
+ * Use is subject to license terms.
+ *
+ * Copyright (c) 2012, 2014, Intel Corporation.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
  *
- *   You may have signed or agreed to another license before downloading
- *   this software.  If so, you are bound by the terms and conditions
- *   of that agreement, and the following does not apply to you.  See the
- *   LICENSE file included with this distribution for more information.
+ * lustre/fld/fld_cache.c
  *
- *   If you did not agree to a different license, then this copy of Lustre
- *   is open source software; you can redistribute it and/or modify it
- *   under the terms of version 2 of the GNU General Public License as
- *   published by the Free Software Foundation.
+ * FLD (Fids Location Database)
  *
- *   In either case, Lustre is distributed in the hope that it will be
- *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
- *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- *   license text for more details.
+ * Author: Pravin Shelar <pravin.shelar@sun.com>
+ * Author: Yury Umanets <umka@clusterfs.com>
  */
-#ifndef EXPORT_SYMTAB
-# define EXPORT_SYMTAB
-#endif
+
 #define DEBUG_SUBSYSTEM S_FLD
 
-#ifdef __KERNEL__
-# include <libcfs/libcfs.h>
-# include <linux/module.h>
-# include <linux/jbd.h>
-# include <asm/div64.h>
-#else /* __KERNEL__ */
-# include <liblustre.h>
-# include <libcfs/list.h>
-#endif
-
-#include <obd.h>
-#include <obd_class.h>
-#include <lustre_ver.h>
+#include <libcfs/libcfs.h>
+#include <linux/module.h>
+#include <linux/math64.h>
 #include <obd_support.h>
-#include <lprocfs_status.h>
-
-#include <dt_object.h>
-#include <md_object.h>
-#include <lustre_req_layout.h>
 #include <lustre_fld.h>
 #include "fld_internal.h"
 
-#ifdef __KERNEL__
-static inline __u32 fld_cache_hash(seqno_t seq)
-{
-        return (__u32)seq;
-}
-
-void fld_cache_flush(struct fld_cache *cache)
-{
-        struct fld_cache_entry *flde;
-        struct hlist_head *bucket;
-        struct hlist_node *scan;
-        struct hlist_node *next;
-        int i;
-        ENTRY;
-
-       /* Free all cache entries. */
-       spin_lock(&cache->fci_lock);
-       for (i = 0; i < cache->fci_hash_size; i++) {
-               bucket = cache->fci_hash_table + i;
-               hlist_for_each_entry_safe(flde, scan, next, bucket, fce_list) {
-                       hlist_del_init(&flde->fce_list);
-                        list_del_init(&flde->fce_lru);
-                        cache->fci_cache_count--;
-                       OBD_FREE_PTR(flde);
-               }
-       }
-        spin_unlock(&cache->fci_lock);
-        EXIT;
-}
-
-struct fld_cache *fld_cache_init(const char *name, int hash_size,
+/**
+ * create fld cache.
+ */
+struct fld_cache *fld_cache_init(const char *name,
                                  int cache_size, int cache_threshold)
 {
-       struct fld_cache *cache;
-        int i;
+        struct fld_cache *cache;
         ENTRY;
 
         LASSERT(name != NULL);
-        LASSERT(IS_PO2(hash_size));
         LASSERT(cache_threshold < cache_size);
 
         OBD_ALLOC_PTR(cache);
         if (cache == NULL)
                 RETURN(ERR_PTR(-ENOMEM));
 
-        INIT_LIST_HEAD(&cache->fci_lru);
+       INIT_LIST_HEAD(&cache->fci_entries_head);
+       INIT_LIST_HEAD(&cache->fci_lru);
 
-       cache->fci_cache_count = 0;
-        spin_lock_init(&cache->fci_lock);
+        cache->fci_cache_count = 0;
+       rwlock_init(&cache->fci_lock);
 
-        strncpy(cache->fci_name, name,
+       strlcpy(cache->fci_name, name,
                 sizeof(cache->fci_name));
 
-       cache->fci_hash_size = hash_size;
-       cache->fci_cache_size = cache_size;
+        cache->fci_cache_size = cache_size;
         cache->fci_threshold = cache_threshold;
 
         /* Init fld cache info. */
-        cache->fci_hash_mask = hash_size - 1;
-        OBD_ALLOC(cache->fci_hash_table,
-                  hash_size * sizeof(*cache->fci_hash_table));
-        if (cache->fci_hash_table == NULL) {
-                OBD_FREE_PTR(cache);
-                RETURN(ERR_PTR(-ENOMEM));
-        }
-
-        for (i = 0; i < hash_size; i++)
-                INIT_HLIST_HEAD(&cache->fci_hash_table[i]);
         memset(&cache->fci_stat, 0, sizeof(cache->fci_stat));
 
         CDEBUG(D_INFO, "%s: FLD cache - Size: %d, Threshold: %d\n",
@@ -127,8 +86,10 @@ struct fld_cache *fld_cache_init(const char *name, int hash_size,
 
         RETURN(cache);
 }
-EXPORT_SYMBOL(fld_cache_init);
 
+/**
+ * destroy fld cache.
+ */
 void fld_cache_fini(struct fld_cache *cache)
 {
         __u64 pct;
@@ -144,36 +105,117 @@ void fld_cache_fini(struct fld_cache *cache)
                 pct = 0;
         }
 
-        printk("FLD cache statistics (%s):\n", cache->fci_name);
-        printk("  Total reqs: "LPU64"\n", cache->fci_stat.fst_count);
-        printk("  Cache reqs: "LPU64"\n", cache->fci_stat.fst_cache);
-        printk("  Saved RPCs: "LPU64"\n", cache->fci_stat.fst_inflight);
-        printk("  Cache hits: "LPU64"%%\n", pct);
+        CDEBUG(D_INFO, "FLD cache statistics (%s):\n", cache->fci_name);
+        CDEBUG(D_INFO, "  Total reqs: "LPU64"\n", cache->fci_stat.fst_count);
+        CDEBUG(D_INFO, "  Cache reqs: "LPU64"\n", cache->fci_stat.fst_cache);
+        CDEBUG(D_INFO, "  Cache hits: "LPU64"%%\n", pct);
+
+        OBD_FREE_PTR(cache);
 
-       OBD_FREE(cache->fci_hash_table, cache->fci_hash_size *
-                sizeof(*cache->fci_hash_table));
-       OBD_FREE_PTR(cache);
-       
         EXIT;
 }
-EXPORT_SYMBOL(fld_cache_fini);
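
For context, a minimal caller-side sketch of how fld_cache_init() and
fld_cache_fini() pair up. The cache name and the FLD_CLIENT_CACHE_*
constants are illustrative (the constants are assumed to come from
fld_internal.h); the real callers live in fld_request.c and fld_handler.c:

	struct fld_cache *cache;

	cache = fld_cache_init("cli-fld", FLD_CLIENT_CACHE_SIZE,
			       FLD_CLIENT_CACHE_THRESHOLD);
	if (IS_ERR(cache))
		RETURN(PTR_ERR(cache));

	/* ... inserts and lookups against the cache ... */

	fld_cache_fini(cache);
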
 
-static inline struct hlist_head *
-fld_cache_bucket(struct fld_cache *cache, seqno_t seq)
+/**
+ * delete given node from list.
+ */
+void fld_cache_entry_delete(struct fld_cache *cache,
+                           struct fld_cache_entry *node)
 {
-        return cache->fci_hash_table + (fld_cache_hash(seq) &
-                                        cache->fci_hash_mask);
+       list_del(&node->fce_list);
+       list_del(&node->fce_lru);
+       cache->fci_cache_count--;
+       OBD_FREE_PTR(node);
 }
 
-/*
- * Check if cache needs to be shrinked. If so - do it. Tries to keep all
- * collision lists well balanced. That is, check all of them and remove one
- * entry in list and so on until cache is shrinked enough.
+/**
+ * Fix the list by checking each new entry against the NEXT entry in
+ * order, merging adjacent ranges and resolving overlaps.
+ */
+static void fld_fix_new_list(struct fld_cache *cache)
+{
+        struct fld_cache_entry *f_curr;
+        struct fld_cache_entry *f_next;
+        struct lu_seq_range *c_range;
+        struct lu_seq_range *n_range;
+       struct list_head *head = &cache->fci_entries_head;
+        ENTRY;
+
+restart_fixup:
+
+       list_for_each_entry_safe(f_curr, f_next, head, fce_list) {
+                c_range = &f_curr->fce_range;
+                n_range = &f_next->fce_range;
+
+                LASSERT(range_is_sane(c_range));
+                if (&f_next->fce_list == head)
+                        break;
+
+               if (c_range->lsr_flags != n_range->lsr_flags)
+                       continue;
+
+                LASSERTF(c_range->lsr_start <= n_range->lsr_start,
+                         "cur lsr_start "DRANGE" next lsr_start "DRANGE"\n",
+                         PRANGE(c_range), PRANGE(n_range));
+
+                /* check merge possibility with next range */
+                if (c_range->lsr_end == n_range->lsr_start) {
+                        if (c_range->lsr_index != n_range->lsr_index)
+                                continue;
+                        n_range->lsr_start = c_range->lsr_start;
+                        fld_cache_entry_delete(cache, f_curr);
+                        continue;
+                }
+
+                /* check if current range overlaps with next range. */
+                if (n_range->lsr_start < c_range->lsr_end) {
+                        if (c_range->lsr_index == n_range->lsr_index) {
+                                n_range->lsr_start = c_range->lsr_start;
+                                n_range->lsr_end = max(c_range->lsr_end,
+                                                       n_range->lsr_end);
+                                fld_cache_entry_delete(cache, f_curr);
+                        } else {
+                                if (n_range->lsr_end <= c_range->lsr_end) {
+                                        *n_range = *c_range;
+                                        fld_cache_entry_delete(cache, f_curr);
+                                } else
+                                        n_range->lsr_start = c_range->lsr_end;
+                        }
+
+                        /* the next range could overlap as
+                         * well, so it is safest to restart. */
+                        goto restart_fixup;
+                }
+
+                /* kill duplicates */
+               if (c_range->lsr_start == n_range->lsr_start &&
+                   c_range->lsr_end == n_range->lsr_end)
+                       fld_cache_entry_delete(cache, f_curr);
+        }
+
+        EXIT;
+}
+
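
To make the merge rule concrete: two touching ranges on the same index
collapse into one. A small runnable userspace model (stand-in struct, not
the real lu_seq_range) of the lsr_end == lsr_start branch above:

#include <stdio.h>

struct range { unsigned long long start, end; unsigned index; };

int main(void)
{
	struct range curr = { 0, 10, 0 };	/* [0, 10) on index 0 */
	struct range next = { 10, 20, 0 };	/* [10, 20) on index 0 */

	/* same index and touching ends: absorb curr into next */
	if (curr.index == next.index && curr.end == next.start) {
		next.start = curr.start;
		/* the cache would then free curr via fld_cache_entry_delete() */
	}
	printf("merged: [%llu, %llu) on index %u\n",
	       next.start, next.end, next.index);	/* [0, 20) on index 0 */
	return 0;
}
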
+/**
+ * add node to fld cache
+ */
+static inline void fld_cache_entry_add(struct fld_cache *cache,
+                                       struct fld_cache_entry *f_new,
+                                      struct list_head *pos)
+{
+       list_add(&f_new->fce_list, pos);
+       list_add(&f_new->fce_lru, &cache->fci_lru);
+
+       cache->fci_cache_count++;
+       fld_fix_new_list(cache);
+}
+
+/**
+ * Check if the cache needs to be shrunk. If so, shrink it by removing
+ * LRU entries one at a time until the cache is small enough.
  */
 static int fld_cache_shrink(struct fld_cache *cache)
 {
         struct fld_cache_entry *flde;
-        struct list_head *curr;
+       struct list_head *curr;
         int num = 0;
         ENTRY;
 
@@ -185,258 +227,332 @@ static int fld_cache_shrink(struct fld_cache *cache)
         curr = cache->fci_lru.prev;
 
         while (cache->fci_cache_count + cache->fci_threshold >
-               cache->fci_cache_size && curr != &cache->fci_lru)
-        {
-                flde = list_entry(curr, struct fld_cache_entry, fce_lru);
-                curr = curr->prev;
-
-                /* keep inflights */
-                if (flde->fce_inflight)
-                        continue;
+               cache->fci_cache_size && curr != &cache->fci_lru) {
 
-                hlist_del_init(&flde->fce_list);
-                list_del_init(&flde->fce_lru);
-                cache->fci_cache_count--;
-                OBD_FREE_PTR(flde);
+               flde = list_entry(curr, struct fld_cache_entry, fce_lru);
+                curr = curr->prev;
+                fld_cache_entry_delete(cache, flde);
                 num++;
         }
 
-        CDEBUG(D_INFO, "%s: FLD cache - Shrinked by "
+        CDEBUG(D_INFO, "%s: FLD cache - Shrunk by "
                "%d entries\n", cache->fci_name, num);
 
         RETURN(0);
 }
 
-int fld_cache_insert_inflight(struct fld_cache *cache, seqno_t seq)
+/**
+ * kill all fld cache entries.
+ */
+void fld_cache_flush(struct fld_cache *cache)
 {
-        struct fld_cache_entry *flde, *fldt;
-        struct hlist_head *bucket;
-        struct hlist_node *scan;
-        ENTRY;
+       ENTRY;
 
-        spin_lock(&cache->fci_lock);
+       write_lock(&cache->fci_lock);
+       cache->fci_cache_size = 0;
+       fld_cache_shrink(cache);
+       write_unlock(&cache->fci_lock);
 
-        /* Check if cache already has the entry with such a seq. */
-        bucket = fld_cache_bucket(cache, seq);
-        hlist_for_each_entry(fldt, scan, bucket, fce_list) {
-                if (fldt->fce_seq == seq) {
-                        spin_unlock(&cache->fci_lock);
-                        RETURN(-EEXIST);
-                }
-        }
-        spin_unlock(&cache->fci_lock);
+       EXIT;
+}
 
-        /* Allocate new entry. */
-        OBD_ALLOC_PTR(flde);
-        if (!flde)
-                RETURN(-ENOMEM);
+/**
+ * Punch a hole in an existing range: divide that range and add the new
+ * entry in the resulting gap.
+ */
 
-        /*
-         * Check if cache has the entry with such a seq again. It could be added
-         * while we were allocating new entry.
-         */
-        spin_lock(&cache->fci_lock);
-        hlist_for_each_entry(fldt, scan, bucket, fce_list) {
-                if (fldt->fce_seq == seq) {
-                        spin_unlock(&cache->fci_lock);
-                        OBD_FREE_PTR(flde);
-                        RETURN(0);
-                }
+static void fld_cache_punch_hole(struct fld_cache *cache,
+                                struct fld_cache_entry *f_curr,
+                                struct fld_cache_entry *f_new)
+{
+        const struct lu_seq_range *range = &f_new->fce_range;
+       const u64 new_start  = range->lsr_start;
+       const u64 new_end  = range->lsr_end;
+        struct fld_cache_entry *fldt;
+
+        ENTRY;
+       OBD_ALLOC_GFP(fldt, sizeof *fldt, GFP_ATOMIC);
+        if (!fldt) {
+                OBD_FREE_PTR(f_new);
+                EXIT;
+                /* overlap is not allowed, so don't mess up the list. */
+                return;
         }
+        /* break the f_curr RANGE into three RANGES:
+         *        f_curr, f_new, fldt
+         */
 
-        /* Add new entry to cache and lru list. */
-        INIT_HLIST_NODE(&flde->fce_list);
-        flde->fce_inflight = 1;
-        flde->fce_invalid = 1;
-        cfs_waitq_init(&flde->fce_waitq);
-        flde->fce_seq = seq;
+        /* f_new = *range */
 
-        hlist_add_head(&flde->fce_list, bucket);
-        list_add(&flde->fce_lru, &cache->fci_lru);
-        cache->fci_cache_count++;
+        /* fldt */
+        fldt->fce_range.lsr_start = new_end;
+        fldt->fce_range.lsr_end = f_curr->fce_range.lsr_end;
+        fldt->fce_range.lsr_index = f_curr->fce_range.lsr_index;
 
-        spin_unlock(&cache->fci_lock);
+        /* f_curr */
+        f_curr->fce_range.lsr_end = new_start;
 
-        RETURN(0);
+        /* add these two entries to list */
+        fld_cache_entry_add(cache, f_new, &f_curr->fce_list);
+        fld_cache_entry_add(cache, fldt, &f_new->fce_list);
+
+        /* no need to fixup */
+        EXIT;
 }
-EXPORT_SYMBOL(fld_cache_insert_inflight);
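
A worked example of the three-way split performed above, again with a
stand-in userspace struct rather than the real lu_seq_range: punching
f_new = [40, 60) on MDT1 into a cached f_curr = [0, 100) on MDT0 leaves
[0, 40)@MDT0, [40, 60)@MDT1 and fldt = [60, 100)@MDT0:

#include <stdio.h>

struct range { unsigned long long start, end; unsigned index; };

int main(void)
{
	struct range curr = { 0, 100, 0 };	/* cached range on MDT0 */
	struct range newr = { 40, 60, 1 };	/* hole to punch, on MDT1 */

	/* fldt takes the tail of the original range */
	struct range fldt = { newr.end, curr.end, curr.index };
	/* curr keeps the head */
	curr.end = newr.start;

	printf("[%llu, %llu)@%u [%llu, %llu)@%u [%llu, %llu)@%u\n",
	       curr.start, curr.end, curr.index,
	       newr.start, newr.end, newr.index,
	       fldt.start, fldt.end, fldt.index);
	return 0;
}
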
 
-int fld_cache_insert(struct fld_cache *cache,
-                     seqno_t seq, mdsno_t mds)
+/**
+ * handle range overlap in fld cache.
+ */
+static void fld_cache_overlap_handle(struct fld_cache *cache,
+                               struct fld_cache_entry *f_curr,
+                               struct fld_cache_entry *f_new)
 {
-        struct fld_cache_entry *flde, *fldt;
-        struct hlist_head *bucket;
-        struct hlist_node *scan;
-        int rc;
-        ENTRY;
+       const struct lu_seq_range *range = &f_new->fce_range;
+       const u64 new_start  = range->lsr_start;
+       const u64 new_end  = range->lsr_end;
+       const u32 mdt = range->lsr_index;
 
-        spin_lock(&cache->fci_lock);
+        /* this is the overlap case; it only checks overlap with the
+         * previous range. fixup will handle overlap with the next range. */
 
-        /* Check if need to shrink cache. */
-        rc = fld_cache_shrink(cache);
-        if (rc) {
-                spin_unlock(&cache->fci_lock);
-                RETURN(rc);
-        }
+        if (f_curr->fce_range.lsr_index == mdt) {
+                f_curr->fce_range.lsr_start = min(f_curr->fce_range.lsr_start,
+                                                  new_start);
 
-        /* Check if cache already has the entry with such a seq. */
-        bucket = fld_cache_bucket(cache, seq);
-        hlist_for_each_entry(fldt, scan, bucket, fce_list) {
-                if (fldt->fce_seq == seq) {
-                        if (fldt->fce_inflight) {
-                                /* set mds for inflight entry */
-                                fldt->fce_mds = mds;
-                                fldt->fce_inflight = 0;
-                                fldt->fce_invalid = 0;
-                                cfs_waitq_signal(&fldt->fce_waitq);
-                                rc = 0;
-                        } else
-                                rc = -EEXIST;
-                        spin_unlock(&cache->fci_lock);
-                        RETURN(rc);
-                }
-        }
-        spin_unlock(&cache->fci_lock);
+                f_curr->fce_range.lsr_end = max(f_curr->fce_range.lsr_end,
+                                                new_end);
 
-        /* Allocate new entry. */
-        OBD_ALLOC_PTR(flde);
-        if (!flde)
-                RETURN(-ENOMEM);
+                OBD_FREE_PTR(f_new);
+                fld_fix_new_list(cache);
 
-        /*
-         * Check if cache has the entry with such a seq again. It could be added
-         * while we were allocating new entry.
-         */
-        spin_lock(&cache->fci_lock);
-        hlist_for_each_entry(fldt, scan, bucket, fce_list) {
-                if (fldt->fce_seq == seq) {
-                        spin_unlock(&cache->fci_lock);
-                        OBD_FREE_PTR(flde);
-                        RETURN(0);
-                }
-        }
+        } else if (new_start <= f_curr->fce_range.lsr_start &&
+                        f_curr->fce_range.lsr_end <= new_end) {
+                /* case 1: the new range completely overshadows the existing
+                 *         range, e.g. the whole range was migrated; update
+                 *         the fld cache entry */
 
-        /* Add new entry to cache and lru list. */
-        INIT_HLIST_NODE(&flde->fce_list);
-        flde->fce_mds = mds;
-        flde->fce_seq = seq;
-        flde->fce_inflight = 0;
-        flde->fce_invalid = 0;
+                f_curr->fce_range = *range;
+                OBD_FREE_PTR(f_new);
+                fld_fix_new_list(cache);
 
-        hlist_add_head(&flde->fce_list, bucket);
-        list_add(&flde->fce_lru, &cache->fci_lru);
-        cache->fci_cache_count++;
+        } else if (f_curr->fce_range.lsr_start < new_start &&
+                        new_end < f_curr->fce_range.lsr_end) {
+                /* case 2: the new range fits within the existing range. */
 
-        spin_unlock(&cache->fci_lock);
+                fld_cache_punch_hole(cache, f_curr, f_new);
 
-        RETURN(0);
+        } else  if (new_end <= f_curr->fce_range.lsr_end) {
+                /* case 3: overlap:
+                 *         [new_start [c_start  new_end)  c_end)
+                 */
+
+                LASSERT(new_start <= f_curr->fce_range.lsr_start);
+
+                f_curr->fce_range.lsr_start = new_end;
+                fld_cache_entry_add(cache, f_new, f_curr->fce_list.prev);
+
+        } else if (f_curr->fce_range.lsr_start <= new_start) {
+                /* case 4: overlap:
+                 *         [c_start [new_start c_end) new_end)
+                 */
+
+                LASSERT(f_curr->fce_range.lsr_end <= new_end);
+
+                f_curr->fce_range.lsr_end = new_start;
+                fld_cache_entry_add(cache, f_new, &f_curr->fce_list);
+        } else
+                CERROR("NEW range = "DRANGE" curr = "DRANGE"\n",
+                       PRANGE(range), PRANGE(&f_curr->fce_range));
 }
-EXPORT_SYMBOL(fld_cache_insert);
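
The four overlap cases reduce to simple interval arithmetic. As a sketch
of case 4 (the new range starts inside the cached one and extends past
its end), the cached entry is truncated and the new entry inserted after
it, again modelled with a stand-in userspace struct:

#include <stdio.h>

struct range { unsigned long long start, end; unsigned index; };

int main(void)
{
	struct range curr = { 0, 50, 0 };	/* cached:   [0, 50) on MDT0 */
	struct range newr = { 30, 80, 1 };	/* incoming: [30, 80) on MDT1 */

	/* case 4: [c_start [new_start c_end) new_end) */
	curr.end = newr.start;			/* cached becomes [0, 30) */

	printf("[%llu, %llu)@%u [%llu, %llu)@%u\n",
	       curr.start, curr.end, curr.index,
	       newr.start, newr.end, newr.index);
	return 0;
}
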
 
-void fld_cache_delete(struct fld_cache *cache, seqno_t seq)
+struct fld_cache_entry
+*fld_cache_entry_create(const struct lu_seq_range *range)
 {
-        struct fld_cache_entry *flde;
-        struct hlist_node *scan, *n;
-        struct hlist_head *bucket;
-        ENTRY;
+       struct fld_cache_entry *f_new;
 
-        bucket = fld_cache_bucket(cache, seq);
-       
-        spin_lock(&cache->fci_lock);
-        hlist_for_each_entry_safe(flde, scan, n, bucket, fce_list) {
-                if (flde->fce_seq == seq) {
-                        hlist_del_init(&flde->fce_list);
-                        list_del_init(&flde->fce_lru);
-                        if (flde->fce_inflight) {
-                                flde->fce_inflight = 0;
-                                flde->fce_invalid = 1;
-                                cfs_waitq_signal(&flde->fce_waitq);
-                        }
-                        cache->fci_cache_count--;
-                       OBD_FREE_PTR(flde);
-                        GOTO(out_unlock, 0);
-                }
-        }
+       LASSERT(range_is_sane(range));
 
-        EXIT;
-out_unlock:
-        spin_unlock(&cache->fci_lock);
+       OBD_ALLOC_PTR(f_new);
+       if (!f_new)
+               RETURN(ERR_PTR(-ENOMEM));
+
+       f_new->fce_range = *range;
+       RETURN(f_new);
 }
-EXPORT_SYMBOL(fld_cache_delete);
 
-static int fld_check_inflight(struct fld_cache_entry *flde)
+/**
+ * Insert FLD entry in FLD cache.
+ *
+ * This function handles all cases of merging and breaking up of
+ * ranges.
+ */
+int fld_cache_insert_nolock(struct fld_cache *cache,
+                           struct fld_cache_entry *f_new)
 {
-        return (flde->fce_inflight);
+       struct fld_cache_entry *f_curr;
+       struct fld_cache_entry *n;
+       struct list_head *head;
+       struct list_head *prev = NULL;
+       const u64 new_start  = f_new->fce_range.lsr_start;
+       const u64 new_end  = f_new->fce_range.lsr_end;
+       __u32 new_flags  = f_new->fce_range.lsr_flags;
+       ENTRY;
+
+       /*
+        * Duplicate entries are eliminated during insertion,
+        * so there is no need to search for the new entry before
+        * starting the insertion loop.
+        */
+
+       if (!cache->fci_no_shrink)
+               fld_cache_shrink(cache);
+
+       head = &cache->fci_entries_head;
+
+       list_for_each_entry_safe(f_curr, n, head, fce_list) {
+               /* the new range sorts entirely before this entry: stop here */
+               if (new_end < f_curr->fce_range.lsr_start ||
+                  (new_end == f_curr->fce_range.lsr_start &&
+                   new_flags != f_curr->fce_range.lsr_flags))
+                       break;
+
+               prev = &f_curr->fce_list;
+               /* check if this entry overlaps the new range. */
+               if (new_start < f_curr->fce_range.lsr_end &&
+                   new_flags == f_curr->fce_range.lsr_flags) {
+                       fld_cache_overlap_handle(cache, f_curr, f_new);
+                       goto out;
+               }
+       }
+
+       if (prev == NULL)
+               prev = head;
+
+       CDEBUG(D_INFO, "insert range "DRANGE"\n", PRANGE(&f_new->fce_range));
+       /* Add new entry to cache and lru list. */
+       fld_cache_entry_add(cache, f_new, prev);
+out:
+       RETURN(0);
 }
 
-int fld_cache_lookup(struct fld_cache *cache,
-                     seqno_t seq, mdsno_t *mds)
+int fld_cache_insert(struct fld_cache *cache,
+                    const struct lu_seq_range *range)
 {
-        struct fld_cache_entry *flde;
-        struct hlist_node *scan, *n;
-        struct hlist_head *bucket;
-        ENTRY;
+       struct fld_cache_entry  *flde;
+       int rc;
 
-        bucket = fld_cache_bucket(cache, seq);
-
-        spin_lock(&cache->fci_lock);
-        cache->fci_stat.fst_count++;
-        hlist_for_each_entry_safe(flde, scan, n, bucket, fce_list) {
-                if (flde->fce_seq == seq) {
-                        if (flde->fce_inflight) {
-                                /* lookup RPC is inflight need to wait */
-                                struct l_wait_info lwi;
-                                spin_unlock(&cache->fci_lock);
-                                lwi = LWI_TIMEOUT(0, NULL, NULL);
-                                l_wait_event(flde->fce_waitq,
-                                             !fld_check_inflight(flde), &lwi);
-                                LASSERT(!flde->fce_inflight);
-                                if (flde->fce_invalid) 
-                                        RETURN(-ENOENT);
-                                
-                                *mds = flde->fce_mds;
-                                cache->fci_stat.fst_inflight++;
-                        } else {
-                                LASSERT(!flde->fce_invalid);
-                                *mds = flde->fce_mds;
-                                list_del(&flde->fce_lru);
-                                list_add(&flde->fce_lru, &cache->fci_lru);
-                                cache->fci_stat.fst_cache++;
-                                spin_unlock(&cache->fci_lock);
-                        }
-                        RETURN(0);
-                }
-        }
-        spin_unlock(&cache->fci_lock);
-        RETURN(-ENOENT);
+       flde = fld_cache_entry_create(range);
+       if (IS_ERR(flde))
+               RETURN(PTR_ERR(flde));
+
+       write_lock(&cache->fci_lock);
+       rc = fld_cache_insert_nolock(cache, flde);
+       write_unlock(&cache->fci_lock);
+       if (rc)
+               OBD_FREE_PTR(flde);
+
+       RETURN(rc);
 }
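
A caller-side sketch of fld_cache_insert(). The sequence numbers are
illustrative, and LU_SEQ_RANGE_MDT is assumed from the wider Lustre
headers rather than shown in this file:

	struct lu_seq_range range = {
		.lsr_start = 0x200000400ULL,	/* illustrative values */
		.lsr_end   = 0x200000800ULL,
		.lsr_index = 0,			/* MDT index */
		.lsr_flags = LU_SEQ_RANGE_MDT,
	};
	int rc;

	rc = fld_cache_insert(cache, &range);
	if (rc != 0)
		CERROR("fld cache insert failed: rc = %d\n", rc);
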
-EXPORT_SYMBOL(fld_cache_lookup);
-#else
-int fld_cache_insert_inflight(struct fld_cache *cache, seqno_t seq)
+
+void fld_cache_delete_nolock(struct fld_cache *cache,
+                     const struct lu_seq_range *range)
 {
-        return -ENOTSUPP;
+       struct fld_cache_entry *flde;
+       struct fld_cache_entry *tmp;
+       struct list_head *head;
+
+       head = &cache->fci_entries_head;
+       list_for_each_entry_safe(flde, tmp, head, fce_list) {
+               /* match the entry to be deleted */
+               if (range->lsr_start == flde->fce_range.lsr_start ||
+                  (range->lsr_end == flde->fce_range.lsr_end &&
+                   range->lsr_flags == flde->fce_range.lsr_flags)) {
+                       fld_cache_entry_delete(cache, flde);
+                       break;
+               }
+       }
 }
-EXPORT_SYMBOL(fld_cache_insert_inflight);
 
-int fld_cache_insert(struct fld_cache *cache,
-                     seqno_t seq, mdsno_t mds)
+/**
+ * Delete FLD entry in FLD cache.
+ */
+void fld_cache_delete(struct fld_cache *cache,
+                     const struct lu_seq_range *range)
 {
-        return -ENOTSUPP;
+       write_lock(&cache->fci_lock);
+       fld_cache_delete_nolock(cache, range);
+       write_unlock(&cache->fci_lock);
 }
-EXPORT_SYMBOL(fld_cache_insert);
 
-void fld_cache_delete(struct fld_cache *cache,
-                      seqno_t seq)
+struct fld_cache_entry *
+fld_cache_entry_lookup_nolock(struct fld_cache *cache,
+                             const struct lu_seq_range *range)
 {
-        return;
+       struct fld_cache_entry *flde;
+       struct fld_cache_entry *got = NULL;
+       struct list_head *head;
+
+       head = &cache->fci_entries_head;
+       list_for_each_entry(flde, head, fce_list) {
+               if (range->lsr_start == flde->fce_range.lsr_start ||
+                  (range->lsr_end == flde->fce_range.lsr_end &&
+                   range->lsr_flags == flde->fce_range.lsr_flags)) {
+                       got = flde;
+                       break;
+               }
+       }
+
+       RETURN(got);
 }
-EXPORT_SYMBOL(fld_cache_delete);
 
-int fld_cache_lookup(struct fld_cache *cache,
-                     seqno_t seq, mdsno_t *mds)
+/**
+ * Look up the given \a range in the fld cache.
+ */
+struct fld_cache_entry *
+fld_cache_entry_lookup(struct fld_cache *cache,
+                      const struct lu_seq_range *range)
 {
-        return -ENOTSUPP;
+       struct fld_cache_entry *got = NULL;
+       ENTRY;
+
+       read_lock(&cache->fci_lock);
+       got = fld_cache_entry_lookup_nolock(cache, range);
+       read_unlock(&cache->fci_lock);
+
+       RETURN(got);
 }
-EXPORT_SYMBOL(fld_cache_lookup);
-#endif
 
+/**
+ * Look up the range containing sequence \a seq in the fld cache.
+ */
+int fld_cache_lookup(struct fld_cache *cache,
+                    const u64 seq, struct lu_seq_range *range)
+{
+       struct fld_cache_entry *flde;
+       struct fld_cache_entry *prev = NULL;
+       struct list_head *head;
+       ENTRY;
+
+       read_lock(&cache->fci_lock);
+       head = &cache->fci_entries_head;
+
+       cache->fci_stat.fst_count++;
+       list_for_each_entry(flde, head, fce_list) {
+               if (flde->fce_range.lsr_start > seq) {
+                       if (prev != NULL)
+                               *range = prev->fce_range;
+                       break;
+               }
+
+               prev = flde;
+                if (range_within(&flde->fce_range, seq)) {
+                        *range = flde->fce_range;
+
+                        cache->fci_stat.fst_cache++;
+                       read_unlock(&cache->fci_lock);
+                       RETURN(0);
+               }
+       }
+       read_unlock(&cache->fci_lock);
+       RETURN(-ENOENT);
+}
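
Finally, a lookup sketch. Note the miss semantics above: on -ENOENT the
nearest preceding range may still have been copied into *range as a hint.
The fallback named in the comment is an assumption about the caller, not
something defined in this file:

	struct lu_seq_range range;
	__u32 mdt_index;
	int rc;

	rc = fld_cache_lookup(cache, seq, &range);
	if (rc == 0) {
		mdt_index = range.lsr_index;	/* cache hit */
	} else {
		/* cache miss: the caller would fall back to an FLD RPC
		 * (client) or an index lookup (server), outside this file */
	}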