Whamcloud - gitweb
- fixes after FLD CR:
authoryury <yury>
Fri, 30 Jun 2006 12:26:14 +0000 (12:26 +0000)
committeryury <yury>
Fri, 30 Jun 2006 12:26:14 +0000 (12:26 +0000)
  * cache API is moved to fld_cache.c;
  * cache is now used both on server and client;

  * fixed name aliasing. There was fld_cache global struct, also there were lots of functions which took fld_cache instance pointr which name was fld_cache;

  * fixed memory leaks in fld_cache_fini(). There was missed releasing of all cached entries;

  * do not translate fld_cache_*() errors to caller on server and client. If some error happens in cache working funtions, we do not care of that.

lustre/fld/Makefile.in
lustre/fld/autoMakefile.am
lustre/fld/fld_cache.c [new file with mode: 0644]
lustre/fld/fld_handler.c
lustre/fld/fld_index.c
lustre/fld/fld_internal.h
lustre/fld/fld_request.c

index 0eb8776..2887277 100644 (file)
@@ -1,5 +1,5 @@
 MODULES := fld 
-fld-objs := fld_handler.o fld_request.o fld_index.o lproc_fld.o
+fld-objs := fld_handler.o fld_request.o fld_cache.o fld_index.o lproc_fld.o
 
 EXTRA_PRE_CFLAGS := -I@LUSTRE@ -I@LUSTRE@/ldiskfs
 
index 4a48179..ceea4f6 100644 (file)
@@ -5,7 +5,7 @@
 
 if LIBLUSTRE
 noinst_LIBRARIES = libfld.a
-libfld_a_SOURCES = fld_handler.c fld_request.c fld_index.c lproc_fld.c fld_internal.h
+libfld_a_SOURCES = fld_handler.c fld_request.c fld_cache.c fld_index.c lproc_fld.c fld_internal.h
 libfld_a_CPPFLAGS = $(LLCPPFLAGS)
 libfld_a_CFLAGS = $(LLCFLAGS)
 endif
diff --git a/lustre/fld/fld_cache.c b/lustre/fld/fld_cache.c
new file mode 100644 (file)
index 0000000..6dca0b6
--- /dev/null
@@ -0,0 +1,201 @@
+/* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ *  lustre/fld/fld_cache.c
+ *  FLD (Fids Location Database)
+ *
+ *  Copyright (C) 2006 Cluster File Systems, Inc.
+ *   Author: Yury Umanets <umka@clusterfs.com>
+ *
+ *   This file is part of the Lustre file system, http://www.lustre.org
+ *   Lustre is a trademark of Cluster File Systems, Inc.
+ *
+ *   You may have signed or agreed to another license before downloading
+ *   this software.  If so, you are bound by the terms and conditions
+ *   of that agreement, and the following does not apply to you.  See the
+ *   LICENSE file included with this distribution for more information.
+ *
+ *   If you did not agree to a different license, then this copy of Lustre
+ *   is open source software; you can redistribute it and/or modify it
+ *   under the terms of version 2 of the GNU General Public License as
+ *   published by the Free Software Foundation.
+ *
+ *   In either case, Lustre is distributed in the hope that it will be
+ *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+ *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   license text for more details.
+ */
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+#define DEBUG_SUBSYSTEM S_FLD
+
+#ifdef __KERNEL__
+# include <libcfs/libcfs.h>
+# include <linux/module.h>
+# include <linux/jbd.h>
+# include <asm/div64.h>
+#else /* __KERNEL__ */
+# include <liblustre.h>
+# include <libcfs/list.h>
+#endif
+
+#include <obd.h>
+#include <obd_class.h>
+#include <lustre_ver.h>
+#include <obd_support.h>
+#include <lprocfs_status.h>
+
+#include <dt_object.h>
+#include <md_object.h>
+#include <lustre_req_layout.h>
+#include <lustre_fld.h>
+#include "fld_internal.h"
+
+#ifdef __KERNEL__
+struct fld_cache_info *fld_cache_init(int size)
+{
+       struct fld_cache_info *cache;
+        int i;
+        ENTRY;
+
+        OBD_ALLOC_PTR(cache);
+        if (cache == NULL)
+                RETURN(ERR_PTR(-ENOMEM));
+
+       cache->fci_size = size;
+        spin_lock_init(&cache->fci_lock);
+
+        /* init fld cache info */
+        cache->fci_hash_mask = size - 1;
+        OBD_ALLOC(cache->fci_hash, size *
+                  sizeof(*cache->fci_hash));
+        if (cache->fci_hash == NULL) {
+                OBD_FREE_PTR(cache);
+                RETURN(ERR_PTR(-ENOMEM));
+        }
+        
+        for (i = 0; i < size; i++)
+                INIT_HLIST_HEAD(&cache->fci_hash[i]);
+
+        CDEBUG(D_INFO|D_WARNING, "FLD cache size %d\n",
+               size);
+        
+        RETURN(cache);
+}
+
+void fld_cache_fini(struct fld_cache_info *cache)
+{
+        struct fld_cache_entry *flde;
+        struct hlist_head *bucket;
+        struct hlist_node *scan;
+       int i;
+        ENTRY;
+
+        LASSERT(cache != NULL);
+
+       /* free all cache entries */
+       spin_lock(&cache->fci_lock);
+       for (i = 0; i < cache->fci_size; i++) {
+               bucket = cache->fci_hash + i;
+               hlist_for_each_entry(flde, scan, bucket, fce_list) {
+                       hlist_del_init(&flde->fce_list);
+                       OBD_FREE_PTR(flde);
+               }
+       }
+        spin_unlock(&cache->fci_lock);
+
+       /* free cache hash table and cache itself */
+       OBD_FREE(cache->fci_hash, cache->fci_size *
+                sizeof(*cache->fci_hash));
+       OBD_FREE_PTR(cache);
+       
+        EXIT;
+}
+
+int
+fld_cache_insert(struct fld_cache_info *cache,
+                 __u64 seq, __u64 mds)
+{
+        struct fld_cache_entry *flde, *fldt;
+        struct hlist_head *bucket;
+        struct hlist_node *scan;
+        int rc = 0;
+        ENTRY;
+
+        OBD_ALLOC_PTR(flde);
+        if (!flde)
+                RETURN(-ENOMEM);
+
+        bucket = cache->fci_hash + (fld_cache_hash(seq) &
+                                   cache->fci_hash_mask);
+
+        spin_lock(&cache->fci_lock);
+        hlist_for_each_entry(fldt, scan, bucket, fce_list) {
+                if (fldt->fce_seq == seq)
+                        GOTO(exit_unlock, rc = -EEXIST);
+        }
+
+        INIT_HLIST_NODE(&flde->fce_list);
+        flde->fce_mds = mds;
+        flde->fce_seq = seq;
+        
+        hlist_add_head(&flde->fce_list, bucket);
+
+        EXIT;
+exit_unlock:
+        spin_unlock(&cache->fci_lock);
+        if (rc != 0)
+                OBD_FREE_PTR(flde);
+        return rc;
+}
+
+void
+fld_cache_delete(struct fld_cache_info *cache, __u64 seq)
+{
+        struct fld_cache_entry *flde;
+        struct hlist_head *bucket;
+        struct hlist_node *scan;
+        ENTRY;
+
+        bucket = cache->fci_hash + (fld_cache_hash(seq) &
+                                   cache->fci_hash_mask);
+       
+        spin_lock(&cache->fci_lock);
+        hlist_for_each_entry(flde, scan, bucket, fce_list) {
+                if (flde->fce_seq == seq) {
+                        hlist_del_init(&flde->fce_list);
+                       OBD_FREE_PTR(flde);
+                        GOTO(out_unlock, 0);
+                }
+        }
+
+        EXIT;
+out_unlock:
+        spin_unlock(&cache->fci_lock);
+        return;
+}
+
+struct fld_cache_entry *
+fld_cache_lookup(struct fld_cache_info *cache, __u64 seq)
+{
+        struct fld_cache_entry *flde;
+        struct hlist_head *bucket;
+        struct hlist_node *scan;
+        ENTRY;
+
+        bucket = cache->fci_hash + (fld_cache_hash(seq) &
+                                   cache->fci_hash_mask);
+
+        spin_lock(&cache->fci_lock);
+        hlist_for_each_entry(flde, scan, bucket, fce_list) {
+                if (flde->fce_seq == seq) {
+                        spin_unlock(&cache->fci_lock);
+                        RETURN(flde);
+                }
+        }
+        spin_unlock(&cache->fci_lock);
+        RETURN(NULL);
+}
+#endif
+
index e428655..2f1153c 100644 (file)
@@ -58,42 +58,27 @@ struct fld_cache_info *fld_cache = NULL;
 
 static int fld_init(void)
 {
-        int i;
+        int rc = 0;
         ENTRY;
-
-        OBD_ALLOC_PTR(fld_cache);
-        if (fld_cache == NULL)
-                RETURN(-ENOMEM);
-
-        spin_lock_init(&fld_cache->fci_lock);
-
-        /* init fld cache info */
-        fld_cache->fci_hash_mask = FLD_HTABLE_MASK;
-        OBD_ALLOC(fld_cache->fci_hash, FLD_HTABLE_SIZE *
-                  sizeof(*fld_cache->fci_hash));
-        if (fld_cache->fci_hash == NULL) {
-                OBD_FREE_PTR(fld_cache);
-                RETURN(-ENOMEM);
-        }
         
-        for (i = 0; i < FLD_HTABLE_SIZE; i++)
-                INIT_HLIST_HEAD(&fld_cache->fci_hash[i]);
+        fld_cache = fld_cache_init(FLD_HTABLE_SIZE);
+        if (IS_ERR(fld_cache))
+                rc = PTR_ERR(fld_cache);
 
-        CDEBUG(D_INFO|D_WARNING, "Client FLD, cache size %d\n",
-               FLD_HTABLE_SIZE);
+        if (rc != 0)
+                fld_cache = NULL;
         
-        RETURN(0);
+        RETURN(rc);
 }
 
-static int fld_fini(void)
+static void fld_fini(void)
 {
         ENTRY;
         if (fld_cache != NULL) {
-                OBD_FREE(fld_cache->fci_hash, FLD_HTABLE_SIZE *
-                         sizeof(*fld_cache->fci_hash));
-                OBD_FREE_PTR(fld_cache);
+                fld_cache_fini(fld_cache);
+                fld_cache = NULL;
         }
-        RETURN(0);
+        EXIT;
 }
 
 static int __init fld_mod_init(void)
@@ -108,6 +93,58 @@ static void __exit fld_mod_exit(void)
         return;
 }
 
+/* insert index entry and update cache */
+static int
+fld_server_create(struct lu_server_fld *fld,
+                  const struct lu_context *ctx,
+                  __u64 seq, mdsno_t mds)
+{
+        int rc;
+        ENTRY;
+        
+        rc = fld_index_create(fld, ctx, seq, mds);
+        if (rc == 0) {
+                /* do not return result of calling fld_cache_insert()
+                 * here. First of all because it may return -EEXISTS. Another
+                 * reason is that, we do not want to stop proceeding because of
+                 * cache errors. --umka */
+                fld_cache_insert(fld_cache, seq, mds);
+        }
+        RETURN(rc);
+}
+
+/* delete index entry and update cache */
+static int
+fld_server_delete(struct lu_server_fld *fld,
+                  const struct lu_context *ctx,
+                  __u64 seq)
+{
+        ENTRY;
+        fld_cache_delete(fld_cache, seq);
+        RETURN(fld_index_delete(fld, ctx, seq));
+}
+
+/* lookup in cache first and then issue index lookup */
+static int
+fld_server_lookup(struct lu_server_fld *fld,
+                  const struct lu_context *ctx,
+                  __u64 seq, mdsno_t *mds)
+{
+        struct fld_cache_entry *flde;
+        int rc;
+        ENTRY;
+        
+        /* lookup it in the cache first */
+        flde = fld_cache_lookup(fld_cache, seq);
+        if (flde != NULL) {
+                *mds = flde->fce_mds;
+                RETURN(0);
+        }
+
+        rc = fld_index_lookup(fld, ctx, seq, mds);
+        RETURN(rc);
+}
+
 static int
 fld_server_handle(struct lu_server_fld *fld,
                   const struct lu_context *ctx,
@@ -118,15 +155,15 @@ fld_server_handle(struct lu_server_fld *fld,
 
         switch (opc) {
         case FLD_CREATE:
-                rc = fld_index_insert(fld, ctx,
-                                      mf->mf_seq, mf->mf_mds);
+                rc = fld_server_create(fld, ctx,
+                                       mf->mf_seq, mf->mf_mds);
                 break;
         case FLD_DELETE:
-                rc = fld_index_delete(fld, ctx, mf->mf_seq);
+                rc = fld_server_delete(fld, ctx, mf->mf_seq);
                 break;
         case FLD_LOOKUP:
-                rc = fld_index_lookup(fld, ctx,
-                                      mf->mf_seq, &mf->mf_mds);
+                rc = fld_server_lookup(fld, ctx,
+                                       mf->mf_seq, &mf->mf_mds);
                 break;
         default:
                 rc = -EINVAL;
index 28b3b64..88a84f1 100644 (file)
@@ -127,7 +127,7 @@ static struct dt_rec *fld_rec(const struct lu_context *ctx,
         RETURN((void *)&info->fti_rec);
 }
 
-int fld_index_insert(struct lu_server_fld *fld,
+int fld_index_create(struct lu_server_fld *fld,
                      const struct lu_context *ctx,
                      fidseq_t seq, mdsno_t mds)
 {
index 60882aa..812378f 100644 (file)
@@ -39,6 +39,7 @@ struct fld_cache_entry {
 struct fld_cache_info {
         struct hlist_head *fci_hash;
         spinlock_t         fci_lock;
+        int                fci_size;
         int                fci_hash_mask;
 };
 
@@ -48,11 +49,7 @@ enum fld_op {
         FLD_LOOKUP = 2
 };
 
-enum {
-        FLD_HTABLE_BITS = 8,
-        FLD_HTABLE_SIZE = (1 << FLD_HTABLE_BITS),
-        FLD_HTABLE_MASK = FLD_HTABLE_SIZE - 1
-};
+#define FLD_HTABLE_SIZE 256
 
 extern struct lu_fld_hash fld_hash[3];
 extern struct fld_cache_info *fld_cache;
@@ -60,7 +57,13 @@ extern struct fld_cache_info *fld_cache;
 #ifdef __KERNEL__
 #define FLD_SERVICE_WATCHDOG_TIMEOUT (obd_timeout * 1000)
 
-int fld_index_insert(struct lu_server_fld *fld,
+int fld_index_init(struct lu_server_fld *fld,
+                   const struct lu_context *ctx);
+
+void fld_index_fini(struct lu_server_fld *fld,
+                    const struct lu_context *ctx);
+
+int fld_index_create(struct lu_server_fld *fld,
                      const struct lu_context *ctx,
                      fidseq_t seq, mdsno_t mds);
 
@@ -72,11 +75,24 @@ int fld_index_lookup(struct lu_server_fld *fld,
                      const struct lu_context *ctx,
                      fidseq_t seq, mdsno_t *mds);
 
-int fld_index_init(struct lu_server_fld *fld,
-                   const struct lu_context *ctx);
+struct fld_cache_info *fld_cache_init(int size);
 
-void fld_index_fini(struct lu_server_fld *fld,
-                    const struct lu_context *ctx);
+void fld_cache_fini(struct fld_cache_info *cache);
+
+int fld_cache_insert(struct fld_cache_info *cache,
+                     __u64 seq, __u64 mds);
+
+void fld_cache_delete(struct fld_cache_info *cache,
+                      __u64 seq);
+
+struct fld_cache_entry *
+fld_cache_lookup(struct fld_cache_info *cache,
+                 __u64 seq);
+
+static inline __u32 fld_cache_hash(__u64 seq)
+{
+        return (__u32)seq;
+}
 
 #endif
 
index edb7e35..28c3c0b 100644 (file)
 #include <lustre_fld.h>
 #include "fld_internal.h"
 
-#ifdef __KERNEL__
-static __u32
-fld_cache_hash(__u64 seq)
-{
-        return (__u32)seq;
-}
-
-static int
-fld_cache_insert(struct fld_cache_info *fld_cache,
-                 __u64 seq, __u64 mds)
-{
-        struct fld_cache_entry *flde, *fldt;
-        struct hlist_head *bucket;
-        struct hlist_node *scan;
-        int rc = 0;
-        ENTRY;
-
-        OBD_ALLOC_PTR(flde);
-        if (!flde)
-                RETURN(-ENOMEM);
-
-        bucket = fld_cache->fci_hash + (fld_cache_hash(seq) &
-                                        fld_cache->fci_hash_mask);
-
-        spin_lock(&fld_cache->fci_lock);
-        hlist_for_each_entry(fldt, scan, bucket, fce_list) {
-                if (fldt->fce_seq == seq)
-                        GOTO(exit_unlock, rc = -EEXIST);
-        }
-
-        INIT_HLIST_NODE(&flde->fce_list);
-        flde->fce_mds = mds;
-        flde->fce_seq = seq;
-        
-        hlist_add_head(&flde->fce_list, bucket);
-
-        EXIT;
-exit_unlock:
-        spin_unlock(&fld_cache->fci_lock);
-        if (rc != 0)
-                OBD_FREE_PTR(flde);
-        return rc;
-}
-
-static void
-fld_cache_delete(struct fld_cache_info *fld_cache, __u64 seq)
-{
-        struct fld_cache_entry *flde;
-        struct hlist_head *bucket;
-        struct hlist_node *scan;
-        ENTRY;
-
-        bucket = fld_cache->fci_hash + (fld_cache_hash(seq) &
-                                        fld_cache->fci_hash_mask);
-
-        spin_lock(&fld_cache->fci_lock);
-        hlist_for_each_entry(flde, scan, bucket, fce_list) {
-                if (flde->fce_seq == seq) {
-                        hlist_del_init(&flde->fce_list);
-                        GOTO(out_unlock, 0);
-                }
-        }
-
-        EXIT;
-out_unlock:
-        spin_unlock(&fld_cache->fci_lock);
-        return;
-}
-
-static struct fld_cache_entry *
-fld_cache_lookup(struct fld_cache_info *fld_cache, __u64 seq)
-{
-        struct fld_cache_entry *flde;
-        struct hlist_head *bucket;
-        struct hlist_node *scan;
-        ENTRY;
-
-        bucket = fld_cache->fci_hash + (fld_cache_hash(seq) &
-                                        fld_cache->fci_hash_mask);
-
-        spin_lock(&fld_cache->fci_lock);
-        hlist_for_each_entry(flde, scan, bucket, fce_list) {
-                if (flde->fce_seq == seq) {
-                        spin_unlock(&fld_cache->fci_lock);
-                        RETURN(flde);
-                }
-        }
-        spin_unlock(&fld_cache->fci_lock);
-        RETURN(NULL);
-}
-#endif
-
 static int
 fld_rrb_hash(struct lu_client_fld *fld, __u64 seq)
 {
@@ -428,8 +336,13 @@ fld_client_create(struct lu_client_fld *fld,
         rc = fld_client_rpc(fld_exp, &md_fld, FLD_CREATE);
         
 #ifdef __KERNEL__
-        if (rc  == 0)
-                rc = fld_cache_insert(fld_cache, seq, mds);
+        if (rc  == 0) {
+                /* do not return result of calling fld_cache_insert()
+                 * here. First of all because it may return -EEXISTS. Another
+                 * reason is that, we do not want to stop proceeding because of
+                 * cache errors. --umka */
+                fld_cache_insert(fld_cache, seq, mds);
+        }
 #endif
         
         RETURN(rc);
@@ -508,7 +421,9 @@ fld_client_lookup(struct lu_client_fld *fld,
                 RETURN(rc);
 
 #ifdef __KERNEL__
-        rc = fld_cache_insert(fld_cache, seq, *mds);
+        /* do not return error here as well. See previous comment in same
+         * situation in function fld_client_create(). --umka */
+        fld_cache_insert(fld_cache, seq, *mds);
 #endif
         
         RETURN(rc);