MODULES := fld
-fld-objs := fld_handler.o fld_request.o fld_index.o lproc_fld.o
+fld-objs := fld_handler.o fld_request.o fld_cache.o fld_index.o lproc_fld.o
EXTRA_PRE_CFLAGS := -I@LUSTRE@ -I@LUSTRE@/ldiskfs
if LIBLUSTRE
noinst_LIBRARIES = libfld.a
-libfld_a_SOURCES = fld_handler.c fld_request.c fld_index.c lproc_fld.c fld_internal.h
+libfld_a_SOURCES = fld_handler.c fld_request.c fld_cache.c fld_index.c lproc_fld.c fld_internal.h
libfld_a_CPPFLAGS = $(LLCPPFLAGS)
libfld_a_CFLAGS = $(LLCFLAGS)
endif
--- /dev/null
+/* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * lustre/fld/fld_cache.c
+ * FLD (Fids Location Database)
+ *
+ * Copyright (C) 2006 Cluster File Systems, Inc.
+ * Author: Yury Umanets <umka@clusterfs.com>
+ *
+ * This file is part of the Lustre file system, http://www.lustre.org
+ * Lustre is a trademark of Cluster File Systems, Inc.
+ *
+ * You may have signed or agreed to another license before downloading
+ * this software. If so, you are bound by the terms and conditions
+ * of that agreement, and the following does not apply to you. See the
+ * LICENSE file included with this distribution for more information.
+ *
+ * If you did not agree to a different license, then this copy of Lustre
+ * is open source software; you can redistribute it and/or modify it
+ * under the terms of version 2 of the GNU General Public License as
+ * published by the Free Software Foundation.
+ *
+ * In either case, Lustre is distributed in the hope that it will be
+ * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+ * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * license text for more details.
+ */
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+#define DEBUG_SUBSYSTEM S_FLD
+
+#ifdef __KERNEL__
+# include <libcfs/libcfs.h>
+# include <linux/module.h>
+# include <linux/jbd.h>
+# include <asm/div64.h>
+#else /* __KERNEL__ */
+# include <liblustre.h>
+# include <libcfs/list.h>
+#endif
+
+#include <obd.h>
+#include <obd_class.h>
+#include <lustre_ver.h>
+#include <obd_support.h>
+#include <lprocfs_status.h>
+
+#include <dt_object.h>
+#include <md_object.h>
+#include <lustre_req_layout.h>
+#include <lustre_fld.h>
+#include "fld_internal.h"
+
+#ifdef __KERNEL__
+struct fld_cache_info *fld_cache_init(int size)
+{
+ struct fld_cache_info *cache;
+ int i;
+ ENTRY;
+
+ OBD_ALLOC_PTR(cache);
+ if (cache == NULL)
+ RETURN(ERR_PTR(-ENOMEM));
+
+ cache->fci_size = size;
+ spin_lock_init(&cache->fci_lock);
+
+ /* init fld cache info */
+ cache->fci_hash_mask = size - 1;
+ OBD_ALLOC(cache->fci_hash, size *
+ sizeof(*cache->fci_hash));
+ if (cache->fci_hash == NULL) {
+ OBD_FREE_PTR(cache);
+ RETURN(ERR_PTR(-ENOMEM));
+ }
+
+ for (i = 0; i < size; i++)
+ INIT_HLIST_HEAD(&cache->fci_hash[i]);
+
+ CDEBUG(D_INFO|D_WARNING, "FLD cache size %d\n",
+ size);
+
+ RETURN(cache);
+}
+
+void fld_cache_fini(struct fld_cache_info *cache)
+{
+ struct fld_cache_entry *flde;
+ struct hlist_head *bucket;
+ struct hlist_node *scan;
+ int i;
+ ENTRY;
+
+ LASSERT(cache != NULL);
+
+ /* free all cache entries */
+ spin_lock(&cache->fci_lock);
+ for (i = 0; i < cache->fci_size; i++) {
+ bucket = cache->fci_hash + i;
+ hlist_for_each_entry(flde, scan, bucket, fce_list) {
+ hlist_del_init(&flde->fce_list);
+ OBD_FREE_PTR(flde);
+ }
+ }
+ spin_unlock(&cache->fci_lock);
+
+ /* free cache hash table and cache itself */
+ OBD_FREE(cache->fci_hash, cache->fci_size *
+ sizeof(*cache->fci_hash));
+ OBD_FREE_PTR(cache);
+
+ EXIT;
+}
+
+int
+fld_cache_insert(struct fld_cache_info *cache,
+ __u64 seq, __u64 mds)
+{
+ struct fld_cache_entry *flde, *fldt;
+ struct hlist_head *bucket;
+ struct hlist_node *scan;
+ int rc = 0;
+ ENTRY;
+
+ OBD_ALLOC_PTR(flde);
+ if (!flde)
+ RETURN(-ENOMEM);
+
+ bucket = cache->fci_hash + (fld_cache_hash(seq) &
+ cache->fci_hash_mask);
+
+ spin_lock(&cache->fci_lock);
+ hlist_for_each_entry(fldt, scan, bucket, fce_list) {
+ if (fldt->fce_seq == seq)
+ GOTO(exit_unlock, rc = -EEXIST);
+ }
+
+ INIT_HLIST_NODE(&flde->fce_list);
+ flde->fce_mds = mds;
+ flde->fce_seq = seq;
+
+ hlist_add_head(&flde->fce_list, bucket);
+
+ EXIT;
+exit_unlock:
+ spin_unlock(&cache->fci_lock);
+ if (rc != 0)
+ OBD_FREE_PTR(flde);
+ return rc;
+}
+
+void
+fld_cache_delete(struct fld_cache_info *cache, __u64 seq)
+{
+ struct fld_cache_entry *flde;
+ struct hlist_head *bucket;
+ struct hlist_node *scan;
+ ENTRY;
+
+ bucket = cache->fci_hash + (fld_cache_hash(seq) &
+ cache->fci_hash_mask);
+
+ spin_lock(&cache->fci_lock);
+ hlist_for_each_entry(flde, scan, bucket, fce_list) {
+ if (flde->fce_seq == seq) {
+ hlist_del_init(&flde->fce_list);
+ OBD_FREE_PTR(flde);
+ GOTO(out_unlock, 0);
+ }
+ }
+
+ EXIT;
+out_unlock:
+ spin_unlock(&cache->fci_lock);
+ return;
+}
+
+struct fld_cache_entry *
+fld_cache_lookup(struct fld_cache_info *cache, __u64 seq)
+{
+ struct fld_cache_entry *flde;
+ struct hlist_head *bucket;
+ struct hlist_node *scan;
+ ENTRY;
+
+ bucket = cache->fci_hash + (fld_cache_hash(seq) &
+ cache->fci_hash_mask);
+
+ spin_lock(&cache->fci_lock);
+ hlist_for_each_entry(flde, scan, bucket, fce_list) {
+ if (flde->fce_seq == seq) {
+ spin_unlock(&cache->fci_lock);
+ RETURN(flde);
+ }
+ }
+ spin_unlock(&cache->fci_lock);
+ RETURN(NULL);
+}
+#endif
+
static int fld_init(void)
{
- int i;
+ int rc = 0;
ENTRY;
-
- OBD_ALLOC_PTR(fld_cache);
- if (fld_cache == NULL)
- RETURN(-ENOMEM);
-
- spin_lock_init(&fld_cache->fci_lock);
-
- /* init fld cache info */
- fld_cache->fci_hash_mask = FLD_HTABLE_MASK;
- OBD_ALLOC(fld_cache->fci_hash, FLD_HTABLE_SIZE *
- sizeof(*fld_cache->fci_hash));
- if (fld_cache->fci_hash == NULL) {
- OBD_FREE_PTR(fld_cache);
- RETURN(-ENOMEM);
- }
- for (i = 0; i < FLD_HTABLE_SIZE; i++)
- INIT_HLIST_HEAD(&fld_cache->fci_hash[i]);
+ fld_cache = fld_cache_init(FLD_HTABLE_SIZE);
+ if (IS_ERR(fld_cache))
+ rc = PTR_ERR(fld_cache);
- CDEBUG(D_INFO|D_WARNING, "Client FLD, cache size %d\n",
- FLD_HTABLE_SIZE);
+ if (rc != 0)
+ fld_cache = NULL;
- RETURN(0);
+ RETURN(rc);
}
-static int fld_fini(void)
+static void fld_fini(void)
{
ENTRY;
if (fld_cache != NULL) {
- OBD_FREE(fld_cache->fci_hash, FLD_HTABLE_SIZE *
- sizeof(*fld_cache->fci_hash));
- OBD_FREE_PTR(fld_cache);
+ fld_cache_fini(fld_cache);
+ fld_cache = NULL;
}
- RETURN(0);
+ EXIT;
}
static int __init fld_mod_init(void)
return;
}
+/* insert index entry and update cache */
+static int
+fld_server_create(struct lu_server_fld *fld,
+ const struct lu_context *ctx,
+ __u64 seq, mdsno_t mds)
+{
+ int rc;
+ ENTRY;
+
+ rc = fld_index_create(fld, ctx, seq, mds);
+ if (rc == 0) {
+ /* do not return result of calling fld_cache_insert()
+ * here. First of all because it may return -EEXISTS. Another
+ * reason is that, we do not want to stop proceeding because of
+ * cache errors. --umka */
+ fld_cache_insert(fld_cache, seq, mds);
+ }
+ RETURN(rc);
+}
+
+/* delete index entry and update cache */
+static int
+fld_server_delete(struct lu_server_fld *fld,
+ const struct lu_context *ctx,
+ __u64 seq)
+{
+ ENTRY;
+ fld_cache_delete(fld_cache, seq);
+ RETURN(fld_index_delete(fld, ctx, seq));
+}
+
+/* lookup in cache first and then issue index lookup */
+static int
+fld_server_lookup(struct lu_server_fld *fld,
+ const struct lu_context *ctx,
+ __u64 seq, mdsno_t *mds)
+{
+ struct fld_cache_entry *flde;
+ int rc;
+ ENTRY;
+
+ /* lookup it in the cache first */
+ flde = fld_cache_lookup(fld_cache, seq);
+ if (flde != NULL) {
+ *mds = flde->fce_mds;
+ RETURN(0);
+ }
+
+ rc = fld_index_lookup(fld, ctx, seq, mds);
+ RETURN(rc);
+}
+
static int
fld_server_handle(struct lu_server_fld *fld,
const struct lu_context *ctx,
switch (opc) {
case FLD_CREATE:
- rc = fld_index_insert(fld, ctx,
- mf->mf_seq, mf->mf_mds);
+ rc = fld_server_create(fld, ctx,
+ mf->mf_seq, mf->mf_mds);
break;
case FLD_DELETE:
- rc = fld_index_delete(fld, ctx, mf->mf_seq);
+ rc = fld_server_delete(fld, ctx, mf->mf_seq);
break;
case FLD_LOOKUP:
- rc = fld_index_lookup(fld, ctx,
- mf->mf_seq, &mf->mf_mds);
+ rc = fld_server_lookup(fld, ctx,
+ mf->mf_seq, &mf->mf_mds);
break;
default:
rc = -EINVAL;
RETURN((void *)&info->fti_rec);
}
-int fld_index_insert(struct lu_server_fld *fld,
+int fld_index_create(struct lu_server_fld *fld,
const struct lu_context *ctx,
fidseq_t seq, mdsno_t mds)
{
struct fld_cache_info {
struct hlist_head *fci_hash;
spinlock_t fci_lock;
+ int fci_size;
int fci_hash_mask;
};
FLD_LOOKUP = 2
};
-enum {
- FLD_HTABLE_BITS = 8,
- FLD_HTABLE_SIZE = (1 << FLD_HTABLE_BITS),
- FLD_HTABLE_MASK = FLD_HTABLE_SIZE - 1
-};
+#define FLD_HTABLE_SIZE 256
extern struct lu_fld_hash fld_hash[3];
extern struct fld_cache_info *fld_cache;
#ifdef __KERNEL__
#define FLD_SERVICE_WATCHDOG_TIMEOUT (obd_timeout * 1000)
-int fld_index_insert(struct lu_server_fld *fld,
+int fld_index_init(struct lu_server_fld *fld,
+ const struct lu_context *ctx);
+
+void fld_index_fini(struct lu_server_fld *fld,
+ const struct lu_context *ctx);
+
+int fld_index_create(struct lu_server_fld *fld,
const struct lu_context *ctx,
fidseq_t seq, mdsno_t mds);
const struct lu_context *ctx,
fidseq_t seq, mdsno_t *mds);
-int fld_index_init(struct lu_server_fld *fld,
- const struct lu_context *ctx);
+struct fld_cache_info *fld_cache_init(int size);
-void fld_index_fini(struct lu_server_fld *fld,
- const struct lu_context *ctx);
+void fld_cache_fini(struct fld_cache_info *cache);
+
+int fld_cache_insert(struct fld_cache_info *cache,
+ __u64 seq, __u64 mds);
+
+void fld_cache_delete(struct fld_cache_info *cache,
+ __u64 seq);
+
+struct fld_cache_entry *
+fld_cache_lookup(struct fld_cache_info *cache,
+ __u64 seq);
+
+static inline __u32 fld_cache_hash(__u64 seq)
+{
+ return (__u32)seq;
+}
#endif
#include <lustre_fld.h>
#include "fld_internal.h"
-#ifdef __KERNEL__
-static __u32
-fld_cache_hash(__u64 seq)
-{
- return (__u32)seq;
-}
-
-static int
-fld_cache_insert(struct fld_cache_info *fld_cache,
- __u64 seq, __u64 mds)
-{
- struct fld_cache_entry *flde, *fldt;
- struct hlist_head *bucket;
- struct hlist_node *scan;
- int rc = 0;
- ENTRY;
-
- OBD_ALLOC_PTR(flde);
- if (!flde)
- RETURN(-ENOMEM);
-
- bucket = fld_cache->fci_hash + (fld_cache_hash(seq) &
- fld_cache->fci_hash_mask);
-
- spin_lock(&fld_cache->fci_lock);
- hlist_for_each_entry(fldt, scan, bucket, fce_list) {
- if (fldt->fce_seq == seq)
- GOTO(exit_unlock, rc = -EEXIST);
- }
-
- INIT_HLIST_NODE(&flde->fce_list);
- flde->fce_mds = mds;
- flde->fce_seq = seq;
-
- hlist_add_head(&flde->fce_list, bucket);
-
- EXIT;
-exit_unlock:
- spin_unlock(&fld_cache->fci_lock);
- if (rc != 0)
- OBD_FREE_PTR(flde);
- return rc;
-}
-
-static void
-fld_cache_delete(struct fld_cache_info *fld_cache, __u64 seq)
-{
- struct fld_cache_entry *flde;
- struct hlist_head *bucket;
- struct hlist_node *scan;
- ENTRY;
-
- bucket = fld_cache->fci_hash + (fld_cache_hash(seq) &
- fld_cache->fci_hash_mask);
-
- spin_lock(&fld_cache->fci_lock);
- hlist_for_each_entry(flde, scan, bucket, fce_list) {
- if (flde->fce_seq == seq) {
- hlist_del_init(&flde->fce_list);
- GOTO(out_unlock, 0);
- }
- }
-
- EXIT;
-out_unlock:
- spin_unlock(&fld_cache->fci_lock);
- return;
-}
-
-static struct fld_cache_entry *
-fld_cache_lookup(struct fld_cache_info *fld_cache, __u64 seq)
-{
- struct fld_cache_entry *flde;
- struct hlist_head *bucket;
- struct hlist_node *scan;
- ENTRY;
-
- bucket = fld_cache->fci_hash + (fld_cache_hash(seq) &
- fld_cache->fci_hash_mask);
-
- spin_lock(&fld_cache->fci_lock);
- hlist_for_each_entry(flde, scan, bucket, fce_list) {
- if (flde->fce_seq == seq) {
- spin_unlock(&fld_cache->fci_lock);
- RETURN(flde);
- }
- }
- spin_unlock(&fld_cache->fci_lock);
- RETURN(NULL);
-}
-#endif
-
static int
fld_rrb_hash(struct lu_client_fld *fld, __u64 seq)
{
rc = fld_client_rpc(fld_exp, &md_fld, FLD_CREATE);
#ifdef __KERNEL__
- if (rc == 0)
- rc = fld_cache_insert(fld_cache, seq, mds);
+ if (rc == 0) {
+ /* do not return result of calling fld_cache_insert()
+ * here. First of all because it may return -EEXISTS. Another
+ * reason is that, we do not want to stop proceeding because of
+ * cache errors. --umka */
+ fld_cache_insert(fld_cache, seq, mds);
+ }
#endif
RETURN(rc);
RETURN(rc);
#ifdef __KERNEL__
- rc = fld_cache_insert(fld_cache, seq, *mds);
+ /* do not return error here as well. See previous comment in same
+ * situation in function fld_client_create(). --umka */
+ fld_cache_insert(fld_cache, seq, *mds);
#endif
RETURN(rc);